# Transformations

This module contains the base classes for all transformations. See class docstrings for more information.

## References

For a comprehensive guide on the usage, examples, and additional features of Transformation classes, please refer to the reference/concepts/spark/transformations section of the Koheesio documentation.
Classes:

Name | Description |
---|---|
Transformation | Base class for all transformations |
ColumnsTransformation | Extended Transformation class with a preset validator for handling column(s) data |
ColumnsTransformationWithTarget | Extended ColumnsTransformation class with an additional target_column field |
## koheesio.spark.transformations.ColumnsTransformation

Extended Transformation class with a preset validator for handling column(s) data with a standardized input for a single column or multiple columns.
### Concept

A ColumnsTransformation is a Transformation with a standardized input for a column or columns:

- `columns` are stored as a list; either a single string or a list of strings can be passed to set `columns`
- `column` and `columns` are aliases to one another - internally the name `columns` should be used though

If more than one column is passed, the behavior of the class changes this way:

- the transformation will be run in a loop against all the given columns
### Configuring the ColumnsTransformation

The ColumnsTransformation class has a ColumnConfig class that can be used to configure the behavior of the class. Users should not have to interact with the ColumnConfig class directly.

This class has the following fields:

- `run_for_all_data_type`: allows running the transformation for all columns of a given type
- `limit_data_type`: allows limiting the transformation to a specific data type
- `data_type_strict_mode`: toggles strict mode for data type validation; will only work if `limit_data_type` is set

Data types need to be specified as a SparkDatatype enum.

- See the docstrings of the ColumnConfig class for more information.
- See the SparkDatatype enum for a list of available data types.
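As a rough sketch, a subclass could pin this configuration down by overriding the nested ColumnConfig class. The override pattern and the SparkDatatype import path below are assumptions for illustration, not guaranteed library API:

```python
from pyspark.sql import functions as f

from koheesio.spark.utils import SparkDatatype  # assumed import path
from koheesio.steps.transformations import ColumnsTransformation


class UppercaseStrings(ColumnsTransformation):
    """Illustrative transformation that only makes sense for string columns."""

    class ColumnConfig(ColumnsTransformation.ColumnConfig):
        # run against every string column when no specific columns are given
        run_for_all_data_type = [SparkDatatype.STRING]
        # limit the transformation to string columns only
        limit_data_type = [SparkDatatype.STRING]
        # enforce strict data type validation (only effective with limit_data_type set)
        data_type_strict_mode = True

    def execute(self):
        df = self.df
        for column in self.get_columns():
            df = df.withColumn(column, f.upper(f.col(column)))
        self.output.df = df
```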
### Example

Implementing a transformation using the ColumnsTransformation class:

```python
from pyspark.sql import functions as f

from koheesio.steps.transformations import ColumnsTransformation


class AddOne(ColumnsTransformation):
    def execute(self):
        # accumulate the result so that every given column gets transformed,
        # not just the last one in the loop
        df = self.df
        for column in self.get_columns():
            df = df.withColumn(column, f.col(column) + 1)
        self.output.df = df
```

In the above example, the `execute` method is implemented to add 1 to the values of a given column.
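A quick usage sketch (the DataFrame here is an illustrative assumption):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(3)  # a single "id" column with values 0, 1, 2

# "column" is an alias for "columns"; a single string works as well as a list
output_df = AddOne(column="id").transform(df)
# output_df's "id" column now holds 1, 2, 3
```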
Parameters:

Name | Type | Description | Default |
---|---|---|---|
columns | ListOfColumns | The column (or list of columns) to apply the transformation to. Alias: column | required |
### columns `class-attribute` `instance-attribute`

```python
columns: ListOfColumns = Field(
    default="",
    alias="column",
    description="The column (or list of columns) to apply the transformation to. Alias: column",
)
```
### data_type_strict_mode_is_set `property`

```python
data_type_strict_mode_is_set: bool
```

Returns True if data_type_strict_mode is set.

### limit_data_type_is_set `property`

```python
limit_data_type_is_set: bool
```

Returns True if limit_data_type is set.

### run_for_all_is_set `property`

```python
run_for_all_is_set: bool
```

Returns True if the transformation should be run for all columns of a given type.
### ColumnConfig

Koheesio ColumnsTransformation specific Config.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
run_for_all_data_type | Optional[List[SparkDatatype]] | Allows running the transformation for all columns of a given type. A user can trigger this behavior by either omitting the columns parameter or passing "*" as the column name. Value should be passed as a SparkDatatype enum. (default: [None]) | required |
limit_data_type | Optional[List[SparkDatatype]] | Allows limiting the transformation to a specific data type. Value should be passed as a SparkDatatype enum. (default: [None]) | required |
data_type_strict_mode | bool | Toggles strict mode for data type validation. Will only work if limit_data_type is set. | required |
### limit_data_type `class-attribute` `instance-attribute`

```python
limit_data_type: Optional[List[SparkDatatype]] = [None]
```

### run_for_all_data_type `class-attribute` `instance-attribute`

```python
run_for_all_data_type: Optional[List[SparkDatatype]] = [None]
```
### column_type_of_col

```python
column_type_of_col(
    col: Union[Column, str],
    df: Optional[DataFrame] = None,
    simple_return_mode: bool = True,
) -> Union[DataType, str]
```

Returns the dataType of a Column object as a string.

The Column object does not have a type attribute, so we have to ask the DataFrame for its schema and find the type based on the column name. We retrieve the name of the column from the Column object by calling toString() from the JVM.
Examples:

input_df:

str_column | int_column |
---|---|
hello | 1 |
world | 2 |

```python
# using the AddOne transformation from the example above
add_one = AddOne(
    columns=["str_column", "int_column"],
    df=input_df,
)

add_one.column_type_of_col("str_column")  # returns "string"
add_one.column_type_of_col("int_column")  # returns "integer"

# returns IntegerType
add_one.column_type_of_col("int_column", simple_return_mode=False)
```
Parameters:

Name | Type | Description | Default |
---|---|---|---|
col | Union[str, Column] | The column to check the type of | required |
df | Optional[DataFrame] | The DataFrame belonging to the column. If not provided, the DataFrame passed to the constructor will be used. | None |
simple_return_mode | bool | If True, the return value will be a simple string. If False, the return value will be a SparkDatatype enum. | True |
Returns:

Name | Type | Description |
---|---|---|
datatype | str | The type of the column as a string |
### get_all_columns_of_specific_type

```python
get_all_columns_of_specific_type(
    data_type: Union[str, SparkDatatype]
) -> List[str]
```

Get all columns from the dataframe of a given type.

A DataFrame needs to be available in order to get the columns. If no DataFrame is available, a ValueError will be raised.

Note: only one data type can be passed to this method. If you want to get columns of multiple data types, you have to call this method multiple times.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
data_type | Union[str, SparkDatatype] | The data type to get the columns for | required |

Returns:

Type | Description |
---|---|
List[str] | A list of column names of the given data type |
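A brief usage sketch, reusing the add_one instance from the column_type_of_col example (the SparkDatatype import path is an assumption):

```python
from koheesio.spark.utils import SparkDatatype  # assumed import path

# by string name ...
string_columns = add_one.get_all_columns_of_specific_type("string")
# ... or by enum; both should return ["str_column"] for the input_df above
string_columns = add_one.get_all_columns_of_specific_type(SparkDatatype.STRING)
```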
### get_columns

Return an iterator of the columns.
### is_column_type_correct

Check if the column type is correct and handle it if not, when limit_data_type is set.
### set_columns

```python
set_columns(columns_value: ListOfColumns) -> ListOfColumns
```

Validate columns through the columns configuration provided.
## koheesio.spark.transformations.ColumnsTransformationWithTarget

Extended ColumnsTransformation class with an additional `target_column` field.

Using this class makes implementing Transformations significantly easier.

### Concept

A ColumnsTransformationWithTarget is a ColumnsTransformation with an additional `target_column` field. This field can be used to store the result of the transformation in a new column.

If the `target_column` is not provided, the result will be stored in the source column.
If more than one column is passed, the behavior of the class changes this way:

- the transformation will be run in a loop against all the given columns
- the renaming of the columns is handled automatically
- the `target_column` will be used as a suffix; leaving this blank will result in the original columns being renamed

The `func` method should be implemented in the child class. This method should return the transformation that will be applied to the column(s). The execute method (already preset) will use the `get_columns_with_target` method to loop over all the columns and apply this function to transform the DataFrame.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
columns | ListOfColumns | The column (or list of columns) to apply the transformation to. Alias: column. If not provided, the transformation is run for all columns of the data type(s) set in run_for_all_data_type. | * |
target_column | Optional[str] | The name of the column to store the result in. If not provided, the result will be stored in the source column. Alias: target_suffix - if multiple columns are given as source, this input will be used as a suffix instead. | None |
### Example

Writing your own transformation using the ColumnsTransformationWithTarget class:

```python
from pyspark.sql import Column

from koheesio.steps.transformations import (
    ColumnsTransformationWithTarget,
)


class AddOneWithTarget(ColumnsTransformationWithTarget):
    def func(self, col: Column):
        return col + 1
```

In the above example, the `func` method is implemented to add 1 to the values of a given column.
In order to use this transformation, we can call the `transform` method:

```python
from pyspark.sql import SparkSession

# create a DataFrame with 3 rows
df = SparkSession.builder.getOrCreate().range(3)

output_df = AddOneWithTarget(
    column="id", target_column="new_id"
).transform(df)
```

The `output_df` will now contain the original DataFrame with an additional column called `new_id` with the values of `id` + 1.
output_df:

id | new_id |
---|---|
0 | 1 |
1 | 2 |
2 | 3 |

Note: The `target_column` will be used as a suffix when more than one column is given as source. Leaving this blank will result in the original columns being renamed.
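A sketch of the suffix behavior with multiple source columns (the exact rename pattern, with an underscore joining the source name and the suffix, is an assumption):

```python
from pyspark.sql import SparkSession, functions as f

df = (
    SparkSession.builder.getOrCreate()
    .range(3)
    .withColumn("id2", f.col("id") * 10)
)

output_df = AddOneWithTarget(
    columns=["id", "id2"], target_column="plus_one"
).transform(df)
# assumption: the results land in suffixed columns, e.g. id_plus_one and id2_plus_one
```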
### target_column `class-attribute` `instance-attribute`

```python
target_column: Optional[str] = Field(
    default=None,
    alias="target_suffix",
    description="The column to store the result in. If not provided, the result will be stored in the source column. Alias: target_suffix - if multiple columns are given as source, this will be used as a suffix",
)
```
### execute

Execute on a ColumnsTransformationWithTarget handles self.df (input) and sets self.output.df (output). This can be left unchanged, and hence should not be implemented in the child class.
### func `abstractmethod`

The function that will be run on a single Column of the DataFrame.

The `func` method should be implemented in the child class. This method should return the transformation that will be applied to the column(s). The execute method (already preset) will use the `get_columns_with_target` method to loop over all the columns and apply this function to transform the DataFrame.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
column | Column | The column to apply the transformation to | required |

Returns:

Type | Description |
---|---|
Column | The transformed column |
### get_columns_with_target

Return an iterator of the columns.

Works just like get_columns from the ColumnsTransformation class, except that it also handles the `target_column`. A consumption sketch follows the returns table below.

If more than one column is passed, the behavior of the class changes this way:

- the transformation will be run in a loop against all the given columns
- the target_column will be used as a suffix; leaving this blank will result in the original columns being renamed
Returns:

Type | Description |
---|---|
iter | An iterator of tuples containing the target column name and the original column name |
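An illustrative pattern for consuming this iterator, in the spirit of the preset execute method (not the exact library code):

```python
from pyspark.sql import functions as f


def execute(self):
    # illustrative only: loop over (target, source) pairs and apply the
    # child's func to the source column, storing it under the target name
    df = self.df
    for target_column, column in self.get_columns_with_target():
        df = df.withColumn(target_column, self.func(f.col(column)))
    self.output.df = df
```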
## koheesio.spark.transformations.Transformation

Base class for all transformations.

### Concept

A Transformation is a Step that takes a DataFrame as input and returns a DataFrame as output. The DataFrame is transformed based on the logic implemented in the `execute` method. Any additional parameters that are needed for the transformation can be passed to the constructor.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | Optional[DataFrame] | The DataFrame to apply the transformation to. If not provided, the DataFrame has to be passed to the transform-method. | required |
### Example

#### Implementing a transformation using the Transformation class

```python
from pyspark.sql import functions as f

from koheesio.steps.transformations import Transformation


class AddOne(Transformation):
    target_column: str = "new_column"

    def execute(self):
        self.output.df = self.df.withColumn(
            self.target_column, f.col("id") + 1
        )
```

In the example above, the `execute` method is implemented to add 1 to the values of the `id` column and store the result in a new column called `new_column`.
#### Using the transformation

In order to use this transformation, we can call the `transform` method:

```python
from pyspark.sql import SparkSession

# create a DataFrame with 3 rows
df = SparkSession.builder.getOrCreate().range(3)

output_df = AddOne().transform(df)
```

The `output_df` will now contain the original DataFrame with an additional column called `new_column` with the values of `id` + 1.
output_df:

id | new_column |
---|---|
0 | 1 |
1 | 2 |
2 | 3 |
#### Alternative ways to use the transformation

Alternatively, we can pass the DataFrame to the constructor and call the `execute` or `transform` method without any arguments:
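A minimal sketch of this usage, reusing the AddOne class and df from above:

```python
# pass the DataFrame at construction time
add_one = AddOne(df=df)

# transform() falls back to the DataFrame given to the constructor
output_df = add_one.transform()

# or call execute() and read the result from the output
add_one.execute()
output_df = add_one.output.df
```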
Note: the transform method was not implemented explicitly in the AddOne class. This is because the `transform` method is already implemented in the `Transformation` class. This means that all classes that inherit from the Transformation class will have the `transform` method available; only the execute method needs to be implemented.
#### Using the transformation as a function

The transformation can also be used as a function as part of a DataFrame's `transform` method:

```python
input_df = spark.range(3)

output_df = input_df.transform(AddOne(target_column="foo")).transform(
    AddOne(target_column="bar")
)
```

In the above example, the AddOne transformation is applied to the `input_df` DataFrame using the `transform` method. The `output_df` will now contain the original DataFrame with additional columns called `foo` and `bar`, each with the values of `id` + 1.
### df `class-attribute` `instance-attribute`

```python
df: Optional[DataFrame] = Field(
    default=None, description="The Spark DataFrame"
)
```
### execute `abstractmethod`

```python
execute() -> Output
```

Execute on a Transformation should handle self.df (input) and set self.output.df (output).

This method should be implemented in the child class. The input DataFrame is available as `self.df` and the output DataFrame should be stored in `self.output.df`.
For example:
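A minimal sketch, assuming the input DataFrame has an `old_column` to read from:

```python
from pyspark.sql import functions as f


def execute(self):
    # read the input from self.df, store the result on self.output.df
    self.output.df = self.df.withColumn(
        "new_column", f.col("old_column") + 1
    )
```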
The transform method will call this method and return the output DataFrame.
### transform

```python
transform(df: Optional[DataFrame] = None) -> DataFrame
```

Execute the transformation and return the output DataFrame.

Note: when creating a child from this, don't implement this transform method. Instead, implement execute!

See Also

Transformation.execute

Parameters:

Name | Type | Description | Default |
---|---|---|---|
df | Optional[DataFrame] | The DataFrame to apply the transformation to. If not provided, the DataFrame passed to the constructor will be used. | None |
Returns:

Type | Description |
---|---|
DataFrame | The transformed DataFrame |