ETL Task

Extract -> Transform -> Load

koheesio.spark.etl_task.EtlTask

ETL stands for: Extract -> Transform -> Load

This task is a composition of a Reader (extract), a series of Transformations (transform) and a Writer (load). In other words, it reads data from a source, applies a series of transformations, and writes the result to a target.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the task | required |
| description | str | Description of the task | required |
| source | Reader | Source to read from [extract] | required |
| transformations | list[Transformation] | Series of transformations [transform]. The order of the transformations is important! | empty list |
| target | Writer | Target to write to [load] | required |

Example
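# EtlTask is imported from the module path documented on this page
from koheesio.spark.etl_task import EtlTask

# The remaining import paths are kept from the original example and may differ between versions
from koheesio.steps.readers import CsvReader
from koheesio.steps.transformations.repartition import Repartition
from koheesio.steps.writers import DummyWriter  # DummyWriter is the writer used below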

etl_task = EtlTask(
    name="My ETL Task",
    description="This is an example ETL task",
    source=CsvReader(path="path/to/source.csv"),
    transformations=[Repartition(num_partitions=2)],
    target=DummyWriter(),
)

etl_task.execute()

This code will read from a CSV file, repartition the DataFrame to 2 partitions, and write the result to the console.
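
The parameter table notes that the order of the transformations is important. A minimal sketch of why, using plain Python lists as stand-ins for DataFrames: `keep_even`, `first_three` and `apply_in_order` are hypothetical helpers for illustration only, not part of koheesio.

```python
from typing import Callable, List

Rows = List[int]
Transform = Callable[[Rows], Rows]


def keep_even(rows: Rows) -> Rows:
    """Hypothetical stand-in for a filter-style transformation."""
    return [r for r in rows if r % 2 == 0]


def first_three(rows: Rows) -> Rows:
    """Hypothetical stand-in for a limit-style transformation."""
    return rows[:3]


def apply_in_order(rows: Rows, transformations: List[Transform]) -> Rows:
    # Each step receives the output of the previous step.
    for t in transformations:
        rows = t(rows)
    return rows


data = [1, 2, 3, 4, 5, 6, 7, 8]

# Same two steps, different order, different result.
filter_then_limit = apply_in_order(data, [keep_even, first_three])
limit_then_filter = apply_in_order(data, [first_three, keep_even])
```

Filtering first keeps three even values, while limiting first leaves only one even value to keep, so the two lists are not interchangeable.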

Extending the EtlTask

The EtlTask is designed to be a simple and flexible way to define ETL processes. It is not designed to be a one-size-fits-all solution, but rather a starting point for building more complex ETL processes. If you need more complex functionality, you can extend the EtlTask class and override the extract, transform and load methods. You can also implement your own execute method to define the entire ETL process from scratch should you need more flexibility.
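
The override pattern described above might look roughly like the following. This is a sketch under stated assumptions: `MiniEtlTask` is a plain-Python stand-in that only mirrors the extract/transform/load method shape (the real EtlTask is a model with `Reader`, `Transformation` and `Writer` fields), and `DedupEtlTask` is a hypothetical subclass.

```python
from typing import Callable, Dict, List

Rows = List[Dict[str, int]]


class MiniEtlTask:
    """Stand-in mirroring the extract/transform/load shape; not koheesio's class."""

    def __init__(
        self,
        source: Callable[[], Rows],
        transformations: List[Callable[[Rows], Rows]],
        target: Callable[[Rows], None],
    ) -> None:
        self.source = source
        self.transformations = transformations
        self.target = target

    def extract(self) -> Rows:
        return self.source()

    def transform(self, rows: Rows) -> Rows:
        for t in self.transformations:
            rows = t(rows)
        return rows

    def load(self, rows: Rows) -> Rows:
        self.target(rows)
        return rows

    def execute(self) -> Rows:
        # Simplification: returns the rows instead of filling an Output object.
        return self.load(self.transform(self.extract()))


class DedupEtlTask(MiniEtlTask):
    """Hypothetical subclass: drop duplicate ids after the configured steps."""

    def transform(self, rows: Rows) -> Rows:
        rows = super().transform(rows)  # keep the configured behaviour
        seen = set()
        unique: Rows = []
        for row in rows:
            if row["id"] not in seen:
                seen.add(row["id"])
                unique.append(row)
        return unique


written: Rows = []
task = DedupEtlTask(
    source=lambda: [{"id": 1}, {"id": 1}, {"id": 2}],
    transformations=[],
    target=written.extend,
)
result = task.execute()
```

Calling `super().transform()` first keeps the configured transformations intact, so the subclass only adds behaviour rather than replacing it.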

Advantages of using the EtlTask
  • It is a simple way to define ETL processes
  • It is easy to understand and extend
  • It is easy to test and debug
  • It is easy to maintain and refactor
  • It is easy to integrate with other tools and libraries
  • It is easy to use in a production environment

etl_date class-attribute instance-attribute

etl_date: datetime = Field(
    default_factory=utc_now,
    description="Date time when this object was created as iso format. Example: '2023-01-24T09:39:23.632374'",
)
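
A small illustration of what `default_factory` implies for `etl_date`: the factory runs once per instance at construction time, so every task carries its own creation timestamp. The sketch uses a standard-library dataclass as a stand-in for the real model, and its `utc_now` is an assumed equivalent of the helper referenced above (whether the real helper returns a timezone-aware value is not shown on this page).

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Assumed equivalent of the utc_now helper; returns an aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class TimestampedTask:
    """Hypothetical stand-in; the real field lives on the EtlTask model."""

    name: str
    # The factory is called for each new instance, not once at class definition.
    etl_date: datetime = field(default_factory=utc_now)


first = TimestampedTask(name="first")
second = TimestampedTask(name="second")

# ISO 8601 string form of the creation timestamp
stamp = first.etl_date.isoformat()
```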

source class-attribute instance-attribute

source: InstanceOf[Reader] = Field(
    default=..., description="Source to read from [extract]"
)

target class-attribute instance-attribute

target: InstanceOf[Writer] = Field(
    default=..., description="Target to write to [load]"
)

transformations class-attribute instance-attribute

transformations: conlist(
    min_length=0, item_type=InstanceOf[Transformation]
) = Field(
    default_factory=list,
    description="Series of transformations",
    alias="transforms",
)

Output

Output class for EtlTask

source_df class-attribute instance-attribute

source_df: DataFrame = Field(
    default=...,
    description="The Spark DataFrame produced by .extract() method",
)

target_df class-attribute instance-attribute

target_df: DataFrame = Field(
    default=...,
    description="The Spark DataFrame used by .load() method",
)

transform_df class-attribute instance-attribute

transform_df: DataFrame = Field(
    default=...,
    description="The Spark DataFrame produced by .transform() method",
)

execute

execute() -> Output

Run the ETL process

Source code in src/koheesio/spark/etl_task.py
def execute(self) -> Step.Output:
    """Run the ETL process"""
    self.log.info(f"Task started at {self.etl_date}")

    # extract from source
    self.output.source_df = self.extract()

    # transform
    self.output.transform_df = self.transform(self.output.source_df)

    # load to target
    self.output.target_df = self.load(self.output.transform_df)
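
To make the three output fields concrete, here is a minimal sketch of the same staging logic outside koheesio. `run_etl` and the list-based "frames" are hypothetical stand-ins; the point is only that each stage's result is stored separately and that `target_df` is whatever `load` returns.

```python
from types import SimpleNamespace
from typing import Callable, List

Rows = List[int]


def run_etl(
    extract: Callable[[], Rows],
    transform: Callable[[Rows], Rows],
    load: Callable[[Rows], Rows],
) -> SimpleNamespace:
    """Stand-in for the execute body: record the result of every stage."""
    output = SimpleNamespace()
    output.source_df = extract()
    output.transform_df = transform(output.source_df)
    output.target_df = load(output.transform_df)
    return output


sink: List[Rows] = []


def load(rows: Rows) -> Rows:
    sink.append(rows)  # stand-in for writer.write(df)
    return rows        # load hands back the frame it was given


out = run_etl(
    extract=lambda: [1, 2, 3],
    transform=lambda rows: [r * 10 for r in rows],
    load=load,
)
```

Because `load` returns its input, `target_df` and `transform_df` refer to the same object in this sketch, while `source_df` still holds the untransformed data for inspection.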

extract

extract() -> DataFrame

Read from Source

Logging is handled by the @do_execute decorator on Reader.execute().

Source code in src/koheesio/spark/etl_task.py
def extract(self) -> DataFrame:
    """Read from Source

    logging is handled by the Reader.execute()-method's @do_execute decorator
    """
    reader: Reader = self.source
    return reader.read()

load

load(df: DataFrame) -> DataFrame

Write to Target

Logging is handled by the @do_execute decorator on Writer.execute().

Source code in src/koheesio/spark/etl_task.py
def load(self, df: DataFrame) -> DataFrame:
    """Write to Target

    logging is handled by the Writer.execute()-method's @do_execute decorator
    """
    writer: Writer = self.target
    writer.write(df)
    return df

transform

transform(df: DataFrame) -> DataFrame

Apply the transformations in list order, each one to the output of the previous one

Logging is handled by the @do_execute decorator on Transformation.execute().

Source code in src/koheesio/spark/etl_task.py
def transform(self, df: DataFrame) -> DataFrame:
    """Transform recursively

    logging is handled by the Transformation.execute()-method's @do_execute decorator
    """
    for t in self.transformations:
        df = t.transform(df)
    return df
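
The loop above is a sequential left fold over the list rather than recursion in the usual sense. A minimal sketch of the equivalence, with plain callables on lists standing in for Transformation objects and DataFrames (all names here are hypothetical):

```python
from functools import reduce
from typing import Callable, List, Sequence

Rows = List[int]
Step = Callable[[Rows], Rows]


def transform_loop(rows: Rows, steps: Sequence[Step]) -> Rows:
    # Same shape as the for-loop in the transform method above.
    for step in steps:
        rows = step(rows)
    return rows


def transform_fold(rows: Rows, steps: Sequence[Step]) -> Rows:
    # Equivalent left fold: the accumulator is the current data.
    return reduce(lambda acc, step: step(acc), steps, rows)


def add_one(rows: Rows) -> Rows:
    return [r + 1 for r in rows]


def double(rows: Rows) -> Rows:
    return [r * 2 for r in rows]


data = [1, 2, 3]
looped = transform_loop(data, [add_one, double])
folded = transform_fold(data, [add_one, double])

# With no steps, both variants hand back the input untouched.
untouched = transform_loop(data, [])
```

The empty case matters because `transformations` defaults to an empty list, in which case the data passes from extract to load unchanged.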