# Transformation Module

The Transformation module in Koheesio provides a set of classes for transforming data within a DataFrame. A `Transformation` is a type of `SparkStep` that takes a DataFrame as input, applies a transformation, and returns a DataFrame as output. The transformation logic is implemented in the `execute` method of each `Transformation` subclass.
## What is a Transformation?

A `Transformation` is a subclass of `SparkStep` that applies a transformation to a DataFrame and stores the result. The transformation can be any operation that modifies the data or structure of the DataFrame, such as adding a new column, filtering rows, or aggregating data.

Using `Transformation` classes ensures that data is transformed in a consistent manner across different parts of your pipeline. This helps avoid errors and makes your code easier to understand and maintain.
## API Reference

See the API Reference for a detailed description of the `Transformation` classes and their methods.
## Types of Transformations

There are three main types of transformations in Koheesio:

- `Transformation`: The base class for all transformations. It takes a DataFrame as input and returns a DataFrame as output. The transformation logic is implemented in the `execute` method.
- `ColumnsTransformation`: An extended `Transformation` class with a preset validator for handling column(s) data. It standardizes the input for a single column or multiple columns. If more than one column is passed, the transformation is run in a loop against all the given columns.
- `ColumnsTransformationWithTarget`: An extended `ColumnsTransformation` class with an additional `target_column` field. This field can be used to store the result of the transformation in a new column. If the `target_column` is not provided, the result is stored in the source column.

Each type of transformation has its own use cases and advantages. The right one to use depends on the specific requirements of your data pipeline.
## How to Define a Transformation

To define a `Transformation`, you create a subclass of the `Transformation` class and implement the `execute` method. The `execute` method should take a DataFrame from `self.input.df`, apply a transformation, and store the result in `self.output.df`.

`Transformation` classes abstract away some of the details of transforming data, allowing you to focus on the logic of your pipeline. This can make your code cleaner and easier to read.

Here's an example of a `Transformation`:
```python
from koheesio.spark.transformations import Transformation

class MyTransformation(Transformation):
    def execute(self):
        # get data from self.input.df
        data = self.input.df
        # apply transformation (placeholder, left undefined in this example)
        transformed_data = apply_transformation(data)
        # store result in self.output.df
        self.output.df = transformed_data
```
In this example, `MyTransformation` is a subclass of `Transformation` that you've defined. The `execute` method gets the data from `self.input.df`, applies a transformation called `apply_transformation` (left undefined in this example), and stores the result in `self.output.df`.
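For a runnable variant of the same pattern, here is a minimal hypothetical sketch; the `DropNullIds` class, the `id` column, and the filter logic are made up for illustration:

```python
from pyspark.sql import functions as f
from koheesio.spark.transformations import Transformation

class DropNullIds(Transformation):
    """Hypothetical example: remove rows where the `id` column is null."""
    def execute(self):
        # read from self.input.df, filter, and write to self.output.df
        self.output.df = self.input.df.filter(f.col("id").isNotNull())
```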
## How to Define a ColumnsTransformation

To define a `ColumnsTransformation`, you create a subclass of the `ColumnsTransformation` class and implement the `execute` method. The `execute` method should apply a transformation to the specified columns of the DataFrame.

`ColumnsTransformation` classes can be easily swapped out for different data transformations without changing the rest of your pipeline. This can make your pipeline more flexible and easier to modify or extend.

Here's an example of a `ColumnsTransformation`:
```python
from pyspark.sql import functions as f
from koheesio.spark.transformations import ColumnsTransformation

class AddOne(ColumnsTransformation):
    def execute(self):
        df = self.df
        for column in self.get_columns():
            # carry the result forward so every column's change is kept
            df = df.withColumn(column, f.col(column) + 1)
        self.output.df = df
```
In this example, `AddOne` is a subclass of `ColumnsTransformation` that you've defined. The `execute` method adds 1 to each column in `self.get_columns()`.
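A hypothetical usage sketch; the `columns` constructor field is an assumption based on the Columns Property described under Key Features below:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])

# both "a" and "b" are incremented by 1
output_df = AddOne(df=df, columns=["a", "b"]).execute().df
```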
The `ColumnsTransformation` class has a `ColumnConfig` class that can be used to configure the behavior of the class. This class has the following fields:

- `run_for_all_data_type`: allows running the transformation for all columns of a given type.
- `limit_data_type`: allows limiting the transformation to a specific data type.
- `data_type_strict_mode`: toggles strict mode for data type validation. Only takes effect if `limit_data_type` is set.

Note that data types need to be specified as a `SparkDatatype` enum. Users should not have to interact with the `ColumnConfig` class directly.
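To illustrate, here is a hedged sketch of overriding `ColumnConfig` in a subclass; the `SparkDatatype` import path and the list-valued fields are assumptions based on the description above, so check them against your Koheesio version:

```python
from pyspark.sql import functions as f
from koheesio.spark.transformations import ColumnsTransformation
from koheesio.spark.utils import SparkDatatype  # import path is an assumption

class AddOneToIntegers(ColumnsTransformation):
    class ColumnConfig(ColumnsTransformation.ColumnConfig):
        # consider all integer columns when no columns are given explicitly
        run_for_all_data_type = [SparkDatatype.INTEGER]
        # restrict explicitly passed columns to integers as well
        limit_data_type = [SparkDatatype.INTEGER]

    def execute(self):
        df = self.df
        for column in self.get_columns():
            df = df.withColumn(column, f.col(column) + 1)
        self.output.df = df
```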
## How to Define a ColumnsTransformationWithTarget

To define a `ColumnsTransformationWithTarget`, you create a subclass of the `ColumnsTransformationWithTarget` class and implement the `func` method. The `func` method should return the transformation that will be applied to the column(s). The `execute` method, which is already preset, will use the `get_columns_with_target` method to loop over all the columns and apply this function to transform the DataFrame.

Here's an example of a `ColumnsTransformationWithTarget`:
```python
from pyspark.sql import Column
from koheesio.spark.transformations import ColumnsTransformationWithTarget

class AddOneWithTarget(ColumnsTransformationWithTarget):
    def func(self, col: Column):
        return col + 1
```
In this example, `AddOneWithTarget` is a subclass of `ColumnsTransformationWithTarget` that you've defined. The `func` method adds 1 to the values of a given column.

The `ColumnsTransformationWithTarget` class has an additional `target_column` field. This field can be used to store the result of the transformation in a new column. If the `target_column` is not provided, the result is stored in the source column. If more than one column is passed, the `target_column` is used as a suffix. Leaving this blank results in the original columns being renamed (see the usage sketch below).
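A hypothetical usage sketch of `AddOneWithTarget`; the `columns` and `target_column` constructor fields follow the descriptions above:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["amount"])

# stores the result in a new "amount_plus_one" column alongside "amount"
add_one = AddOneWithTarget(df=df, columns=["amount"], target_column="amount_plus_one")
output_df = add_one.execute().df
```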
The `ColumnsTransformationWithTarget` class also has a `get_columns_with_target` method. This method returns an iterator over the columns and handles the `target_column` as well.
## Key Features of a Transformation

- **Execute Method**: The `Transformation` class provides an `execute` method to implement in your subclass. This method should take a DataFrame from `self.input.df`, apply a transformation, and store the result in `self.output.df`.

    For `ColumnsTransformation` and `ColumnsTransformationWithTarget`, the `execute` method is already implemented in the base class. Instead of overriding `execute`, you implement a `func` method in your subclass. This `func` method should return the transformation to be applied to each column. The `execute` method will then apply this `func` to each column in a loop.

- **DataFrame Property**: The `Transformation` class provides a `df` property as a shorthand for accessing `self.input.df`. This property ensures that the data is ready to be transformed, even if the `execute` method hasn't been explicitly called. This is useful for 'early validation' of the input data.

- **SparkSession**: Every `Transformation` has a `SparkSession` available as `self.spark`. This is the currently active `SparkSession`, which can be used to perform distributed data processing tasks (see the sketch after this list).

- **Columns Property**: The `ColumnsTransformation` and `ColumnsTransformationWithTarget` classes provide a `columns` property. This property standardizes the input for a single column or multiple columns. If more than one column is passed, the transformation will be run in a loop against all the given columns.

- **Target Column Property**: The `ColumnsTransformationWithTarget` class provides a `target_column` property. This field can be used to store the result of the transformation in a new column. If the `target_column` is not provided, the result will be stored in the source column.
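To make the `df` shorthand and `self.spark` concrete, here is a minimal hypothetical sketch; the reference data and join column are made up for illustration:

```python
from koheesio.spark.transformations import Transformation

class CountryLookup(Transformation):
    """Hypothetical example combining the `df` shorthand and `self.spark`."""
    def execute(self):
        # build a small reference table with the currently active SparkSession
        ref = self.spark.createDataFrame([("NL", "Netherlands")], ["code", "country"])
        # `self.df` is shorthand for `self.input.df`
        self.output.df = self.df.join(ref, on="code", how="left")
```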
## Examples of Transformation Classes in Koheesio

Koheesio provides a variety of `Transformation` subclasses for transforming data in different ways. Here are some examples:
- `DataframeLookup`: This transformation joins two dataframes together based on a list of join mappings. It allows you to specify the join type and join hint, and it supports selecting specific target columns from the right dataframe.

    Here's an example of how to use the `DataframeLookup` transformation:

    ```python
    from pyspark.sql import SparkSession
    from koheesio.spark.transformations.lookup import DataframeLookup, JoinMapping, TargetColumn, JoinType

    spark = SparkSession.builder.getOrCreate()

    left_df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
    right_df = spark.createDataFrame([(1, "A"), (3, "C")], ["id", "value"])

    lookup = DataframeLookup(
        df=left_df,
        other=right_df,
        on=JoinMapping(source_column="id", other_column="id"),
        targets=TargetColumn(target_column="value", target_column_alias="right_value"),
        how=JoinType.LEFT,
    )

    output_df = lookup.execute().df
    ```
- `HashUUID5`: This transformation is a subclass of `Transformation` and provides an interface to generate a UUID5 hash for each row in the DataFrame. The hash is generated based on the values of the specified source columns.

    Here's an example of how to use the `HashUUID5` transformation:

    ```python
    from pyspark.sql import SparkSession
    from koheesio.spark.transformations.uuid5 import HashUUID5

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])

    hash_transform = HashUUID5(
        df=df,
        source_columns=["id", "value"],
        target_column="hash"
    )

    output_df = hash_transform.execute().df
    ```
In this example, `HashUUID5` is a subclass of `Transformation`. After creating an instance of `HashUUID5`, you call the `execute` method to apply the transformation. The `execute` method generates a UUID5 hash for each row in the DataFrame based on the values of the `id` and `value` columns and stores the result in a new column named `hash`.
## Benefits of using Koheesio Transformations

Using a Koheesio `Transformation` over plain Spark provides several benefits:

- **Consistency**: By using `Transformation` classes, you ensure that data is transformed in a consistent manner across different parts of your pipeline. This can help avoid errors and make your code easier to understand and maintain.
- **Abstraction**: `Transformation` classes abstract away the details of transforming data, allowing you to focus on the logic of your pipeline. This can make your code cleaner and easier to read.
- **Flexibility**: `Transformation` classes can be easily swapped out for different data transformations without changing the rest of your pipeline. This can make your pipeline more flexible and easier to modify or extend.
- **Early Input Validation**: As a `Transformation` is a type of `SparkStep`, which in turn is a `Step` and a type of Pydantic `BaseModel`, all inputs are validated when an instance of a `Transformation` class is created. This early validation helps catch errors related to invalid input, such as an invalid column name, before the PySpark pipeline starts executing (see the sketch after this list). This can help avoid unnecessary computation and make your data pipelines more robust and reliable.
- **Ease of Testing**: `Transformation` classes are designed to be easily testable. This can make it easier to write unit tests for your data pipeline, helping to ensure its correctness and reliability.
- **Robustness**: Koheesio has been extensively tested with hundreds of unit tests, ensuring that the `Transformation` classes work as expected under a wide range of conditions. This makes your data pipelines more robust and less likely to fail due to unexpected inputs or edge cases.

By using the concept of a `Transformation`, you can create data pipelines that are simple, consistent, flexible, efficient, and reliable.
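As an illustration of early input validation, the following hedged sketch shows invalid input failing at construction time, before any Spark job runs; the exact error type comes from Pydantic:

```python
from koheesio.spark.transformations.uuid5 import HashUUID5

# Passing a clearly invalid value for `source_columns` fails immediately,
# before any Spark computation is triggered.
try:
    HashUUID5(source_columns=123, target_column="hash")
except Exception as err:  # a pydantic ValidationError in practice
    print(f"Caught invalid input early: {err}")
```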
## Advanced Usage of Transformations

Transformations can be combined and chained together to create complex data processing pipelines. Here's an example of how to chain transformations:

```python
from pyspark.sql import SparkSession
from koheesio.spark.transformations.uuid5 import HashUUID5
from koheesio.spark.transformations.lookup import DataframeLookup, JoinMapping, TargetColumn, JoinType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Define two DataFrames
df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
df2 = spark.createDataFrame([(1, "C"), (3, "D")], ["id", "value"])

# Define the first transformation
lookup = DataframeLookup(
    other=df2,
    on=JoinMapping(source_column="id", other_column="id"),
    targets=TargetColumn(target_column="value", target_column_alias="right_value"),
    how=JoinType.LEFT,
)

# Apply the first transformation
output_df = lookup.transform(df1)

# Define the second transformation
hash_transform = HashUUID5(
    source_columns=["id", "value", "right_value"],
    target_column="hash"
)

# Apply the second transformation
output_df2 = hash_transform.transform(output_df)
```
In this example, `DataframeLookup` is a subclass of `ColumnsTransformation` and `HashUUID5` is a subclass of `Transformation`. After creating instances of `DataframeLookup` and `HashUUID5`, you call the `transform` method to apply each transformation. The `transform` method of `DataframeLookup` performs a left join with `df2` on the `id` column and adds the `value` column from `df2` to the result DataFrame as `right_value`. The `transform` method of `HashUUID5` generates a UUID5 hash for each row in the DataFrame based on the values of the `id`, `value`, and `right_value` columns and stores the result in a new column named `hash`.
## Troubleshooting Transformations

If you encounter an error when using a transformation, here are some steps you can take to troubleshoot:

- **Check the Input Data**: Make sure the input DataFrame to the transformation is correct. You can use the `show` method of the DataFrame to print the first few rows.
- **Check the Transformation Parameters**: Make sure the parameters passed to the transformation are correct. For example, if you're using a `DataframeLookup`, make sure the join mappings and target columns are correctly specified.
- **Check the Transformation Logic**: If the input data and parameters are correct, there might be an issue with the transformation logic. You can use PySpark's logging utilities to log intermediate results and debug the transformation logic.
- **Check the Output Data**: If the transformation executes without errors but the output data is not as expected, you can use the `show` method of the DataFrame to print the first few rows of the output DataFrame. This can help you identify any issues with the transformation logic (see the sketch after this list).
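Putting the input and output checks together, here is a minimal inspection sketch, reusing the `lookup` instance from the `DataframeLookup` example above:

```python
# Inspect the input going into the transformation (via the `df` shorthand)
lookup.df.show(5)

# Run the transformation and inspect the result
result = lookup.execute()
result.df.show(5)          # first few rows of the output
result.df.printSchema()    # verify expected columns and data types
```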
## Conclusion

The Transformation module in Koheesio provides a powerful and flexible way to transform data in a DataFrame. By using `Transformation` classes, you can create data pipelines that are simple, consistent, flexible, efficient, and reliable. Whether you're performing simple transformations like adding a new column, or complex transformations like joining multiple DataFrames, the Transformation module has you covered.