Scd

This module defines writers to write Slowly Changing Dimension (SCD) Type 2 data to a Delta table.

Slowly Changing Dimension (SCD) is a technique used in data warehousing to handle changes to dimension data over time. SCD Type 2 is one of the most common types of SCD: each change creates a new record, and the previous record is kept but marked as no longer current (for example, by setting its end time).
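A minimal pure-Python sketch of the SCD Type 2 idea, independent of Koheesio and Spark. Rows are plain dicts, and the metadata fields (effective_time, end_time, is_current) are kept flat rather than inside the struct column the writer uses. The function name `apply_scd2_change` is hypothetical. When the incoming record differs from the current version, the current version is closed and a new current version is appended.

```python
from datetime import datetime
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_scd2_change(history: List[Row], key: str, incoming: Row, ts: datetime) -> List[Row]:
    """Return a new history list with `incoming` applied as an SCD Type 2 change.

    Hypothetical helper for illustration only; not part of Koheesio.
    """
    result = [dict(r) for r in history]  # copy rows so the caller's list is untouched
    current = next((r for r in result if r[key] == incoming[key] and r["is_current"]), None)
    if current is not None:
        if all(current.get(c) == v for c, v in incoming.items()):
            return result  # no attribute changed: keep the history as it is
        current["end_time"] = ts  # close the previous version
        current["is_current"] = False  # and mark it as historical
    # The incoming record becomes the new, open-ended current version
    result.append({**incoming, "effective_time": ts, "end_time": None, "is_current": True})
    return result


t1, t2 = datetime(2024, 1, 1), datetime(2024, 6, 1)
history = apply_scd2_change([], "id", {"id": 1, "city": "Oslo"}, t1)
history = apply_scd2_change(history, "id", {"id": 1, "city": "Bergen"}, t2)
for row in history:
    print(row)
```

This sketch compares every incoming column, which amounts to treating all columns as SCD2 attributes, and it applies one record at a time; a production writer also has to handle whole batches.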

Koheesio is a Python data processing framework with Apache Spark integration. Its Delta writers include one that applies SCD Type 2 changes from a DataFrame to a Delta table.

To learn more about Slowly Changing Dimension and SCD Type 2, you can refer to the following resources:

- Slowly Changing Dimension (SCD) - Wikipedia

SCD2DeltaTableWriter performs the whole operation as a single Delta Lake MERGE. Each attribute can be declared as SCD Type 2 (a change creates a new version of the row) or SCD Type 1 (a change overwrites the current value without keeping history).
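To make the SCD1/SCD2 distinction concrete, here is a hypothetical pure-Python classifier that decides what should happen to an incoming row. The labels 'I', 'UC' and 'U' are the ones used in the CASE expression built in the execute() method, and the decision order is the same (missing target row first, then SCD2 changes, then SCD1 changes), but this is an illustration, not Koheesio code. Python's `!=` treats two `None` values as equal, which matches SQL's null-safe `<=>` rather than plain `=`; whether Koheesio's comparison is null-safe is not shown in this section.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def classify_change(
    source: Row,
    target: Optional[Row],
    scd2_columns: List[str],
    scd1_columns: List[str],
) -> Optional[str]:
    """Hypothetical helper: decide how an incoming source row should be merged."""
    if target is None:
        return "I"  # no matching target row: insert a new record
    if any(source[c] != target[c] for c in scd2_columns):
        return "UC"  # an SCD2 attribute changed: a new version is needed
    if any(source[c] != target[c] for c in scd1_columns):
        return "U"  # only SCD1 attributes changed: overwrite in place
    return None  # nothing changed: no action


current = {"id": 1, "city": "Oslo", "phone": "111"}
print(classify_change({"id": 1, "city": "Bergen", "phone": "111"}, current, ["city"], ["phone"]))
print(classify_change({"id": 1, "city": "Oslo", "phone": "222"}, current, ["city"], ["phone"]))
```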

koheesio.spark.writers.delta.scd.SCD2DeltaTableWriter

A class used to write Slowly Changing Dimension (SCD) Type 2 data to a Delta table.

Attributes:

table (InstanceOf[DeltaTableStep]): The Delta table to merge into.
merge_key (str): Column used to match source rows to target rows.
include_columns (List[str]): Columns to merge, selected from the DataFrame. Defaults to all columns.
exclude_columns (List[str]): DataFrame columns to leave out of the merge.
scd2_columns (List[str]): SCD Type 2 attributes; a change creates a new version of the row (history is tracked).
scd1_columns (List[str]): SCD Type 1 attributes; a change updates the row in place (no history).
scd2_timestamp_col (Optional[Column]): Date or timestamp column used as the SCD2 change time. If not set, current_timestamp is used.
meta_scd2_struct_col_name (str): Name of the struct column that holds the SCD2 metadata. Defaults to "_scd2".
meta_scd2_effective_time_col_name (str): Name of the effective-time field inside the SCD2 struct. Defaults to "effective_time".
meta_scd2_is_current_col_name (str): Name of the current-row flag inside the SCD2 struct. Defaults to "is_current".
meta_scd2_end_time_col_name (str): Name of the end-time field inside the SCD2 struct. Defaults to "end_time".
target_auto_generated_columns (List[str]): Auto-generated columns of the target Delta table; these are excluded from the merge logic.
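Two of these attributes feed small, self-contained steps of execute(): include_columns and exclude_columns combine into the list of columns to process, and the meta_scd2_* names combine into dotted paths for the fields inside the SCD2 struct. The helpers below are hypothetical sketches of those two steps, with defaults copied from the Field definitions.

```python
from typing import Dict, List, Sequence


def columns_to_process(
    df_columns: Sequence[str], include_columns: List[str], exclude_columns: List[str]
) -> List[str]:
    # An empty include_columns means "all DataFrame columns", as in execute()
    include = include_columns if include_columns else list(df_columns)
    return [c for c in include if c not in exclude_columns]


def scd2_field_paths(
    struct_col: str = "_scd2",
    effective_time_col: str = "effective_time",
    is_current_col: str = "is_current",
    end_time_col: str = "end_time",
) -> Dict[str, str]:
    # Nested fields are addressed as "<struct>.<field>", e.g. "_scd2.is_current"
    return {
        "is_current": f"{struct_col}.{is_current_col}",
        "effective_time": f"{struct_col}.{effective_time_col}",
        "end_time": f"{struct_col}.{end_time_col}",
    }


print(columns_to_process(["id", "city", "load_ts"], [], ["load_ts"]))
print(scd2_field_paths())
```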

exclude_columns class-attribute instance-attribute

exclude_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be excluded from DataFrame",
)

include_columns class-attribute instance-attribute

include_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be merged. Will be selected from DataFrame. Default is all columns",
)

merge_key class-attribute instance-attribute

merge_key: str = Field(..., description='Merge key')

meta_scd2_effective_time_col_name class-attribute instance-attribute

meta_scd2_effective_time_col_name: str = Field(
    default="effective_time",
    description="Effective col name",
)

meta_scd2_end_time_col_name class-attribute instance-attribute

meta_scd2_end_time_col_name: str = Field(
    default="end_time", description="End time col name"
)

meta_scd2_is_current_col_name class-attribute instance-attribute

meta_scd2_is_current_col_name: str = Field(
    default="is_current", description="Current col name"
)

meta_scd2_struct_col_name class-attribute instance-attribute

meta_scd2_struct_col_name: str = Field(
    default="_scd2", description="SCD2 struct name"
)

scd1_columns class-attribute instance-attribute

scd1_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd1 type (just update)",
)

scd2_columns class-attribute instance-attribute

scd2_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd2 type (track changes)",
)

scd2_timestamp_col class-attribute instance-attribute

scd2_timestamp_col: Optional[Column] = Field(
    default=None,
    description="Timestamp column for SCD2 type (track changes). Default to current_timestamp",
)

table class-attribute instance-attribute

table: InstanceOf[DeltaTableStep] = Field(
    ..., description="The table to merge to"
)

target_auto_generated_columns class-attribute instance-attribute

target_auto_generated_columns: List[str] = Field(
    default_factory=list,
    description="Auto generated columns from target Delta table. Will be used to exclude from merge logic",
)

execute

execute() -> None

Execute the SCD Type 2 operation.

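This method executes the SCD Type 2 operation on the DataFrame. It loads the target Delta table by name, checks that the source DataFrame contains the merge key and every SCD1 and SCD2 column, checks that scd2_timestamp_col (if set) is of date or timestamp type, builds the merge-action expression, stages the data, and then performs the Delta MERGE.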

Raises:

TypeError: If the source DataFrame is missing any of the required merge columns, or if the scd2_timestamp_col is not of date or timestamp type.
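The missing-column TypeError comes from a check on the source DataFrame's column names, which can be mirrored outside Spark with a plain list. `check_required_columns` is a hypothetical name; the required-column order and the error message follow execute().

```python
from typing import List, Sequence


def check_required_columns(
    df_columns: Sequence[str],
    merge_key: str,
    scd2_columns: List[str],
    scd1_columns: List[str],
) -> List[str]:
    """Return the required columns, or raise TypeError naming the missing ones."""
    required = [merge_key, *scd2_columns, *scd1_columns]
    missing = [c for c in required if c not in df_columns]
    if missing:
        # execute() raises TypeError here (not ValueError), so this sketch does too
        raise TypeError(f"The source DataFrame is missing the columns: {missing!r}")
    return required


print(check_required_columns(["id", "city", "phone"], "id", ["city"], ["phone"]))
```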

Source code in src/koheesio/spark/writers/delta/scd.py
def execute(self) -> None:
    """
    Execute the SCD Type 2 operation.

    This method executes the SCD Type 2 operation on the DataFrame.
    It validates the existing Delta table, prepares the merge conditions, stages the data,
    and then performs the merge operation.

    Raises
    ------
    TypeError
        If the scd2_timestamp_col is not of date or timestamp type.
        If the source DataFrame is missing any of the required merge columns.

    """
    self.df: DataFrame
    self.spark: SparkSession
    delta_table = DeltaTable.forName(sparkSession=self.spark, tableOrViewName=self.table.table_name)
    src_alias, cross_alias, dest_alias = "src", "cross", "tgt"

    # Prepare required merge columns
    required_merge_columns = [self.merge_key]

    if self.scd2_columns:
        required_merge_columns += self.scd2_columns

    if self.scd1_columns:
        required_merge_columns += self.scd1_columns

    if not all(c in self.df.columns for c in required_merge_columns):
        missing_columns = [c for c in required_merge_columns if c not in self.df.columns]
        raise TypeError(f"The source DataFrame is missing the columns: {missing_columns!r}")

    # Check that scd2_timestamp_col, if set, is of date or timestamp type
    if self.scd2_timestamp_col is not None:
        timestamp_col_type = self.df.select(self.scd2_timestamp_col).schema.fields[0].dataType

        if not isinstance(timestamp_col_type, (DateType, TimestampType)):
            raise TypeError(
                f"The scd2_timestamp_col '{self.scd2_timestamp_col}' must be of date "
                f"or timestamp type. Current type is {timestamp_col_type}"
            )

    # Prepare columns to process
    include_columns = self.include_columns if self.include_columns else self.df.columns
    exclude_columns = self.exclude_columns
    columns_to_process = [c for c in include_columns if c not in exclude_columns]

    # Constructing column names for SCD2 attributes
    meta_scd2_is_current_col = f"{self.meta_scd2_struct_col_name}.{self.meta_scd2_is_current_col_name}"
    meta_scd2_effective_time_col = f"{self.meta_scd2_struct_col_name}.{self.meta_scd2_effective_time_col_name}"
    meta_scd2_end_time_col = f"{self.meta_scd2_struct_col_name}.{self.meta_scd2_end_time_col_name}"

    # Constructing system merge action logic
    system_merge_action = f"CASE WHEN {dest_alias}.{self.merge_key} is NULL THEN 'I' "

    if updates_attrs_scd2 := self._prepare_attr_clause(
        attrs=self.scd2_columns, src_alias=src_alias, dest_alias=dest_alias
    ):
        system_merge_action += f" WHEN {updates_attrs_scd2} THEN 'UC' "

    if updates_attrs_scd1 := self._prepare_attr_clause(
        attrs=self.scd1_columns, src_alias=src_alias, dest_alias=dest_alias
    ):
        system_merge_action += f" WHEN {updates_attrs_scd1} THEN 'U' "

    system_merge_action += " ELSE NULL END"

    # Prepare the staged DataFrame
    staged = (
        self.df.withColumn(
            "__meta_scd2_timestamp",
            self._scd2_timestamp(scd2_timestamp_col=self.scd2_timestamp_col, spark=self.spark),
        )
        .transform(
            func=self._prepare_staging,
            delta_table=delta_table,
            merge_action_logic=F.expr(system_merge_action),
            meta_scd2_is_current_col=meta_scd2_is_current_col,
            columns_to_process=columns_to_process,
            src_alias=src_alias,
            dest_alias=dest_alias,
            cross_alias=cross_alias,
        )
        .transform(
            func=self._preserve_existing_target_values,
            meta_scd2_struct_col_name=self.meta_scd2_struct_col_name,
            target_auto_generated_columns=self.target_auto_generated_columns,
            src_alias=src_alias,
            cross_alias=cross_alias,
            dest_alias=dest_alias,
            logger=self.log,
        )
        .withColumn("__meta_scd2_end_time", self._scd2_end_time(meta_scd2_end_time_col=meta_scd2_end_time_col))
        .withColumn("__meta_scd2_is_current", self._scd2_is_current())
        .withColumn(
            "__meta_scd2_effective_time",
            self._scd2_effective_time(meta_scd2_effective_time_col=meta_scd2_effective_time_col),
        )
        .transform(
            func=self._add_scd2_columns,
            meta_scd2_struct_col_name=self.meta_scd2_struct_col_name,
            meta_scd2_effective_time_col_name=self.meta_scd2_effective_time_col_name,
            meta_scd2_end_time_col_name=self.meta_scd2_end_time_col_name,
            meta_scd2_is_current_col_name=self.meta_scd2_is_current_col_name,
        )
    )

    self._prepare_merge_builder(
        delta_table=delta_table,
        dest_alias=dest_alias,
        staged=staged,
        merge_key=self.merge_key,
        columns_to_process=columns_to_process,
        meta_scd2_effective_time_col=meta_scd2_effective_time_col,
    ).execute()
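The merge-action expression assembled in execute() is plain string building, so it can be reproduced without Spark. The body of `_prepare_attr_clause` is not shown in this section, so `attr_change_clause` below is an assumption: it flags a change when any listed column differs, using Spark SQL's null-safe equality operator `<=>`. Because a CASE expression returns the first matching WHEN branch, a row with both SCD2 and SCD1 changes is labeled 'UC'.

```python
from typing import List


def attr_change_clause(attrs: List[str], src_alias: str, dest_alias: str) -> str:
    """Hypothetical stand-in for _prepare_attr_clause; returns "" for no attributes."""
    return " OR ".join(f"NOT ({src_alias}.{c} <=> {dest_alias}.{c})" for c in attrs)


def build_merge_action(
    merge_key: str,
    scd2_columns: List[str],
    scd1_columns: List[str],
    src_alias: str = "src",
    dest_alias: str = "tgt",
) -> str:
    # Same branch order as execute(): insert, then SCD2 change, then SCD1 change
    expr = f"CASE WHEN {dest_alias}.{merge_key} is NULL THEN 'I' "
    if scd2 := attr_change_clause(scd2_columns, src_alias, dest_alias):
        expr += f" WHEN {scd2} THEN 'UC' "
    if scd1 := attr_change_clause(scd1_columns, src_alias, dest_alias):
        expr += f" WHEN {scd1} THEN 'U' "
    return expr + " ELSE NULL END"


print(build_merge_action("id", ["city"], ["phone"]))
```

In Spark, a string like this can be turned into a Column with `F.expr(...)`, which is what execute() does with `system_merge_action`.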