Batch
This module defines the DeltaTableWriter class, which is used to write both batch and streaming dataframes to Delta tables.
DeltaTableWriter supports two merge output modes, `MERGEALL` and `MERGE`, alongside non-merge modes such as `APPEND`:

- The `MERGEALL` mode merges all incoming data with the existing data in the table, using the merge, update, and insert conditions supplied in `output_mode_params`.
- The `MERGE` mode allows for more custom merging behavior using the `DeltaMergeBuilder` class from the `delta.tables` library.

The `output_mode_params` dictionary specifies the conditions for merging, updating, and inserting data. Its `target_alias` and `source_alias` keys set the aliases for the target and source dataframes that the merge conditions refer to.
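To make the `MERGEALL` semantics concrete, here is a plain-Python model of an upsert driven by a merge condition, an update condition, and an insert condition. It is a sketch, not koheesio's implementation: rows are dicts, the three conditions are Python callables standing in for the SQL strings in `output_mode_params`, and the function name `merge_all` is hypothetical.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def merge_all(
    target: List[Row],
    source: List[Row],
    merge_cond: Callable[[Row, Row], bool],
    update_cond: Callable[[Row, Row], bool] = lambda t, s: True,
    insert_cond: Callable[[Row], bool] = lambda s: True,
) -> List[Row]:
    """Hypothetical model of MERGEALL: update all columns on a match, insert the row otherwise."""
    result = [dict(row) for row in target]  # copy; matching still uses the original rows
    inserts: List[Row] = []
    for src in source:
        matched = [i for i, tgt in enumerate(target) if merge_cond(tgt, src)]
        if matched:
            # Mirrors whenMatchedUpdateAll(condition=update_cond): overwrite every column
            for i in matched:
                if update_cond(target[i], src):
                    result[i] = dict(src)
        elif insert_cond(src):
            # Mirrors whenNotMatchedInsertAll(condition=insert_cond): insert the whole row
            inserts.append(dict(src))
    # Simplification: Delta raises an error when several source rows try to modify
    # the same target row; this model simply lets the last one win.
    return result + inserts


target = [
    {"id": 1, "col1_val": 5, "col_bk": "a"},
    {"id": 2, "col1_val": 1, "col_bk": "b"},
]
source = [
    {"id": 1, "col1_val": 3, "col_bk": "a"},   # matched and 5 >= 3: row is updated
    {"id": 2, "col1_val": 9, "col_bk": "b"},   # matched but 1 >= 9 is false: row is kept
    {"id": 3, "col1_val": 7, "col_bk": None},  # unmatched but col_bk is NULL: not inserted
    {"id": 4, "col1_val": 2, "col_bk": "d"},   # unmatched and col_bk is set: inserted
]

merged = merge_all(
    target,
    source,
    merge_cond=lambda t, s: t["id"] == s["id"],               # target.id=source.id
    update_cond=lambda t, s: t["col1_val"] >= s["col1_val"],  # target.col1_val>=source.col1_val
    insert_cond=lambda s: s["col_bk"] is not None,            # source.col_bk IS NOT NULL
)
```

The conditions mirror the `MERGEALL` example on this page, so the model shows which source rows each condition lets through.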
Classes:

| Name | Description |
|---|---|
| DeltaTableWriter | A class for writing data to Delta tables. |
| DeltaTableStreamWriter | A class for writing streaming data to Delta tables. |
koheesio.spark.writers.delta.batch.DeltaTableWriter
Delta table Writer for both batch and streaming dataframes.
Example
Example for MERGEALL
```python
from koheesio.spark.writers import BatchOutputMode
from koheesio.spark.writers.delta.batch import DeltaTableWriter

DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGEALL,
    output_mode_params={
        "merge_cond": "target.id=source.id",
        "update_cond": "target.col1_val>=source.col1_val",
        "insert_cond": "source.col_bk IS NOT NULL",
        "target_alias": "target",  # <------ DEFAULT, can be changed by providing custom value
        "source_alias": "source",  # <------ DEFAULT, can be changed by providing custom value
    },
)
```
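The inline comments mark `target_alias` and `source_alias` as defaults that a custom value overrides, and the conditions have to refer to whichever aliases end up in effect. A minimal sketch of that relationship follows; `resolve_merge_params` and `uses_resolved_aliases` are hypothetical helpers written for illustration, not koheesio functions.

```python
import re
from typing import Any, Dict

# Defaults as documented by the inline comments in the MERGEALL example
DEFAULT_ALIASES: Dict[str, str] = {"target_alias": "target", "source_alias": "source"}


def resolve_merge_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: fill in alias defaults; explicitly supplied values win."""
    return {**DEFAULT_ALIASES, **params}


def uses_resolved_aliases(params: Dict[str, Any]) -> bool:
    """Hypothetical check that merge_cond references both aliases as `alias.column`."""
    cond = params["merge_cond"]
    return all(
        # \b stops "t." from matching the tail of "target."
        re.search(rf"\b{re.escape(params[key])}\.", cond) is not None
        for key in ("target_alias", "source_alias")
    )


default_params = resolve_merge_params({"merge_cond": "target.id=source.id"})
custom_params = resolve_merge_params(
    {"merge_cond": "t.id=s.id", "target_alias": "t", "source_alias": "s"}
)
# Custom aliases, but the condition still uses the default names
stale_params = resolve_merge_params(
    {"merge_cond": "target.id=source.id", "target_alias": "t", "source_alias": "s"}
)
```

The `stale_params` case is the mistake to watch for: changing an alias without rewriting `merge_cond`, `update_cond`, and `insert_cond` to match.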
Example for MERGE (with a DeltaMergeBuilder object)
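```python
from delta.tables import DeltaTable

from koheesio.spark.writers import BatchOutputMode
from koheesio.spark.writers.delta.batch import DeltaTableWriter

# Assumes an active SparkSession `spark` and a source DataFrame `df`
target_alias, source_alias = "target", "source"
merge_cond = "target.id=source.id"
update_cond = "target.col1_val>=source.col1_val"
insert_cond = "source.col_bk IS NOT NULL"

DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        "merge_builder": (
            DeltaTable
            .forName(sparkSession=spark, tableOrViewName="<target_table_name>")
            .alias(target_alias)
            # alias the source too, so merge_cond can refer to source.<column>
            .merge(source=df.alias(source_alias), condition=merge_cond)
            .whenMatchedUpdateAll(condition=update_cond)
            .whenNotMatchedInsertAll(condition=insert_cond)
        )
    },
)
```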
Example for MERGE (with merge_builder as a list of clauses)
```python
from koheesio.spark.writers import BatchOutputMode
from koheesio.spark.writers.delta.batch import DeltaTableWriter

# In case the table isn't created yet, the first run will execute an APPEND operation
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        "merge_builder": [
            {
                "clause": "whenMatchedUpdate",
                "set": {"value": "source.value"},
                "condition": "<update_condition>",
            },
            {
                "clause": "whenNotMatchedInsert",
                "values": {"id": "source.id", "value": "source.value"},
                "condition": "<insert_condition>",
            },
        ],
        "merge_cond": "<merge_condition>",
    },
)
```
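In the list form, each dict names a `DeltaMergeBuilder` method under `"clause"`, and the remaining keys line up with that method's keyword arguments (`whenMatchedUpdate` takes `condition` and `set`; `whenNotMatchedInsert` takes `condition` and `values`). The sketch below shows one way such a list could be folded onto a builder with `getattr`. `apply_clauses` and `RecordingBuilder` are hypothetical stand-ins so the snippet runs without Spark; koheesio's own conversion may differ in detail.

```python
from typing import Any, Dict, List, Tuple


class RecordingBuilder:
    """Hypothetical stand-in for DeltaMergeBuilder that records each clause call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def _record(self, name: str, **kwargs: Any) -> "RecordingBuilder":
        self.calls.append((name, kwargs))
        return self  # chainable, like DeltaMergeBuilder

    def whenMatchedUpdate(self, condition=None, set=None):
        return self._record("whenMatchedUpdate", condition=condition, set=set)

    def whenNotMatchedInsert(self, condition=None, values=None):
        return self._record("whenNotMatchedInsert", condition=condition, values=values)


def apply_clauses(builder: Any, clauses: List[Dict[str, Any]]) -> Any:
    """Fold a list of clause dicts onto a merge builder, in list order."""
    for clause in clauses:
        spec = dict(clause)  # copy so the caller's dict is not mutated
        method = getattr(builder, spec.pop("clause"))  # e.g. builder.whenMatchedUpdate
        builder = method(**spec)  # remaining keys become keyword arguments
    return builder


clauses = [
    {"clause": "whenMatchedUpdate", "set": {"value": "source.value"}, "condition": "<update_condition>"},
    {
        "clause": "whenNotMatchedInsert",
        "values": {"id": "source.id", "value": "source.value"},
        "condition": "<insert_condition>",
    },
]
recorded = apply_clauses(RecordingBuilder(), clauses)
```

With delta-spark, the same loop would run on the builder returned by `DeltaTable.merge(...)`, and the chain would end with `.execute()`.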
Example for APPEND
DataFrame writer options can be passed as keyword arguments.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | Union[DeltaTableStep, str] | The table to write to. | required |
| output_mode | Optional[Union[str, BatchOutputMode, StreamingOutputMode]] | The output mode to use. For streaming, use StreamingOutputMode. Alias: outputMode. | BatchOutputMode.APPEND |
| params | Optional[dict] | Additional parameters to use for the specific mode. Alias: output_mode_params. | {} (empty dict) |
| partition_by | Optional[List[str]] | The list of fields to partition the Delta table on. Alias: partitionBy. | None |
output_mode (class-attribute, instance-attribute)
```python
output_mode: Optional[Union[BatchOutputMode, StreamingOutputMode]] = Field(
    default=BatchOutputMode.APPEND,
    alias="outputMode",
    description=f"{BatchOutputMode.__doc__}\n{StreamingOutputMode.__doc__}",
)
```
params (class-attribute, instance-attribute)
```python
params: Optional[dict] = Field(
    default_factory=dict,
    alias="output_mode_params",
    description="Additional parameters to use for specific mode",
)
```
partition_by (class-attribute, instance-attribute)
```python
partition_by: Optional[List[str]] = Field(
    default=None,
    alias="partitionBy",
    description="The list of fields to partition the Delta table on",
)
```
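`partition_by` lists the columns whose values decide which partition directory each row lands in; Delta tables, like Hive-style Parquet tables, lay out data files under `column=value` subdirectories, nested in the order the columns are listed. The snippet below is only a plain-Python illustration of that mapping, with a hypothetical `partition_path` helper and made-up rows; the real layout is produced by Spark and Delta, which also escape special characters and use a placeholder directory name for null values.

```python
from collections import defaultdict
from typing import Dict, List


def partition_path(row: Dict[str, object], partition_by: List[str]) -> str:
    """Hypothetical helper: build a Hive-style relative path such as 'country=NL/year=2024'."""
    return "/".join(f"{col}={row[col]}" for col in partition_by)


rows = [
    {"id": 1, "country": "NL", "year": 2024},
    {"id": 2, "country": "NL", "year": 2023},
    {"id": 3, "country": "BE", "year": 2024},
    {"id": 4, "country": "NL", "year": 2024},
]

# Group row ids by the directory they would be written under for partition_by=["country", "year"]
ids_by_partition: Dict[str, List[int]] = defaultdict(list)
for row in rows:
    ids_by_partition[partition_path(row, ["country", "year"])].append(row["id"])
```

Swapping the order to `["year", "country"]` would nest `country=` directories inside `year=` directories instead.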
table (class-attribute, instance-attribute)
```python
table: Union[DeltaTableStep, str] = Field(
    default=..., description="The table to write to"
)
```
execute
Source code in src/koheesio/spark/writers/delta/batch.py
get_output_mode (classmethod)
```python
get_output_mode(
    choice: str, options: Set[Type]
) -> Union[BatchOutputMode, StreamingOutputMode]
```
Retrieve an OutputMode by validating `choice` against a set of option OutputModes.
Currently supported output modes can be found in:
- BatchOutputMode
- StreamingOutputMode
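The lookup can be pictured as a case-insensitive search for `choice` across the candidate enum types. The sketch below is not koheesio's code: `BatchMode` and `StreamMode` are hypothetical stand-ins whose member names come from the modes mentioned on this page and from Spark Structured Streaming's output modes (append, complete, update), while the real `BatchOutputMode` and `StreamingOutputMode` have their own members. It takes an ordered sequence rather than a set, so that a name present in both enums, such as `append`, resolves predictably.

```python
from enum import Enum
from typing import Sequence, Type


class BatchMode(str, Enum):
    """Hypothetical stand-in for BatchOutputMode (subset of members)."""

    APPEND = "append"
    MERGE = "merge"
    MERGEALL = "mergeall"


class StreamMode(str, Enum):
    """Hypothetical stand-in for StreamingOutputMode, using Spark's streaming mode names."""

    APPEND = "append"
    COMPLETE = "complete"
    UPDATE = "update"


def get_output_mode(choice: str, options: Sequence[Type[Enum]]) -> Enum:
    """Return the first enum member whose value matches `choice`, ignoring case."""
    wanted = choice.strip().lower()
    for enum_type in options:  # checked in the order given
        for member in enum_type:
            if member.value.lower() == wanted:
                return member
    valid = sorted({m.value for e in options for m in e})
    raise ValueError(f"Invalid output mode {choice!r}; expected one of {valid}")
```

Raising `ValueError` for an unknown name is a choice made in this sketch; the actual classmethod may signal invalid input differently.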