# Simple Examples

## Bring your own SparkSession
The Koheesio Spark module does not set up a SparkSession for you. You need to create a SparkSession before using Koheesio Spark classes; it is the entry point for any Spark functionality and allows the steps to interact with the Spark cluster.
- Every `SparkStep` has a `spark` attribute, which holds the active SparkSession.
- Koheesio supports both local and remote (connect) Spark sessions.
- The SparkSession you created can be explicitly passed to the `SparkStep` constructor (this is optional).
To create a simple SparkSession, you can use the following code:
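```python
from pyspark.sql import SparkSession

# local session; for a remote (connect) session, use
# SparkSession.builder.remote("sc://...").getOrCreate() instead
spark = SparkSession.builder.getOrCreate()
```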
## Creating a Custom Step

This example demonstrates how to use the `SparkStep` class from the koheesio library to create a custom step named `HelloWorldStep`.

### Code
```python
from koheesio.spark import SparkStep


class HelloWorldStep(SparkStep):
    message: str

    def execute(self) -> SparkStep.Output:
        # create a DataFrame with a single row containing the message
        self.output.df = self.spark.createDataFrame([(1, self.message)], ["id", "message"])
```
### Usage
```python
hello_world_step = HelloWorldStep(message="Hello, World!", spark=spark)  # optionally pass the spark session
hello_world_step.execute()
hello_world_step.output.df.show()
```
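Running this should print the single-row DataFrame the step created, along these lines:

```
+---+-------------+
| id|      message|
+---+-------------+
|  1|Hello, World!|
+---+-------------+
```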
### Understanding the Code
The `HelloWorldStep` class is a `SparkStep` in Koheesio, designed to generate a DataFrame with a single row containing a custom message. Here's a more detailed overview:
- `HelloWorldStep` inherits from `SparkStep`, a fundamental building block in Koheesio for creating data processing steps with Apache Spark.
- It has a `message` attribute. When creating an instance of `HelloWorldStep`, you can pass a custom message that will be used in the DataFrame.
- `SparkStep` also includes an `Output` class, used to store the output of the step. In this case, `Output` has a `df` attribute that holds the output DataFrame.
- The `execute` method creates a DataFrame with the custom message and stores it in `output.df`. It doesn't return a value explicitly; instead, the output DataFrame can be accessed via `output.df`.
- Koheesio uses pydantic for automatic validation of the step's input and output, ensuring they are correctly defined and of the correct types (see the sketch below).
- The `spark` attribute can optionally be passed to the constructor when creating an instance of `HelloWorldStep`. This allows you to reuse an existing SparkSession or create one specifically for the step.
- If no `SparkSession` is passed to a `SparkStep`, Koheesio calls `SparkSession.getActiveSession()` to retrieve an active SparkSession. If no active session is found, the step has no session to run on and will fail.
Note: Pydantic is a data validation library that provides a way to validate that the data (in this case, the input and output of the step) conforms to the expected format.
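For instance, because steps are pydantic models, constructing a `HelloWorldStep` without its required `message` field fails immediately. A minimal sketch of this behavior, assuming the error surfaces as pydantic's `ValidationError` (the standard behavior for pydantic models; Koheesio may wrap it):

```python
from pydantic import ValidationError

try:
    HelloWorldStep()  # the required `message` field is missing
except ValidationError as err:
    print(err)  # reports that `message` is a required field
```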
## Creating a Custom Task

This example demonstrates how to use the `EtlTask` from the koheesio library to create a custom task named `MyFavoriteMovieTask`.

### Code
```python
from typing import Any, Optional

from pyspark.sql import functions as f

from koheesio.spark import DataFrame
from koheesio.spark.etl_task import EtlTask
from koheesio.spark.transformations.transform import Transform


def add_column(df: DataFrame, target_column: str, value: Any) -> DataFrame:
    return df.withColumn(target_column, f.lit(value))


class MyFavoriteMovieTask(EtlTask):
    my_favorite_movie: str

    def transform(self, df: Optional[DataFrame] = None) -> DataFrame:
        if df is None:
            df = self.extract()

        # pre-transformations specific to this class
        pre_transformations = [
            Transform(add_column, target_column="myFavoriteMovie", value=self.my_favorite_movie)
        ]

        # execute transformations one by one
        for t in pre_transformations:
            df = t.transform(df)

        self.output.transform_df = df
        return df
```
### Configuration

Here is the `sample.yaml` configuration file used in this example:
```yaml
raw_layer:
  catalog: development
  schema: my_favorite_team
  table: some_random_table
movies:
  favorite: Office Space
hash_settings:
  source_columns:
    - id
    - foo
  target_column: hash_uuid5
source:
  range: 4
```
### Usage
```python
from pyspark.sql import SparkSession

from koheesio.context import Context
from koheesio.spark.readers.dummy import DummyReader
from koheesio.spark.writers.dummy import DummyWriter

context = Context.from_yaml("sample.yaml")

# create (or reuse) a SparkSession; the task picks up the active session
SparkSession.builder.getOrCreate()

my_fav_mov_task = MyFavoriteMovieTask(
    source=DummyReader(**context.raw_layer),
    target=DummyWriter(truncate=False),
    my_favorite_movie=context.movies.favorite,
)
my_fav_mov_task.execute()
```
### Understanding the Code
This example creates a `MyFavoriteMovieTask` that adds a column named `myFavoriteMovie` to the DataFrame. The value for this column is provided when the task is instantiated.

The `MyFavoriteMovieTask` class is a custom task that extends the `EtlTask` from the koheesio library. It demonstrates how to add a custom transformation to a DataFrame. Here's a detailed breakdown:
- `MyFavoriteMovieTask` inherits from `EtlTask`, a base class in Koheesio for creating Extract-Transform-Load (ETL) tasks with Apache Spark.
- It has a `my_favorite_movie` attribute. When creating an instance of `MyFavoriteMovieTask`, you can pass a custom movie title that will be used in the DataFrame.
- The `transform` method is where the main logic of the task is implemented. It first extracts the data (if not already provided), then applies a series of transformations to the DataFrame.
- In this case, the transformation adds a new column named `myFavoriteMovie` to the DataFrame, with the value set to the `my_favorite_movie` attribute. This is done using the `add_column` function and the `Transform` class from Koheesio.
- The transformed DataFrame is then stored in `self.output.transform_df`.
- The `sample.yaml` configuration file provides the context for the task, including the source data and the favorite movie title.
- In the usage example, an instance of `MyFavoriteMovieTask` is created with a `DummyReader` as the source, a `DummyWriter` as the target, and the favorite movie title from the context. The task is then executed, which runs the transformations and stores the result in `self.output.transform_df` (see the sketch below).
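After `execute()` completes, the transformed DataFrame remains available on the task's output for inspection. A minimal sketch, continuing from the usage example above:

```python
# the transformed DataFrame is stored on the task's output
result_df = my_fav_mov_task.output.transform_df
result_df.select("myFavoriteMovie").show(1, truncate=False)  # prints "Office Space"
```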