ETL Task
Extract -> Transform -> Load
koheesio.spark.etl_task.EtlTask #
ETL stands for: Extract -> Transform -> Load
This task is a composition of a Reader (extract), a series of Transformations (transform) and a Writer (load). In other words, it reads data from a source, applies a series of transformations, and writes the result to a target.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the task | required |
| description | str | Description of the task | required |
| source | Reader | Source to read from [extract] | required |
| transformations | list[Transformation] | Series of transformations [transform]. The order of the transformations is important! | required |
| target | Writer | Target to write to [load] | required |
Example
from koheesio.tasks import EtlTask
from koheesio.steps.readers import CsvReader
from koheesio.steps.transformations.repartition import Repartition
from koheesio.steps.writers import DummyWriter

etl_task = EtlTask(
    name="My ETL Task",
    description="This is an example ETL task",
    source=CsvReader(path="path/to/source.csv"),
    transformations=[Repartition(num_partitions=2)],
    target=DummyWriter(),
)
etl_task.execute()
This code will read from a CSV file, repartition the DataFrame to 2 partitions, and write the result to the console.
Extending the EtlTask

The EtlTask is designed to be a simple and flexible way to define ETL processes. It is not designed to be a one-size-fits-all solution, but rather a starting point for building more complex ETL processes. If you need more complex functionality, you can extend the EtlTask class and override the extract, transform and load methods.

You can also implement your own execute method to define the entire ETL process from scratch should you need more flexibility.
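As a rough illustration, a subclass can reuse the default behavior and adjust only one phase. The sketch below is hypothetical and not part of the library: the class name and the extra column are made up, and it assumes that transform accepts and returns a Spark DataFrame and that super().transform() applies the configured transformations.

```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

from koheesio.spark.etl_task import EtlTask


class AuditedEtlTask(EtlTask):
    """Hypothetical EtlTask that stamps every run with the task's etl_date."""

    def transform(self, df: DataFrame) -> DataFrame:
        # Apply the configured transformations first ...
        df = super().transform(df)
        # ... then add an illustrative audit column (etl_date is documented below)
        return df.withColumn("loaded_at", F.lit(self.etl_date))
```

The same pattern applies to extract and load when the source or target needs custom handling.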
Advantages of using the EtlTask
- It is a simple way to define ETL processes
- It is easy to understand and extend
- It is easy to test and debug
- It is easy to maintain and refactor
- It is easy to integrate with other tools and libraries
- It is easy to use in a production environment
etl_date class-attribute instance-attribute #

etl_date: datetime = Field(
    default_factory=utc_now,
    description="Date time when this object was created as iso format. Example: '2023-01-24T09:39:23.632374'",
)
source class-attribute instance-attribute #

source: InstanceOf[Reader] = Field(
    default=..., description="Source to read from [extract]"
)
target class-attribute instance-attribute #

target: InstanceOf[Writer] = Field(
    default=..., description="Target to write to [load]"
)
transformations class-attribute instance-attribute #

transformations: conlist(
    min_length=0, item_type=InstanceOf[Transformation]
) = Field(
    default_factory=list,
    description="Series of transformations",
    alias="transforms",
)
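Note the alias="transforms": with standard Pydantic alias handling, the list of transformations can also be passed under that shorter keyword. A small, hypothetical snippet reusing the objects from the earlier example:

```python
# `transforms` is accepted as an alias for the `transformations` field
etl_task = EtlTask(
    name="My ETL Task",
    description="This is an example ETL task",
    source=CsvReader(path="path/to/source.csv"),
    transforms=[Repartition(num_partitions=2)],
    target=DummyWriter(),
)
```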
Output #
Output class for EtlTask
source_df class-attribute instance-attribute #

source_df: DataFrame = Field(
    default=...,
    description="The Spark DataFrame produced by .extract() method",
)
target_df class-attribute instance-attribute #

target_df: DataFrame = Field(
    default=...,
    description="The Spark DataFrame used by .load() method",
)
transform_df class-attribute instance-attribute #

transform_df: DataFrame = Field(
    default=...,
    description="The Spark DataFrame produced by .transform() method",
)
execute #
execute() -> Output
Run the ETL process
Source code in src/koheesio/spark/etl_task.py
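Because execute() returns the Output described above, the intermediate DataFrames can be inspected after a run. A minimal sketch that reuses the etl_task from the earlier example:

```python
output = etl_task.execute()

# DataFrames captured on the Output object
output.source_df.printSchema()   # produced by .extract()
output.transform_df.show()       # produced by .transform()
output.target_df.show()          # the DataFrame handed to .load()
```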
extract #
Read from Source
logging is handled by the Reader.execute()-method's @do_execute decorator
load #
Write to Target
logging is handled by the Writer.execute()-method's @do_execute decorator
transform #
Transform recursively
logging is handled by the Transformation.execute()-method's @do_execute decorator
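"Transform recursively" means that each Transformation is applied to the output of the previous one, in list order. The loop below illustrates that behavior only; it is not the actual implementation, and it assumes each Transformation exposes a transform(df) method:

```python
from pyspark.sql import DataFrame


def apply_in_order(df: DataFrame, transformations: list) -> DataFrame:
    """Illustration: feed each Transformation the output of the previous one."""
    for transformation in transformations:
        df = transformation.transform(df)
    return df
```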