# Simple Examples

## Creating a Custom Step
This example demonstrates how to use the `SparkStep` class from the `koheesio` library to create a custom step named `HelloWorldStep`.
### Code

```python
from koheesio.steps.step import SparkStep


class HelloWorldStep(SparkStep):
    message: str

    def execute(self) -> SparkStep.Output:
        # create a DataFrame with a single row containing the message
        self.output.df = self.spark.createDataFrame([(1, self.message)], ["id", "message"])
```
### Usage

```python
hello_world_step = HelloWorldStep(message="Hello, World!")
hello_world_step.execute()

hello_world_step.output.df.show()
```
### Understanding the Code

The `HelloWorldStep` class is a `SparkStep` in Koheesio, designed to generate a DataFrame with a single row containing a custom message. Here's a more detailed overview:

- `HelloWorldStep` inherits from `SparkStep`, a fundamental building block in Koheesio for creating data processing steps with Apache Spark.
- It has a `message` attribute. When creating an instance of `HelloWorldStep`, you can pass a custom message that will be used in the DataFrame.
- `SparkStep` has a `spark` attribute, which is the active SparkSession. This is the entry point for any Spark functionality, allowing the step to interact with the Spark cluster.
- `SparkStep` also includes an `Output` class, used to store the output of the step. In this case, `Output` has a `df` attribute to store the output DataFrame.
- The `execute` method creates a DataFrame with the custom message and stores it in `output.df`. It doesn't return a value explicitly; instead, the output DataFrame can be accessed via `output.df`.
- Koheesio uses pydantic for automatic validation of the step's input and output, ensuring they are correctly defined and of the correct types.
> **Note:** Pydantic is a data validation library that provides a way to validate that the data (in this case, the input and output of the step) conforms to the expected format.
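To get a feel for what this validation buys you without needing Spark or koheesio installed, here is a rough, standard-library-only stand-in for the behavior: a typed `message` field that rejects wrong input at construction time. The `HelloWorldInput` name is invented for this sketch; in a real step, pydantic performs the check and raises a `ValidationError` instead of the `TypeError` mimicked here.

```python
from dataclasses import dataclass


@dataclass
class HelloWorldInput:
    """Hypothetical stand-in for the pydantic model behind a step's input."""

    message: str

    def __post_init__(self) -> None:
        # pydantic would raise a ValidationError here; we mimic it with TypeError
        if not isinstance(self.message, str):
            raise TypeError(f"message must be str, got {type(self.message).__name__}")


ok = HelloWorldInput(message="Hello, World!")
print(ok.message)  # Hello, World!

try:
    HelloWorldInput(message=123)  # wrong type is rejected at construction time
except TypeError as exc:
    print(exc)
```

The point is that a mistyped input fails immediately when the step is instantiated, not deep inside a Spark job.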
## Creating a Custom Task

This example demonstrates how to use the `EtlTask` from the `koheesio` library to create a custom task named `MyFavoriteMovieTask`.
### Code
```python
from typing import Any, Optional

from pyspark.sql import DataFrame, functions as f

from koheesio.steps.transformations import Transform
from koheesio.tasks.etl_task import EtlTask


def add_column(df: DataFrame, target_column: str, value: Any) -> DataFrame:
    return df.withColumn(target_column, f.lit(value))


class MyFavoriteMovieTask(EtlTask):
    my_favorite_movie: str

    def transform(self, df: Optional[DataFrame] = None) -> DataFrame:
        df = df or self.extract()

        # pre-transformations specific to this class
        pre_transformations = [
            Transform(add_column, target_column="myFavoriteMovie", value=self.my_favorite_movie)
        ]

        # execute transformations one by one
        for t in pre_transformations:
            df = t.transform(df)

        self.output.transform_df = df
        return df
```
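The loop above is just "apply each `Transform` in order, feeding each one's output into the next." As a minimal sketch of that pattern without Spark, assuming rows are plain dicts and using simplified stand-ins for koheesio's `Transform` and the `add_column` function:

```python
from typing import Any, Callable

Row = dict  # stand-in for a Spark Row; a "DataFrame" here is just a list of rows


def add_column(rows: list[Row], target_column: str, value: Any) -> list[Row]:
    """Mimics withColumn(..., f.lit(value)): add a constant column to every row."""
    return [{**row, target_column: value} for row in rows]


class Transform:
    """Simplified stand-in for koheesio's Transform: binds kwargs to a function."""

    def __init__(self, func: Callable, **kwargs: Any) -> None:
        self.func, self.kwargs = func, kwargs

    def transform(self, rows: list[Row]) -> list[Row]:
        return self.func(rows, **self.kwargs)


df = [{"id": 1}, {"id": 2}]
pre_transformations = [
    Transform(add_column, target_column="myFavoriteMovie", value="Office Space")
]
for t in pre_transformations:
    df = t.transform(df)

print(df)
# [{'id': 1, 'myFavoriteMovie': 'Office Space'}, {'id': 2, 'myFavoriteMovie': 'Office Space'}]
```

Binding the keyword arguments at construction time is what lets the task declare its transformation list up front and run it later inside `transform`.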
### Configuration

Here is the `sample.yaml` configuration file used in this example:
```yaml
raw_layer:
  catalog: development
  schema: my_favorite_team
  table: some_random_table
movies:
  favorite: Office Space
hash_settings:
  source_columns:
    - id
    - foo
  target_column: hash_uuid5
source:
  range: 4
```
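The usage example below reads nested YAML keys with attribute access, e.g. `context.movies.favorite`. As an illustration of that idea (not koheesio's actual `Context` implementation), a recursive wrapper over the parsed YAML could look like this, with the dict literal standing in for the parsed `sample.yaml`:

```python
from types import SimpleNamespace


def to_namespace(obj):
    """Recursively wrap dicts so nested keys read as attributes (lists pass through)."""
    if isinstance(obj, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in obj.items()})
    if isinstance(obj, list):
        return [to_namespace(v) for v in obj]
    return obj


# a subset of the parsed sample.yaml from above, as a plain dict
parsed = {
    "raw_layer": {
        "catalog": "development",
        "schema": "my_favorite_team",
        "table": "some_random_table",
    },
    "movies": {"favorite": "Office Space"},
}

context = to_namespace(parsed)
print(context.movies.favorite)    # Office Space
print(context.raw_layer.catalog)  # development
```

Keeping task parameters in a YAML context like this means the same task class can be rewired to a different source table or movie title without touching code.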
### Usage
```python
from pyspark.sql import SparkSession

from koheesio.context import Context
from koheesio.steps.readers import DummyReader
from koheesio.steps.writers.dummy import DummyWriter

context = Context.from_yaml("sample.yaml")

# ensure an active SparkSession exists before running the task
SparkSession.builder.getOrCreate()

my_fav_mov_task = MyFavoriteMovieTask(
    source=DummyReader(**context.raw_layer),
    target=DummyWriter(truncate=False),
    my_favorite_movie=context.movies.favorite,
)
my_fav_mov_task.execute()
```
### Understanding the Code

This example creates a `MyFavoriteMovieTask` that adds a column named `myFavoriteMovie` to the DataFrame. The value for this column is provided when the task is instantiated.

The `MyFavoriteMovieTask` class is a custom task that extends the `EtlTask` from the `koheesio` library. It demonstrates how to add a custom transformation to a DataFrame. Here's a detailed breakdown:
- `MyFavoriteMovieTask` inherits from `EtlTask`, a base class in Koheesio for creating Extract-Transform-Load (ETL) tasks with Apache Spark.
- It has a `my_favorite_movie` attribute. When creating an instance of `MyFavoriteMovieTask`, you can pass a custom movie title that will be used in the DataFrame.
- The `transform` method is where the main logic of the task is implemented. It first extracts the data (if not already provided), then applies a series of transformations to the DataFrame.
- In this case, the transformation adds a new column to the DataFrame named `myFavoriteMovie`, with the value set to the `my_favorite_movie` attribute. This is done using the `add_column` function and the `Transform` class from Koheesio.
- The transformed DataFrame is then stored in `self.output.transform_df`.
- The `sample.yaml` configuration file is used to provide the context for the task, including the source data and the favorite movie title.
- In the usage example, an instance of `MyFavoriteMovieTask` is created with a `DummyReader` as the source, a `DummyWriter` as the target, and the favorite movie title from the context. The task is then executed, which runs the transformations and stores the result in `self.output.transform_df`.
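The overall extract → transform → load flow described above can be sketched with standard-library stand-ins. Everything below (`DummyReaderSketch`, `DummyWriterSketch`, `EtlTaskSketch`) is invented for illustration and operates on lists of dicts rather than Spark DataFrames; it only shows how a task wires a source, a transformation, and a target together.

```python
from typing import Any, Callable


class DummyReaderSketch:
    """Hypothetical source: returns a fixed list of rows."""

    def __init__(self, rows: list[dict]) -> None:
        self.rows = rows

    def read(self) -> list[dict]:
        return self.rows


class DummyWriterSketch:
    """Hypothetical target: collects whatever is written to it."""

    def __init__(self) -> None:
        self.written: Any = None

    def write(self, rows: list[dict]) -> None:
        self.written = rows


class EtlTaskSketch:
    """Illustrative extract -> transform -> load pipeline."""

    def __init__(self, source, target, transform: Callable) -> None:
        self.source, self.target, self.transform = source, target, transform

    def execute(self) -> list[dict]:
        df = self.source.read()   # extract
        df = self.transform(df)   # transform
        self.target.write(df)     # load
        return df


task = EtlTaskSketch(
    source=DummyReaderSketch([{"id": 1}]),
    target=DummyWriterSketch(),
    transform=lambda rows: [{**r, "myFavoriteMovie": "Office Space"} for r in rows],
)
result = task.execute()
print(result)  # [{'id': 1, 'myFavoriteMovie': 'Office Space'}]
```

The real `EtlTask` adds configuration, validation, and Spark integration on top, but the source/transform/target shape is the same.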