# Delta

This module is the entry point for the koheesio.spark.writers.delta package. It imports and exposes the DeltaTableWriter and DeltaTableStreamWriter classes for external use.

Classes:

- `DeltaTableWriter`: class to write data in batch mode to a Delta table.
- `DeltaTableStreamWriter`: class to write data in streaming mode to a Delta table.
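For orientation, importing the two exposed classes looks like this:

```python
from koheesio.spark.writers.delta import DeltaTableStreamWriter, DeltaTableWriter
```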
## koheesio.spark.writers.delta.BatchOutputMode

For batch writes, the following output modes are supported (a usage sketch follows the list):

- `append`: append the contents of the DataFrame to the output table; the default option in Koheesio.
- `overwrite`: overwrite the existing data.
- `ignore`: ignore the operation (i.e. no-op).
- `error` or `errorifexists`: throw an exception at runtime.
- `merge`: update matching data in the table and insert rows that do not exist.
- `merge_all`: update matching data in the table and insert rows that do not exist.
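As a minimal sketch of selecting a batch output mode (hedged: `df` is assumed to be an existing Spark DataFrame, passed via the `df` field as Koheesio writers generally accept):

```python
from koheesio.spark.writers.delta import BatchOutputMode, DeltaTableWriter

# Overwrite the existing contents of "test_table" with the rows in `df`.
DeltaTableWriter(
    df=df,  # assumption: `df` is a Spark DataFrame defined elsewhere
    table="test_table",
    output_mode=BatchOutputMode.OVERWRITE,
).execute()
```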
## koheesio.spark.writers.delta.DeltaTableStreamWriter

Delta table stream writer.

### Options

Options for DeltaTableStreamWriter.

#### allow_population_by_field_name (class-attribute, instance-attribute)

```python
allow_population_by_field_name: bool = Field(
    default=True,
    description="To do: convert to Field and pass as .options(**config)",
)
```
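For context, a minimal streaming sketch; `streaming_df` and the `checkpoint_location` field (assumed to be inherited from the streaming writer base class) are assumptions for illustration, not part of the options shown above:

```python
from koheesio.spark.writers.delta import DeltaTableStreamWriter

DeltaTableStreamWriter(
    df=streaming_df,  # assumption: a streaming DataFrame, e.g. from spark.readStream
    table="test_table",
    checkpoint_location="/tmp/checkpoints/test_table",  # assumed inherited stream-writer field
).execute()
```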
## koheesio.spark.writers.delta.DeltaTableWriter

Delta table writer for both batch and streaming DataFrames.

### Example

#### Example for MERGEALL

```python
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGEALL,
    output_mode_params={
        "merge_cond": "target.id=source.id",
        "update_cond": "target.col1_val>=source.col1_val",
        "insert_cond": "source.col_bk IS NOT NULL",
        "target_alias": "target",  # default, can be changed by providing a custom value
        "source_alias": "source",  # default, can be changed by providing a custom value
    },
)
```
#### Example for MERGE (using a merge builder)

```python
from delta.tables import DeltaTable  # requires the delta-spark package

DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        "merge_builder": (
            DeltaTable.forName(sparkSession=spark, tableOrViewName="<target_table_name>")
            .alias(target_alias)
            .merge(source=df, condition=merge_cond)
            .whenMatchedUpdateAll(condition=update_cond)
            .whenNotMatchedInsertAll(condition=insert_cond)
        )
    },
)
```
#### Example for MERGE (using a list of merge clauses)

If the table isn't created yet, the first run will execute an APPEND operation instead.

```python
DeltaTableWriter(
    table="test_table",
    output_mode=BatchOutputMode.MERGE,
    output_mode_params={
        "merge_builder": [
            {
                "clause": "whenMatchedUpdate",
                "set": {"value": "source.value"},
                "condition": "<update_condition>",
            },
            {
                "clause": "whenNotMatchedInsert",
                "values": {"id": "source.id", "value": "source.value"},
                "condition": "<insert_condition>",
            },
        ],
        "merge_cond": "<merge_condition>",
    },
)
```
#### Example for APPEND

DataFrame writer options can be passed as keyword arguments, as shown in the sketch below.
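A minimal sketch, assuming `df` is an existing Spark DataFrame; `mergeSchema` is a standard Delta Lake writer option used here purely to illustrate the keyword-argument passthrough:

```python
from koheesio.spark.writers.delta import BatchOutputMode, DeltaTableWriter

DeltaTableWriter(
    df=df,  # assumption: `df` is a Spark DataFrame defined elsewhere
    table="test_table",
    output_mode=BatchOutputMode.APPEND,
    mergeSchema="true",  # forwarded to the underlying DataFrame writer as an option
).execute()
```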
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `Union[DeltaTableStep, str]` | The table to write to | required |
| `output_mode` | `Optional[Union[str, BatchOutputMode, StreamingOutputMode]]` | The output mode to use. Default is `BatchOutputMode.APPEND`. For streaming, use `StreamingOutputMode`. | required |
| `params` | `Optional[dict]` | Additional parameters to use for the specific mode | required |
### output_mode (class-attribute, instance-attribute)

```python
output_mode: Optional[Union[BatchOutputMode, StreamingOutputMode]] = Field(
    default=BatchOutputMode.APPEND,
    alias="outputMode",
    description=f"{BatchOutputMode.__doc__}\n{StreamingOutputMode.__doc__}",
)
```
### params (class-attribute, instance-attribute)

```python
params: dict = Field(
    default_factory=dict,
    alias="output_mode_params",
    description="Additional parameters to use for specific mode",
)
```
### partition_by (class-attribute, instance-attribute)

```python
partition_by: Optional[List[str]] = Field(
    default=None,
    alias="partitionBy",
    description="The list of fields to partition the Delta table on",
)
```
### table (class-attribute, instance-attribute)

```python
table: Union[DeltaTableStep, str] = Field(
    default=..., description="The table to write to"
)
```
### execute

```python
execute() -> Output
```

Source code in `src/koheesio/spark/writers/delta/batch.py`.
### get_output_mode (classmethod)

```python
get_output_mode(
    choice: str, options: Set[Type]
) -> Union[BatchOutputMode, StreamingOutputMode]
```

Retrieve an OutputMode by validating `choice` against a set of OutputMode options.

Currently supported output modes can be found in:

- BatchOutputMode
- StreamingOutputMode

Source code in `src/koheesio/spark/writers/delta/batch.py`.
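For illustration, a small hedged sketch of calling the classmethod directly; the exact error behaviour for unknown choices is not shown here:

```python
from koheesio.spark.writers.delta import BatchOutputMode, DeltaTableWriter

# Validate the string "append" against the batch modes;
# this is expected to resolve to BatchOutputMode.APPEND.
mode = DeltaTableWriter.get_output_mode("append", {BatchOutputMode})
```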
## koheesio.spark.writers.delta.SCD2DeltaTableWriter

A class used to write Slowly Changing Dimension (SCD) Type 2 data to a Delta table.

Attributes:

| Name | Type | Description |
|---|---|---|
| `table` | `InstanceOf[DeltaTableStep]` | The table to merge to. |
| `merge_key` | `str` | The key used for merging data. |
| `include_columns` | `List[str]` | Columns to be merged. Will be selected from the DataFrame. Default is all columns. |
| `exclude_columns` | `List[str]` | Columns to be excluded from the DataFrame. |
| `scd2_columns` | `List[str]` | List of attributes for SCD Type 2 (track changes). |
| `scd2_timestamp_col` | `Optional[Column]` | Timestamp column for SCD Type 2 (track changes). Defaults to current_timestamp. |
| `scd1_columns` | `List[str]` | List of attributes for SCD Type 1 (just update). |
| `meta_scd2_struct_col_name` | `str` | SCD2 struct column name. |
| `meta_scd2_effective_time_col_name` | `str` | Effective time column name. |
| `meta_scd2_is_current_col_name` | `str` | Current flag column name. |
| `meta_scd2_end_time_col_name` | `str` | End time column name. |
| `target_auto_generated_columns` | `List[str]` | Auto-generated columns from the target Delta table. Used to exclude them from the merge logic. |
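To make the attribute list concrete, a minimal hedged construction sketch; the `DeltaTableStep` import path, the table name, and the column names are all assumptions for illustration:

```python
from koheesio.spark.delta import DeltaTableStep  # assumed import path
from koheesio.spark.writers.delta import SCD2DeltaTableWriter

SCD2DeltaTableWriter(
    df=df,  # assumption: source DataFrame defined elsewhere
    table=DeltaTableStep(table="customer_dim"),  # hypothetical target table
    merge_key="customer_id",                     # hypothetical business key
    scd2_columns=["address"],                    # track full history of address changes
    scd1_columns=["email"],                      # update email in place, no history
).execute()
```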
### exclude_columns (class-attribute, instance-attribute)

```python
exclude_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be excluded from DataFrame",
)
```
### include_columns (class-attribute, instance-attribute)

```python
include_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be merged. Will be selected from DataFrame. Default is all columns",
)
```
### meta_scd2_effective_time_col_name (class-attribute, instance-attribute)

```python
meta_scd2_effective_time_col_name: str = Field(
    default="effective_time",
    description="Effective col name",
)
```
### meta_scd2_end_time_col_name (class-attribute, instance-attribute)

```python
meta_scd2_end_time_col_name: str = Field(
    default="end_time", description="End time col name"
)
```
### meta_scd2_is_current_col_name (class-attribute, instance-attribute)

```python
meta_scd2_is_current_col_name: str = Field(
    default="is_current", description="Current col name"
)
```
### meta_scd2_struct_col_name (class-attribute, instance-attribute)

```python
meta_scd2_struct_col_name: str = Field(
    default="_scd2", description="SCD2 struct name"
)
```
### scd1_columns (class-attribute, instance-attribute)

```python
scd1_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd1 type (just update)",
)
```
### scd2_columns (class-attribute, instance-attribute)

```python
scd2_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd2 type (track changes)",
)
```
### scd2_timestamp_col (class-attribute, instance-attribute)

```python
scd2_timestamp_col: Optional[Column] = Field(
    default=None,
    description="Timestamp column for SCD2 type (track changes). Default to current_timestamp",
)
```
### table (class-attribute, instance-attribute)

```python
table: InstanceOf[DeltaTableStep] = Field(
    ..., description="The table to merge to"
)
```
### target_auto_generated_columns (class-attribute, instance-attribute)

```python
target_auto_generated_columns: List[str] = Field(
    default_factory=list,
    description="Auto generated columns from target Delta table. Will be used to exclude from merge logic",
)
```
### execute

Execute the SCD Type 2 operation.

This method executes the SCD Type 2 operation on the DataFrame. It validates the existing Delta table, prepares the merge conditions, stages the data, and then performs the merge operation.
Raises:

| Type | Description |
|---|---|
| `TypeError` | If the `scd2_timestamp_col` is not of date or timestamp type, or if the source DataFrame is missing any of the required merge columns. |
Source code in `src/koheesio/spark/writers/delta/scd.py`.