ETL Task
koheesio.spark.etl_task.EtlTask #
ETL Task
ETL stands for: Extract -> Transform -> Load
This task is a composition of a Reader (extract), a series of Transformations (transform) and a Writer (load). In other words, it reads data from a source, applies a series of transformations, and writes the result to a target.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the task | required |
| description | str | Description of the task | required |
| source | Reader | Source to read from [extract] | required |
| transformations | list[Transformation] | Series of transformations [transform]. The order of the transformations is important! | required |
| target | Writer | Target to write to [load] | required |
Example
```python
from koheesio.tasks import EtlTask

from koheesio.steps.readers import CsvReader
from koheesio.steps.transformations.repartition import Repartition
from koheesio.steps.writers import DummyWriter

etl_task = EtlTask(
    name="My ETL Task",
    description="This is an example ETL task",
    source=CsvReader(path="path/to/source.csv"),
    transformations=[Repartition(num_partitions=2)],
    target=DummyWriter(),
)
etl_task.execute()
```
This code will read from a CSV file, repartition the DataFrame to 2 partitions, and write the result to the console.
Extending the EtlTask
The EtlTask is designed to be a simple and flexible way to define ETL processes. It is not meant to be a one-size-fits-all solution, but rather a starting point for building more complex ETL pipelines. If you need more complex functionality, you can extend the EtlTask class and override the extract, transform, and load methods. You can also implement your own execute method to define the entire ETL process from scratch, should you need even more flexibility. A minimal subclassing sketch follows below.
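As an illustration, here is a hedged sketch of a subclass that overrides transform to stamp an audit column before running the configured transformations. The class name AuditedEtlTask and the column etl_loaded_at are hypothetical, the import paths follow the example above, and the exact signature of transform may vary between koheesio versions.

```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

from koheesio.tasks import EtlTask


class AuditedEtlTask(EtlTask):
    """Hypothetical EtlTask variant that stamps an audit column before the regular transformations."""

    def transform(self, df: DataFrame) -> DataFrame:
        # Assumption: transform receives the extracted DataFrame and returns the
        # transformed one. Add a load-timestamp column, then defer to the
        # standard chain of configured transformations.
        df = df.withColumn("etl_loaded_at", F.lit(str(self.etl_date)))
        return super().transform(df)
```

The same pattern applies to extract and load if you need custom read or write behaviour.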
Advantages of using the EtlTask
- It is a simple way to define ETL processes
- It is easy to understand and extend
- It is easy to test and debug
- It is easy to maintain and refactor
- It is easy to integrate with other tools and libraries
- It is easy to use in a production environment
etl_date
class-attribute
instance-attribute
#
etl_date: datetime = Field(
default=utcnow(),
description="Date time when this object was created as iso format. Example: '2023-01-24T09:39:23.632374'",
)
source
class-attribute
instance-attribute
#
source: InstanceOf[Reader] = Field(
default=..., description="Source to read from [extract]"
)
target
class-attribute
instance-attribute
#
target: InstanceOf[Writer] = Field(
default=..., description="Target to write to [load]"
)
transformations
class-attribute
instance-attribute
#
transformations: conlist(
min_length=0, item_type=InstanceOf[Transformation]
) = Field(
default_factory=list,
description="Series of transformations",
alias="transforms",
)
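Because the field declares the alias "transforms", the list of transformations can typically also be supplied under that shorter name. A small hedged example, assuming the alias is accepted at construction time (this depends on the model configuration):

```python
etl_task = EtlTask(
    name="My ETL Task",
    description="Same task, passing the transformations via the 'transforms' alias",
    source=CsvReader(path="path/to/source.csv"),
    transforms=[Repartition(num_partitions=2)],  # alias for 'transformations'
    target=DummyWriter(),
)
```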
Output #
Output class for EtlTask
source_df
class-attribute
instance-attribute
#
source_df: DataFrame = Field(
default=...,
description="The Spark DataFrame produced by .extract() method",
)
target_df
class-attribute
instance-attribute
#
target_df: DataFrame = Field(
default=...,
description="The Spark DataFrame used by .load() method",
)
transform_df
class-attribute
instance-attribute
#
transform_df: DataFrame = Field(
default=...,
description="The Spark DataFrame produced by .transform() method",
)
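Since EtlTask fills this Output with the intermediate DataFrames, each stage can be inspected after the task has run. A hedged usage sketch, assuming execute() returns the populated Output as koheesio steps usually do:

```python
output = etl_task.execute()

output.source_df.printSchema()  # DataFrame produced by extract()
output.transform_df.show(5)     # DataFrame after all transformations
output.target_df                # DataFrame handed to the Writer by load()
```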
execute #
Run the ETL process
Source code in src/koheesio/spark/etl_task.py
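Conceptually, execute wires the three phases together and stores each intermediate DataFrame on the Output described above. The following is an illustrative pseudocode sketch based on this page, not the literal library source:

```python
# Illustrative sketch only -- see src/koheesio/spark/etl_task.py for the real code.
def execute(self):
    self.output.source_df = self.extract()                            # read from source
    self.output.transform_df = self.transform(self.output.source_df)  # apply transformations in order
    self.output.target_df = self.output.transform_df
    self.load(self.output.target_df)                                  # write to target
```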
extract #
Read from Source
Logging is handled by the @do_execute decorator on the Reader.execute() method.
load #
Write to Target
Logging is handled by the @do_execute decorator on the Writer.execute() method.
run #
transform #
Transform recursively
Logging is handled by the @do_execute decorator on the Transformation.execute() method.
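"Transform recursively" here means the DataFrame is threaded through each configured Transformation in order, which is why the order of the transformations matters. A hedged sketch of the idea, not the literal implementation:

```python
# Illustrative only: apply each configured Transformation in the order given.
def transform(self, df):
    for transformation in self.transformations:
        df = transformation.transform(df)
    return df
```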