Delta

This module is the entry point for the koheesio.spark.writers.delta package.

It imports and exposes the DeltaTableWriter and DeltaTableStreamWriter classes for external use.

Classes:

  • DeltaTableWriter: Class to write data in batch mode to a Delta table.
  • DeltaTableStreamWriter: Class to write data in streaming mode to a Delta table.
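
Both classes are re-exported by this package, so they can be imported from it directly (a minimal sketch):

from koheesio.spark.writers.delta import DeltaTableStreamWriter, DeltaTableWriter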

koheesio.spark.writers.delta.BatchOutputMode #

For Batch:

  • append: Append the contents of the DataFrame to the output table, default option in Koheesio.
  • overwrite: overwrite the existing data.
  • ignore: ignore the operation (i.e. no-op).
  • error or errorifexists: throw an exception at runtime.
  • merge: update matching data in the table and insert rows that do not exist.
  • merge_all: update matching data in the table and insert rows that do not exist.
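
A minimal sketch of selecting one of these modes and handing it to DeltaTableWriter; the table name is an illustrative assumption:

from koheesio.spark.writers.delta import BatchOutputMode, DeltaTableWriter

# "my_db.sales" is a hypothetical table used for illustration only.
writer = DeltaTableWriter(table="my_db.sales", output_mode=BatchOutputMode.OVERWRITE)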

APPEND class-attribute instance-attribute #

APPEND = 'append'

ERROR class-attribute instance-attribute #

ERROR = 'error'

ERRORIFEXISTS class-attribute instance-attribute #

ERRORIFEXISTS = 'error'

IGNORE class-attribute instance-attribute #

IGNORE = 'ignore'

MERGE class-attribute instance-attribute #

MERGE = 'merge'

MERGEALL class-attribute instance-attribute #

MERGEALL = 'merge_all'

MERGE_ALL class-attribute instance-attribute #

MERGE_ALL = 'merge_all'

OVERWRITE class-attribute instance-attribute #

OVERWRITE = 'overwrite'

koheesio.spark.writers.delta.DeltaTableStreamWriter #

Delta table stream writer

Options #

Options for DeltaTableStreamWriter

allow_population_by_field_name class-attribute instance-attribute #

allow_population_by_field_name: bool = Field(
    default=True,
    description="TODO: convert to Field and pass as .options(**config)",
)

maxBytesPerTrigger class-attribute instance-attribute #

maxBytesPerTrigger: Optional[str] = Field(
    default=None,
    description="How much data is processed per trigger. The default is 1GB.",
)

maxFilesPerTrigger class-attribute instance-attribute #

maxFilesPerTrigger: int = Field(
    default=1000,
    description="The maximum number of new files to be considered in every trigger (default: 1000).",
)

execute #

execute() -> Output
Source code in src/koheesio/spark/writers/delta/stream.py
def execute(self) -> DeltaTableWriter.Output:
    if self.batch_function:
        self.streaming_query = self.writer.start()
    else:
        self.streaming_query = self.writer.toTable(tableName=self.table.table_name)
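
A hedged usage sketch: stream_df is assumed to be a streaming DataFrame (e.g. from spark.readStream), "my_db.events" is a hypothetical target table, and the DataFrame is assumed to be passed via the writer's df field as with other koheesio writers:

from koheesio.spark.writers.delta import DeltaTableStreamWriter

writer = DeltaTableStreamWriter(table="my_db.events", df=stream_df)
writer.execute()
# execute() stores the started query on the instance (see the source above),
# so it can be awaited like any Spark StreamingQuery.
writer.streaming_query.awaitTermination()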

koheesio.spark.writers.delta.DeltaTableWriter #

Delta table Writer for both batch and streaming dataframes.

Example
Example for MERGEALL#
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGEALL,
    output_mode_params={
        "merge_cond": "target.id=source.id",
        "update_cond": "target.col1_val>=source.col1_val",
        "insert_cond": "source.col_bk IS NOT NULL",
        "target_alias": "target",  # <------ DEFAULT, can be changed by providing custom value
        "source_alias": "source",  # <------ DEFAULT, can be changed by providing custom value
    },
)
Example for MERGE#
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        "merge_builder": (
            DeltaTable.forName(sparkSession=spark, tableOrViewName=<target_table_name>)
            .alias(target_alias)
            .merge(source=df, condition=merge_cond)
            .whenMatchedUpdateAll(condition=update_cond)
            .whenNotMatchedInsertAll(condition=insert_cond)
        )
    },
)
Example for MERGE#

If the table hasn't been created yet, the first run will execute an APPEND operation.

DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        "merge_builder": [
            {
                "clause": "whenMatchedUpdate",
                "set": {"value": "source.value"},
                "condition": "<update_condition>",
            },
            {
                "clause": "whenNotMatchedInsert",
                "values": {"id": "source.id", "value": "source.value"},
                "condition": "<insert_condition>",
            },
        ],
        "merge_cond": "<merge_condition>",
    },
)

Example for APPEND#

DataFrame writer options can be passed as keyword arguments.

DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.APPEND,
    partitionOverwriteMode="dynamic",
    mergeSchema="false",
)
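
Constructing the writer only configures it. A hedged sketch of actually writing a DataFrame, assuming the write(df) convenience inherited from the base Writer (otherwise pass df= and call execute()):

from koheesio.spark.writers.delta import BatchOutputMode, DeltaTableWriter

writer = DeltaTableWriter(table="test_table", output_mode=BatchOutputMode.APPEND)
writer.write(df)  # df is an existing Spark DataFrame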

Parameters:

  • table (Union[DeltaTableStep, str], required): The table to write to.
  • output_mode (Optional[Union[str, BatchOutputMode, StreamingOutputMode]], required): The output mode to use. Default is BatchOutputMode.APPEND. For streaming, use StreamingOutputMode.
  • params (Optional[dict], required): Additional parameters to use for specific mode.

format class-attribute instance-attribute #

format: str = 'delta'

output_mode class-attribute instance-attribute #

output_mode: Optional[
    Union[BatchOutputMode, StreamingOutputMode]
] = Field(
    default=APPEND,
    alias="outputMode",
    description=f"{__doc__}
{__doc__}",
)

params class-attribute instance-attribute #

params: dict = Field(
    default_factory=dict,
    alias="output_mode_params",
    description="Additional parameters to use for specific mode",
)

partition_by class-attribute instance-attribute #

partition_by: Optional[List[str]] = Field(
    default=None,
    alias="partitionBy",
    description="The list of fields to partition the Delta table on",
)

table class-attribute instance-attribute #

table: Union[DeltaTableStep, str] = Field(
    default=..., description="The table to write to"
)

writer property #

writer: Union[DeltaMergeBuilder, DataFrameWriter]

Specify DeltaTableWriter

execute #

execute() -> Output
Source code in src/koheesio/spark/writers/delta/batch.py
def execute(self) -> Writer.Output:
    _writer = self.writer

    if self.table.create_if_not_exists and not self.table.exists:
        _writer = _writer.options(**self.table.default_create_properties)

    if isinstance(_writer, DeltaMergeBuilder) or type(_writer).__name__ == "DeltaMergeBuilder":
        _writer.execute()
    else:
        if options := self.params:
            # should we add options only if mode is not merge?
            _writer = _writer.options(**options)
        _writer.saveAsTable(self.table.table_name)

get_output_mode classmethod #

get_output_mode(
    choice: str, options: Set[Type]
) -> Union[BatchOutputMode, StreamingOutputMode]

Retrieve an OutputMode by validating choice against a set of option OutputModes.

Currently supported output modes can be found in:

  • BatchOutputMode
  • StreamingOutputMode
Source code in src/koheesio/spark/writers/delta/batch.py
@classmethod
def get_output_mode(cls, choice: str, options: Set[Type]) -> Union[BatchOutputMode, StreamingOutputMode]:
    """Retrieve an OutputMode by validating `choice` against a set of option OutputModes.

    Currently supported output modes can be found in:

    - BatchOutputMode
    - StreamingOutputMode
    """
    from koheesio.spark.utils.connect import is_remote_session

    if (
        choice.upper() in (BatchOutputMode.MERGEALL, BatchOutputMode.MERGE_ALL, BatchOutputMode.MERGE)
        and is_remote_session()
    ):
        raise RuntimeError(f"Output mode {choice.upper()} is not supported in remote mode")

    for enum_type in options:
        if choice.upper() in [om.value.upper() for om in enum_type]:  # type: ignore
            return getattr(enum_type, choice.upper())
    raise AttributeError(
        f"""
        Invalid outputMode specified '{choice}'. Allowed values are:
        Batch Mode - {BatchOutputMode.__doc__}
        Streaming Mode - {StreamingOutputMode.__doc__}
        """
    )
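
A brief sketch of calling it; the import location of StreamingOutputMode is an assumption:

from koheesio.spark.writers import StreamingOutputMode  # assumed import path
from koheesio.spark.writers.delta import BatchOutputMode, DeltaTableWriter

mode = DeltaTableWriter.get_output_mode("append", {BatchOutputMode, StreamingOutputMode})
assert mode is BatchOutputMode.APPEND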

koheesio.spark.writers.delta.SCD2DeltaTableWriter #

A class used to write Slowly Changing Dimension (SCD) Type 2 data to a Delta table.

Attributes:

  • table (InstanceOf[DeltaTableStep]): The table to merge to.
  • merge_key (str): The key used for merging data.
  • include_columns (List[str]): Columns to be merged. Will be selected from DataFrame. Default is all columns.
  • exclude_columns (List[str]): Columns to be excluded from DataFrame.
  • scd2_columns (List[str]): List of attributes for SCD2 type (track changes).
  • scd2_timestamp_col (Optional[Column]): Timestamp column for SCD2 type (track changes). Defaults to current_timestamp.
  • scd1_columns (List[str]): List of attributes for SCD1 type (just update).
  • meta_scd2_struct_col_name (str): SCD2 struct name.
  • meta_scd2_effective_time_col_name (str): Effective col name.
  • meta_scd2_is_current_col_name (str): Current col name.
  • meta_scd2_end_time_col_name (str): End time col name.
  • target_auto_generated_columns (List[str]): Auto generated columns from target Delta table. Will be used to exclude from merge logic.
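
A hedged usage sketch; the table, merge key, and column names are illustrative assumptions, df is an existing Spark DataFrame, and the import path of DeltaTableStep is assumed:

from koheesio.spark.delta import DeltaTableStep  # assumed import path
from koheesio.spark.writers.delta import SCD2DeltaTableWriter

SCD2DeltaTableWriter(
    table=DeltaTableStep(table="dim_customer"),  # hypothetical target table
    merge_key="customer_id",
    scd2_columns=["address", "segment"],  # tracked: changes close the current row and insert a new version
    scd1_columns=["email"],  # not tracked: updated in place on the current row
    df=df,  # source data to merge
).execute()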

exclude_columns class-attribute instance-attribute #

exclude_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be excluded from DataFrame",
)

include_columns class-attribute instance-attribute #

include_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be merged. Will be selected from DataFrame. Default is all columns",
)

merge_key class-attribute instance-attribute #

merge_key: str = Field(..., description='Merge key')

meta_scd2_effective_time_col_name class-attribute instance-attribute #

meta_scd2_effective_time_col_name: str = Field(
    default="effective_time",
    description="Effective col name",
)

meta_scd2_end_time_col_name class-attribute instance-attribute #

meta_scd2_end_time_col_name: str = Field(
    default="end_time", description="End time col name"
)

meta_scd2_is_current_col_name class-attribute instance-attribute #

meta_scd2_is_current_col_name: str = Field(
    default="is_current", description="Current col name"
)

meta_scd2_struct_col_name class-attribute instance-attribute #

meta_scd2_struct_col_name: str = Field(
    default="_scd2", description="SCD2 struct name"
)

scd1_columns class-attribute instance-attribute #

scd1_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd1 type (just update)",
)

scd2_columns class-attribute instance-attribute #

scd2_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd2 type (track changes)",
)

scd2_timestamp_col class-attribute instance-attribute #

scd2_timestamp_col: Optional[Column] = Field(
    default=None,
    description="Timestamp column for SCD2 type (track changes). Defaults to current_timestamp",
)

table class-attribute instance-attribute #

table: InstanceOf[DeltaTableStep] = Field(
    ..., description="The table to merge to"
)

target_auto_generated_columns class-attribute instance-attribute #

target_auto_generated_columns: List[str] = Field(
    default_factory=list,
    description="Auto generated columns from target Delta table. Will be used to exclude from merge logic",
)

execute #

execute() -> None

Execute the SCD Type 2 operation.

This method executes the SCD Type 2 operation on the DataFrame. It validates the existing Delta table, prepares the merge conditions, stages the data, and then performs the merge operation.

Raises:

Type Description
TypeError

If the scd2_timestamp_col is not of date or timestamp type. If the source DataFrame is missing any of the required merge columns.

Source code in src/koheesio/spark/writers/delta/scd.py
def execute(self) -> None:
    """
    Execute the SCD Type 2 operation.

    This method executes the SCD Type 2 operation on the DataFrame.
    It validates the existing Delta table, prepares the merge conditions, stages the data,
    and then performs the merge operation.

    Raises
    ------
    TypeError
        If the scd2_timestamp_col is not of date or timestamp type.
        If the source DataFrame is missing any of the required merge columns.

    """
    self.df: DataFrame
    self.spark: SparkSession
    delta_table = get_delta_table_for_name(spark_session=self.spark, table_name=self.table.table_name)
    src_alias, cross_alias, dest_alias = "src", "cross", "tgt"

    # Prepare required merge columns
    required_merge_columns = [self.merge_key]

    if self.scd2_columns:
        required_merge_columns += self.scd2_columns

    if self.scd1_columns:
        required_merge_columns += self.scd1_columns

    if not all(c in self.df.columns for c in required_merge_columns):
        missing_columns = [c for c in required_merge_columns if c not in self.df.columns]
        raise TypeError(f"The source DataFrame is missing the columns: {missing_columns!r}")

    # Check that required columns are present in the source DataFrame
    if self.scd2_timestamp_col is not None:
        timestamp_col_type = self.df.select(self.scd2_timestamp_col).schema.fields[0].dataType

        if not isinstance(timestamp_col_type, (DateType, TimestampType)):
            raise TypeError(
                f"The scd2_timestamp_col '{self.scd2_timestamp_col}' must be of date "
                f"or timestamp type.Current type is {timestamp_col_type}"
            )

    # Prepare columns to process
    include_columns = self.include_columns if self.include_columns else self.df.columns
    exclude_columns = self.exclude_columns
    columns_to_process = [c for c in include_columns if c not in exclude_columns]

    # Constructing column names for SCD2 attributes
    meta_scd2_is_current_col = f"{self.meta_scd2_struct_col_name}.{self.meta_scd2_is_current_col_name}"
    meta_scd2_effective_time_col = f"{self.meta_scd2_struct_col_name}.{self.meta_scd2_effective_time_col_name}"
    meta_scd2_end_time_col = f"{self.meta_scd2_struct_col_name}.{self.meta_scd2_end_time_col_name}"

    # Constructing system merge action logic
    system_merge_action = f"CASE WHEN tgt.{self.merge_key} is NULL THEN 'I' "

    if updates_attrs_scd2 := self._prepare_attr_clause(
        attrs=self.scd2_columns, src_alias=src_alias, dest_alias=dest_alias
    ):
        system_merge_action += f" WHEN {updates_attrs_scd2} THEN 'UC' "

    if updates_attrs_scd1 := self._prepare_attr_clause(
        attrs=self.scd1_columns, src_alias=src_alias, dest_alias=dest_alias
    ):
        system_merge_action += f" WHEN {updates_attrs_scd1} THEN 'U' "

    system_merge_action += " ELSE NULL END"

    # Prepare the staged DataFrame
    staged = (
        self.df.withColumn(
            "__meta_scd2_timestamp",
            self._scd2_timestamp(scd2_timestamp_col=self.scd2_timestamp_col, spark=self.spark),
        )
        .transform(
            func=self._prepare_staging,
            delta_table=delta_table,
            merge_action_logic=f.expr(system_merge_action),
            meta_scd2_is_current_col=meta_scd2_is_current_col,
            columns_to_process=columns_to_process,
            src_alias=src_alias,
            dest_alias=dest_alias,
            cross_alias=cross_alias,
        )
        .transform(
            func=self._preserve_existing_target_values,
            meta_scd2_struct_col_name=self.meta_scd2_struct_col_name,
            target_auto_generated_columns=self.target_auto_generated_columns,
            src_alias=src_alias,
            cross_alias=cross_alias,
            dest_alias=dest_alias,
            logger=self.log,
        )
        .withColumn("__meta_scd2_end_time", self._scd2_end_time(meta_scd2_end_time_col=meta_scd2_end_time_col))
        .withColumn("__meta_scd2_is_current", self._scd2_is_current())
        .withColumn(
            "__meta_scd2_effective_time",
            self._scd2_effective_time(meta_scd2_effective_time_col=meta_scd2_effective_time_col),
        )
        .transform(
            func=self._add_scd2_columns,
            meta_scd2_struct_col_name=self.meta_scd2_struct_col_name,
            meta_scd2_effective_time_col_name=self.meta_scd2_effective_time_col_name,
            meta_scd2_end_time_col_name=self.meta_scd2_end_time_col_name,
            meta_scd2_is_current_col_name=self.meta_scd2_is_current_col_name,
        )
    )

    self._prepare_merge_builder(
        delta_table=delta_table,
        dest_alias=dest_alias,
        staged=staged,
        merge_key=self.merge_key,
        columns_to_process=columns_to_process,
        meta_scd2_effective_time_col=meta_scd2_effective_time_col,
    ).execute()