Skip to content

Batch

This module defines the DeltaTableWriter class, which is used to write both batch and streaming dataframes to Delta tables.

In addition to the regular output modes (such as APPEND), DeltaTableWriter supports two merge-specific output modes: MERGEALL and MERGE.

  • The MERGEALL mode merges all incoming data with existing data in the table based on certain conditions.
  • The MERGE mode allows for more custom merging behavior using the DeltaMergeBuilder class from the delta.tables library.

The output_mode_params dictionary is used to specify conditions for merging, updating, and inserting data. The target_alias and source_alias keys are used to specify the aliases for the target and source dataframes in the merge conditions.

Classes:

Name Description
DeltaTableWriter

A class for writing data to Delta tables.

DeltaTableStreamWriter

A class for writing streaming data to Delta tables.

Example
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGEALL,
    output_mode_params={
        "merge_cond": "target.id=source.id",
        "update_cond": "target.col1_val>=source.col1_val",
        "insert_cond": "source.col_bk IS NOT NULL",
    },
)

koheesio.spark.writers.delta.batch.DeltaTableWriter #

Delta table Writer for both batch and streaming dataframes.

Example
Example for MERGEALL#
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGEALL,
    output_mode_params={
        "merge_cond": "target.id=source.id",
        "update_cond": "target.col1_val>=source.col1_val",
        "insert_cond": "source.col_bk IS NOT NULL",
        "target_alias": "target",  # <------ DEFAULT, can be changed by providing custom value
        "source_alias": "source",  # <------ DEFAULT, can be changed by providing custom value
    },
)
Example for MERGE#
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        'merge_builder': (
            DeltaTable
            .forName(sparkSession=spark, tableOrViewName=<target_table_name>)
            .alias(target_alias)
            .merge(source=df, condition=merge_cond)
            .whenMatchedUpdateAll(condition=update_cond)
            .whenNotMatchedInsertAll(condition=insert_cond)
            )
        }
    )
Example for MERGE (merge clauses given as a list of dictionaries)#

If the table has not been created yet, the first run will execute an APPEND operation instead.

DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        "merge_builder": [
            {
                "clause": "whenMatchedUpdate",
                "set": {"value": "source.value"},
                "condition": "<update_condition>",
            },
            {
                "clause": "whenNotMatchedInsert",
                "values": {"id": "source.id", "value": "source.value"},
                "condition": "<insert_condition>",
            },
        ],
        "merge_cond": "<merge_condition>",
    },
)

Example for APPEND#

DataFrame writer options can be passed as keyword arguments.

DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.APPEND,
    partitionOverwriteMode="dynamic",
    mergeSchema="false",
)

Parameters:

Name Type Description Default
table Union[DeltaTableStep, str]

The table to write to

required
output_mode Optional[Union[str, BatchOutputMode, StreamingOutputMode]]

The output mode to use. Default is BatchOutputMode.APPEND. For streaming, use StreamingOutputMode.

BatchOutputMode.APPEND
params Optional[dict]

Additional parameters to use for specific mode

{}

format class-attribute instance-attribute #

format: str = 'delta'

output_mode class-attribute instance-attribute #

output_mode: Optional[
    Union[BatchOutputMode, StreamingOutputMode]
] = Field(
    default=APPEND,
    alias="outputMode",
    description=f"{__doc__}\n{__doc__}",
)

params class-attribute instance-attribute #

params: dict = Field(
    default_factory=dict,
    alias="output_mode_params",
    description="Additional parameters to use for specific mode",
)

partition_by class-attribute instance-attribute #

partition_by: Optional[List[str]] = Field(
    default=None,
    alias="partitionBy",
    description="The list of fields to partition the Delta table on",
)

table class-attribute instance-attribute #

table: Union[DeltaTableStep, str] = Field(
    default=..., description="The table to write to"
)

writer property #

writer: Union[DeltaMergeBuilder, DataFrameWriter]

Specify the writer used by DeltaTableWriter: either a DeltaMergeBuilder or a DataFrameWriter.

execute #

execute() -> Output
Source code in src/koheesio/spark/writers/delta/batch.py
def execute(self) -> Writer.Output:
    """Write the data to the Delta table using the configured writer.

    The writer is taken from ``self.writer``. When the target table is flagged
    with ``create_if_not_exists`` and does not exist yet, the table's
    ``default_create_properties`` are applied to the writer as options first.

    A ``DeltaMergeBuilder`` is run through its ``execute()`` method. Any other
    writer receives ``self.params`` as options (when non-empty) and is then
    saved with ``saveAsTable`` under the table's ``table_name``.
    """
    sink = self.writer
    target = self.table

    if target.create_if_not_exists and not target.exists:
        sink = sink.options(**target.default_create_properties)

    # The class-name comparison also catches builders that do not pass the
    # isinstance check but are named DeltaMergeBuilder.
    is_merge_builder = isinstance(sink, DeltaMergeBuilder) or type(sink).__name__ == "DeltaMergeBuilder"
    if is_merge_builder:
        sink.execute()
        return

    extra_options = self.params
    if extra_options:
        # should we add options only if mode is not merge?
        sink = sink.options(**extra_options)
    sink.saveAsTable(target.table_name)

get_output_mode classmethod #

get_output_mode(
    choice: str, options: Set[Type]
) -> Union[BatchOutputMode, StreamingOutputMode]

Retrieve an OutputMode by validating choice against a set of option OutputModes.

Currently supported output modes can be found in:

  • BatchOutputMode
  • StreamingOutputMode
Source code in src/koheesio/spark/writers/delta/batch.py
@classmethod
def get_output_mode(cls, choice: str, options: Set[Type]) -> Union[BatchOutputMode, StreamingOutputMode]:
    """Retrieve an OutputMode by validating `choice` against a set of option OutputModes.

    Currently supported output modes can be found in:

    - BatchOutputMode
    - StreamingOutputMode

    Parameters
    ----------
    choice : str
        The requested output mode. It is upper-cased before any comparison, so
        matching against the enum values is case-insensitive.
    options : Set[Type]
        The enum classes whose members are searched for `choice`.

    Returns
    -------
    Union[BatchOutputMode, StreamingOutputMode]
        The member of the first enum in `options` whose value matches `choice`.
        If that enum also has a member *named* like the upper-cased choice, that
        member is returned; otherwise the value-matched member is returned.

    Raises
    ------
    RuntimeError
        If the remote-session guard below matches `choice` and
        `is_remote_session()` is truthy.
    AttributeError
        If `choice` matches no member value of any enum in `options`.
    """
    from koheesio.spark.utils.connect import is_remote_session

    wanted = choice.upper()

    # NOTE(review): this compares the upper-cased string with the enum members
    # directly, so whether it ever matches depends on the casing of the enum
    # values, which are defined outside this block - confirm.
    if (
        wanted in (BatchOutputMode.MERGEALL, BatchOutputMode.MERGE_ALL, BatchOutputMode.MERGE)
        and is_remote_session()
    ):
        raise RuntimeError(f"Output mode {wanted} is not supported in remote mode")

    for enum_type in options:
        for member in enum_type:  # type: ignore
            if member.value.upper() == wanted:
                # Validation is done on member values, but the lookup used to be
                # done purely by member name. Fall back to the value-matched
                # member so a valid value without a same-named member no longer
                # fails with a bare AttributeError from the enum class.
                return getattr(enum_type, wanted, member)
    raise AttributeError(
        f"""
        Invalid outputMode specified '{choice}'. Allowed values are:
        Batch Mode - {BatchOutputMode.__doc__}
        Streaming Mode - {StreamingOutputMode.__doc__}
        """
    )