
Lookup

Lookup transformation for joining two dataframes together

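Classes:

JoinMapping: Mapping for joining two dataframes together
TargetColumn: Target column for the joined dataframe
JoinType: Supported join types
JoinHint: Supported join hints
DataframeLookup: Lookup transformation for joining two dataframes together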

koheesio.spark.transformations.lookup.DataframeLookup

Lookup transformation for joining two dataframes together

Parameters:

df (DataFrame): The left Spark DataFrame. Default: None.

other (DataFrame): The right Spark DataFrame. Default: None.

on (List[JoinMapping] | JoinMapping): List of join mappings. A single mapping can be passed on its own, without wrapping it in a list. Alias: join_mapping. Required.

targets (List[TargetColumn] | TargetColumn): List of target columns. A single target can be passed on its own, without wrapping it in a list. Alias: target_columns. Required.

how (JoinType): Type of join to perform. See JoinType for the supported values. Default: JoinType.LEFT.

hint (JoinHint): Join hint to apply to the right DataFrame. See JoinHint for the supported values. Default: None.
Example
from pyspark.sql import SparkSession
from koheesio.spark.transformations.lookup import (
    DataframeLookup,
    JoinMapping,
    TargetColumn,
    JoinType,
)

spark = SparkSession.builder.getOrCreate()

# create the dataframes
left_df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
right_df = spark.createDataFrame([(1, "A"), (3, "C")], ["id", "value"])

# perform the lookup
lookup = DataframeLookup(
    df=left_df,
    other=right_df,
    on=JoinMapping(source_column="id", other_column="id"),
    targets=TargetColumn(
        target_column="value", target_column_alias="right_value"
    ),
    how=JoinType.LEFT,
)

output_df = lookup.transform()

output_df:

id  value  right_value
1   A      A
2   B      null

In this example, left_df and right_df are left-joined on the id column, and the value column from right_df appears as right_value in the output. Because id 2 has no match in right_df, its right_value is null.
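The result above follows standard left-join semantics: every left row is kept, and right-hand values are filled in where the key matches. The pure-Python model below reproduces the example's output without Spark; `left_lookup` is a hypothetical helper written for illustration, not part of koheesio. Like a Spark join, it emits one output row per matching right row and never matches null keys.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def left_lookup(
    left: List[Row], right: List[Row], key: str, target: str, alias: str
) -> List[Row]:
    """Hypothetical model of a left-join lookup on a single key column."""
    # Group right rows by key; null keys are skipped because they never match.
    index: Dict[Any, List[Row]] = defaultdict(list)
    for row in right:
        if row[key] is not None:
            index[row[key]].append(row)

    result: List[Row] = []
    for row in left:
        matches = index.get(row[key], [])
        if not matches:
            # Unmatched left rows are kept, with a null (None) target value.
            result.append({**row, alias: None})
        for match in matches:
            # One output row per matching right row.
            result.append({**row, alias: match[target]})
    return result


left_rows = [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]
right_rows = [{"id": 1, "value": "A"}, {"id": 3, "value": "C"}]

for r in left_lookup(left_rows, right_rows, "id", "value", "right_value"):
    print(r)
# {'id': 1, 'value': 'A', 'right_value': 'A'}
# {'id': 2, 'value': 'B', 'right_value': None}
```

One practical consequence: if right_df contained two rows with id 1, the output would contain two rows for id 1, so a lookup table generally needs unique join keys.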

df class-attribute instance-attribute

df: DataFrame = Field(
    default=None, description="The left Spark DataFrame"
)

hint class-attribute instance-attribute

hint: Optional[JoinHint] = Field(
    default=None,
    description="What type of join hint to use. Defaults to None. "
    + JoinHint.__doc__,
)

how class-attribute instance-attribute

how: Optional[JoinType] = Field(
    default=JoinType.LEFT,
    description="What type of join to perform. Defaults to left. "
    + JoinType.__doc__,
)

on class-attribute instance-attribute

on: Union[List[JoinMapping], JoinMapping] = Field(
    default=...,
    alias="join_mapping",
    description="List of join mappings. If only one mapping is passed, it can be passed as a single object.",
)

other class-attribute instance-attribute

other: DataFrame = Field(
    default=None, description="The right Spark DataFrame"
)

targets class-attribute instance-attribute

targets: Union[List[TargetColumn], TargetColumn] = Field(
    default=...,
    alias="target_columns",
    description="List of target columns. If only one target is passed, it can be passed as a single object.",
)

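Output

Output for the lookup transformation. execute() stores the joined result in df, the original left DataFrame in left_df, and the prepared right DataFrame (join keys and target columns only, with any hint applied) in right_df.

left_df class-attribute instance-attribute

left_df: DataFrame = Field(
    default=..., description="The left Spark DataFrame"
)

right_df class-attribute instance-attribute

right_df: DataFrame = Field(
    default=..., description="The right Spark DataFrame"
)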

execute

execute() -> Output

Execute the lookup transformation

Source code in src/koheesio/spark/transformations/lookup.py
def execute(self) -> Output:
    """Execute the lookup transformation"""
    # prepare the right dataframe: keep only the join keys (each renamed to its
    # source_column) and the target columns (each renamed to its alias)
    prepared_right_df = self.get_right_df().select(
        *[join_mapping.column for join_mapping in self.on],
        *[target.column for target in self.targets],
    )
    # optionally attach a join hint (e.g. broadcast) to the right side
    if self.hint:
        prepared_right_df = prepared_right_df.hint(self.hint)

    # generate the output; the join keys now carry the same names on both
    # sides, so the join can use a plain list of source_column names
    self.output.left_df = self.df
    self.output.right_df = prepared_right_df
    self.output.df = self.df.join(
        prepared_right_df,
        on=[join_mapping.source_column for join_mapping in self.on],
        how=self.how,
    )
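The select() step is what makes the final join work: each right-hand join key is projected under its source_column name and each target under its alias, so on=[... source_column ...] refers to columns present on both sides. The pure-Python sketch below models only that projection step, using a composite key whose column names differ between the two sides. Mapping, Target and prepare_right are hypothetical stand-ins; the renaming performed by JoinMapping.column is inferred from the join in execute() rather than shown on this page.

```python
from dataclasses import dataclass
from typing import Any, Dict, List

Row = Dict[str, Any]


@dataclass
class Mapping:
    """Hypothetical stand-in for JoinMapping."""
    source_column: str  # key name in the left DataFrame
    other_column: str   # key name in the right DataFrame


@dataclass
class Target:
    """Hypothetical stand-in for TargetColumn."""
    target_column: str        # column to take from the right DataFrame
    target_column_alias: str  # name it gets in the output


def prepare_right(rows: List[Row], on: List[Mapping], targets: List[Target]) -> List[Row]:
    """Model of execute()'s select(): keep only renamed keys and aliased targets."""
    prepared: List[Row] = []
    for row in rows:
        out = {m.source_column: row[m.other_column] for m in on}
        out.update({t.target_column_alias: row[t.target_column] for t in targets})
        prepared.append(out)
    return prepared


on = [Mapping("customer_id", "id"), Mapping("country", "country_code")]
targets = [Target("name", "customer_name")]
right = [{"id": 7, "country_code": "NL", "name": "Ann", "unused": True}]

prepared = prepare_right(right, on, targets)
join_keys = [m.source_column for m in on]  # what execute() passes as `on`
print(prepared)   # [{'customer_id': 7, 'country': 'NL', 'customer_name': 'Ann'}]
print(join_keys)  # ['customer_id', 'country']
```

Columns not named in on or targets (here, unused) are dropped before the join, which also keeps them from clashing with same-named columns on the left side.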

get_right_df

get_right_df() -> DataFrame

Get the right side dataframe

Source code in src/koheesio/spark/transformations/lookup.py
def get_right_df(self) -> DataFrame:
    """Get the right side dataframe"""
    return self.other

set_list

set_list(value)

Ensure that we can pass either a single object, or a list of objects

Source code in src/koheesio/spark/transformations/lookup.py
@field_validator("on", "targets")
def set_list(cls, value):
    """Ensure that we can pass either a single object, or a list of objects"""
    return [value] if not isinstance(value, list) else value
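Because this validator runs on both on and targets, callers can write on=JoinMapping(...) or on=[JoinMapping(...), ...], and execute() can always iterate over a list. The same rule, extracted into a standalone function for illustration (as_list is a hypothetical name, not part of koheesio):

```python
from typing import List, TypeVar, Union

T = TypeVar("T")


def as_list(value: Union[T, List[T]]) -> List[T]:
    """Wrap a single object in a list; return lists unchanged (same rule as set_list)."""
    return [value] if not isinstance(value, list) else value


print(as_list("id"))            # ['id']
print(as_list(["id", "date"]))  # ['id', 'date']
```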

koheesio.spark.transformations.lookup.JoinHint

Supported join hints

BROADCAST class-attribute instance-attribute

BROADCAST = 'broadcast'

MERGE class-attribute instance-attribute

MERGE = 'merge'
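These values correspond to Spark join strategy hints: broadcast asks Spark to use a broadcast join (the hinted side is shipped to every executor), and merge asks for a sort-merge join. In execute() the hint is attached to the prepared right-hand DataFrame via DataFrame.hint(), so BROADCAST targets the lookup table. The rendered page only shows the member values; the sketch below assumes JoinHint is a str-based Enum, which would let members be looked up from, and compared with, plain strings.

```python
from enum import Enum


class JoinHint(str, Enum):
    """Assumed shape: a str-based Enum whose values are Spark hint names."""

    BROADCAST = "broadcast"
    MERGE = "merge"


# Members can be looked up from user-supplied strings ...
hint = JoinHint("broadcast")
print(hint is JoinHint.BROADCAST)  # True
# ... and, being str subclasses, compare equal to the plain hint name.
print(hint == "broadcast")  # True
print(hint.value)  # broadcast
```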

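koheesio.spark.transformations.lookup.JoinMapping

Mapping for joining two dataframes together: source_column in the left DataFrame is matched against other_column in the right DataFrame.

column property

column: Column

Get the join mapping as a pyspark.sql.Column object (the right-hand other_column, aliased to source_column so that both sides share the join key name)

other_column instance-attribute

other_column: str

source_column instance-attribute

source_column: str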

koheesio.spark.transformations.lookup.JoinType

Supported join types

ANTI class-attribute instance-attribute

ANTI = 'anti'

CROSS class-attribute instance-attribute

CROSS = 'cross'

FULL class-attribute instance-attribute

FULL = 'full'

INNER class-attribute instance-attribute

INNER = 'inner'

LEFT class-attribute instance-attribute

LEFT = 'left'

RIGHT class-attribute instance-attribute

RIGHT = 'right'

SEMI class-attribute instance-attribute

SEMI = 'semi'
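For a lookup, the join type decides which rows survive and whether the target columns are present. Since execute() passes self.how straight to DataFrame.join(how=...), the pure-Python model below shows which key values reach the output for the example's keys (left 1, 2; right 1, 3). surviving_keys is hypothetical and models key membership only, assuming unique keys on each side; CROSS is left out because it ignores the join keys.

```python
from typing import List


def surviving_keys(how: str, left: List[int], right: List[int]) -> List[int]:
    """Hypothetical model: which join-key values reach the output for a join type.

    Keys are assumed unique on each side; only membership is modelled, and the
    list order is for readability only (Spark does not guarantee row order).
    """
    left_set, right_set = set(left), set(right)
    if how == "inner":
        return [k for k in left if k in right_set]
    if how == "left":
        return list(left)  # unmatched left keys get null target columns
    if how == "right":
        return list(right)  # unmatched right keys get null left-hand columns
    if how == "full":
        return list(left) + [k for k in right if k not in left_set]
    if how == "semi":
        # Same keys as inner, but Spark returns only the left-hand columns.
        return [k for k in left if k in right_set]
    if how == "anti":
        # Left keys with no match; again only left-hand columns are returned.
        return [k for k in left if k not in right_set]
    raise ValueError(f"not modelled: {how!r}")


for how in ["inner", "left", "right", "full", "semi", "anti"]:
    print(how, surviving_keys(how, [1, 2], [1, 3]))
# inner [1]
# left [1, 2]
# right [1, 3]
# full [1, 2, 3]
# semi [1]
# anti [2]
```

With SEMI and ANTI, Spark returns only the left DataFrame's columns, so the target columns do not appear in the output; these types act as filters on df rather than lookups.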

koheesio.spark.transformations.lookup.TargetColumn

Target column for the joined dataframe: target_column is read from the right DataFrame and appears in the output as target_column_alias.

column property

column: Column

Get the target column as a pyspark.sql.Column object (target_column aliased to target_column_alias)

target_column instance-attribute

target_column: str

target_column_alias instance-attribute

target_column_alias: str