Scd
This module defines writers for writing Slowly Changing Dimension (SCD) Type 2 data to a Delta table.
Slowly Changing Dimension (SCD) is a technique used in data warehousing to handle changes to dimension data over time. SCD Type 2 is one of the most common types of SCD, where historical changes are tracked by creating new records for each change.
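To make the Type 2 idea concrete, here is a minimal plain-Python sketch, independent of Koheesio and Spark. Instead of overwriting a changed attribute, the current version is closed (given an end time and marked as no longer current) and a new version is appended. The `DimRow` class, the `city` attribute and the `apply_scd2` function are hypothetical illustrations, not part of Koheesio.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class DimRow:
    key: int
    city: str                     # hypothetical tracked (SCD2) attribute
    effective_time: datetime
    end_time: Optional[datetime]  # None while the version is still open
    is_current: bool


def apply_scd2(history: List[DimRow], key: int, city: str, ts: datetime) -> List[DimRow]:
    """Return a new history with the change for `key` applied as SCD Type 2."""
    out: List[DimRow] = []
    changed = True
    for row in history:
        if row.key == key and row.is_current:
            if row.city == city:
                changed = False  # same value: keep the current version untouched
                out.append(row)
            else:
                # Close the old version instead of overwriting it.
                out.append(replace(row, end_time=ts, is_current=False))
        else:
            out.append(row)
    if changed:
        # New key, or a changed value: append a new open version.
        out.append(DimRow(key, city, ts, None, True))
    return out


t1, t2 = datetime(2024, 1, 1), datetime(2024, 6, 1)
h = apply_scd2([], 1, "Amsterdam", t1)
h = apply_scd2(h, 1, "Utrecht", t2)  # two rows: the closed Amsterdam version and the current Utrecht one
```

Because the old row is kept, a query for "the city of key 1 on 2024-03-01" can still be answered from the history.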
Koheesio is a data processing framework for Apache Spark with dedicated support for working with Delta tables, including writers that handle SCD Type 2 operations on them.
To learn more about Slowly Changing Dimension and SCD Type 2, you can refer to the following resources:

- Slowly Changing Dimension (SCD) - Wikipedia
By using Koheesio, you get built-in merge logic, support for both SCD Type 2 and SCD Type 1 attributes, and direct integration with Delta tables in Spark.
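The difference between SCD Type 2 and SCD Type 1 attributes can be sketched as a small decision function: a change in a tracked (SCD2) column produces a new version, while a change only in an SCD1 column is written over the current row. This is a plain-Python illustration; `classify_change` and its return labels are hypothetical, and how Koheesio treats SCD1 columns on historical versions is not covered here.

```python
from typing import Any, Dict, List


def classify_change(current: Dict[str, Any], incoming: Dict[str, Any],
                    scd2_columns: List[str], scd1_columns: List[str]) -> str:
    """Decide how an incoming record affects the current version of its key."""
    if any(current[c] != incoming[c] for c in scd2_columns):
        return "new_version"      # tracked attribute changed: close old row, insert a new one
    if any(current[c] != incoming[c] for c in scd1_columns):
        return "update_in_place"  # only SCD1 attributes changed: overwrite the current row
    return "no_change"


current = {"id": 1, "city": "Amsterdam", "email": "a@example.com"}
classify_change(current, {**current, "email": "b@example.com"}, ["city"], ["email"])  # -> "update_in_place"
classify_change(current, {**current, "city": "Utrecht"}, ["city"], ["email"])         # -> "new_version"
```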
koheesio.spark.writers.delta.scd.SCD2DeltaTableWriter
A class used to write Slowly Changing Dimension (SCD) Type 2 data to a Delta table.
Attributes:

| Name | Type | Description |
|---|---|---|
| table | InstanceOf[DeltaTableStep] | The table to merge to. |
| merge_key | str | The key used for merging data. |
| include_columns | List[str] | Columns to be merged, selected from the DataFrame. Defaults to all columns. |
| exclude_columns | List[str] | Columns to be excluded from the DataFrame. |
| scd2_columns | List[str] | List of attributes for SCD2 type (track changes). |
| scd2_timestamp_col | Optional[Column] | Timestamp column for SCD2 type (track changes). Defaults to current_timestamp. |
| scd1_columns | List[str] | List of attributes for SCD1 type (just update). |
| meta_scd2_struct_col_name | str | Name of the SCD2 struct column. |
| meta_scd2_effective_time_col_name | str | Name of the effective time column. |
| meta_scd2_is_current_col_name | str | Name of the is-current flag column. |
| meta_scd2_end_time_col_name | str | Name of the end time column. |
| target_auto_generated_columns | List[str] | Auto-generated columns of the target Delta table. These are excluded from the merge logic. |
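The column-selection attributes (include_columns, exclude_columns, target_auto_generated_columns) can be read as a simple resolution rule. The sketch below assumes that an empty include list means "all source columns" (the documented default) and that exclusions and auto-generated columns are removed afterwards; the exact precedence inside Koheesio is an assumption, and `resolve_merge_columns` is a hypothetical helper.

```python
from typing import List


def resolve_merge_columns(source_columns: List[str],
                          include_columns: List[str],
                          exclude_columns: List[str],
                          target_auto_generated_columns: List[str]) -> List[str]:
    """Hypothetical resolution of which source columns take part in the merge."""
    # Empty include list: fall back to every column in the source DataFrame.
    selected = list(include_columns) if include_columns else list(source_columns)
    # Drop explicitly excluded columns and columns the target table generates itself.
    dropped = set(exclude_columns) | set(target_auto_generated_columns)
    return [c for c in selected if c not in dropped]


resolve_merge_columns(["id", "city", "email", "load_ts"], [], ["load_ts"], [])  # -> ["id", "city", "email"]
```

Excluding auto-generated columns matters because the target Delta table computes them itself, so they should not be written from the source.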
exclude_columns

```python
exclude_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be excluded from DataFrame",
)
```
include_columns

```python
include_columns: List[str] = Field(
    default_factory=list,
    description="Columns to be merged. Will be selected from DataFrame.Default is all columns",
)
```
meta_scd2_effective_time_col_name

```python
meta_scd2_effective_time_col_name: str = Field(
    default="effective_time",
    description="Effective col name",
)
```
meta_scd2_end_time_col_name

```python
meta_scd2_end_time_col_name: str = Field(
    default="end_time", description="End time col name"
)
```
meta_scd2_is_current_col_name

```python
meta_scd2_is_current_col_name: str = Field(
    default="is_current", description="Current col name"
)
```
meta_scd2_struct_col_name

```python
meta_scd2_struct_col_name: str = Field(
    default="_scd2", description="SCD2 struct name"
)
```
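The four meta_scd2_* names describe the bookkeeping attached to each row. The sketch below assumes the effective time, end time and is-current flag are nested inside the struct column named by meta_scd2_struct_col_name, using the default names above, and that a version is current exactly when it has no end time. Both points are assumptions, and `with_scd2_meta` is a hypothetical helper operating on plain dictionaries rather than Spark rows.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def with_scd2_meta(record: Dict[str, Any], effective: datetime,
                   end: Optional[datetime] = None,
                   struct_col: str = "_scd2",
                   effective_col: str = "effective_time",
                   end_col: str = "end_time",
                   current_col: str = "is_current") -> Dict[str, Any]:
    """Attach SCD2 bookkeeping to a record, nested under one struct column."""
    # Assumption: an open version (no end time) is the current one.
    meta = {effective_col: effective, end_col: end, current_col: end is None}
    return {**record, struct_col: meta}


row = with_scd2_meta({"id": 1, "city": "Amsterdam"}, datetime(2024, 1, 1))
# row["_scd2"] holds effective_time, end_time (None) and is_current (True)
```

Keeping the metadata in one struct keeps it apart from business columns and lets the names be changed through the meta_scd2_* attributes.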
scd1_columns

```python
scd1_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd1 type (just update)",
)
```
scd2_columns

```python
scd2_columns: List[str] = Field(
    default_factory=list,
    description="List of attributes for scd2 type (track changes)",
)
```
scd2_timestamp_col

```python
scd2_timestamp_col: Optional[Column] = Field(
    default=None,
    description="Timestamp column for SCD2 type (track changes). Default to current_timestamp",
)
```
table

```python
table: InstanceOf[DeltaTableStep] = Field(
    ..., description="The table to merge to"
)
```
target_auto_generated_columns

```python
target_auto_generated_columns: List[str] = Field(
    default_factory=list,
    description="Auto generated columns from target Delta table. Will be used to exclude from merge logic",
)
```
execute
Execute the SCD Type 2 operation.
This method executes the SCD Type 2 operation on the DataFrame. It validates the existing Delta table, prepares the merge conditions, stages the data, and then performs the merge operation.
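The steps above (prepare merge conditions, stage the data, merge) match a widely used staged-merge pattern for SCD Type 2 on Delta tables, which the Delta Lake merge examples also use. Every incoming row is staged with its real key, so it can match and close the current version; changed rows are staged a second time with a NULL merge key, which never matches and therefore forces an insert of the new version. The plain-Python simulation below illustrates that pattern only. Whether Koheesio stages data exactly this way is an assumption, and `stage_updates` and `merge_staged` are hypothetical names.

```python
from datetime import datetime
from typing import Any, Dict, List

Row = Dict[str, Any]


def stage_updates(updates: List[Row], target: List[Row], key: str, tracked: List[str]) -> List[Row]:
    """Build the staged source: every update keyed normally, plus a NULL-keyed copy of changed rows."""
    current = {r[key]: r for r in target if r["is_current"]}
    # Keyed copies match the existing current version (used to close it).
    staged = [{**u, "merge_key": u[key]} for u in updates]
    for u in updates:
        old = current.get(u[key])
        if old is not None and any(old[c] != u[c] for c in tracked):
            # A NULL merge key never matches, so this copy is always inserted.
            staged.append({**u, "merge_key": None})
    return staged


def merge_staged(target: List[Row], staged: List[Row], key: str,
                 tracked: List[str], ts: datetime) -> List[Row]:
    """Simulate MERGE ... ON target.key = staged.merge_key AND target.is_current."""
    out = [dict(r) for r in target]
    # The match condition is evaluated against the target as it was before the merge.
    current_idx = {r[key]: i for i, r in enumerate(out) if r["is_current"]}
    for s in staged:
        i = current_idx.get(s["merge_key"])  # a None merge key finds no match
        if i is None:
            # WHEN NOT MATCHED: insert as the new current version.
            new_row = {c: v for c, v in s.items() if c != "merge_key"}
            out.append({**new_row, "effective_time": ts, "end_time": None, "is_current": True})
        elif any(out[i][c] != s[c] for c in tracked):
            # WHEN MATCHED AND a tracked column changed: close the old version.
            out[i].update(end_time=ts, is_current=False)
    return out


t0, t1 = datetime(2024, 1, 1), datetime(2024, 6, 1)
target = [{"id": 1, "city": "Amsterdam", "effective_time": t0, "end_time": None, "is_current": True}]
updates = [{"id": 1, "city": "Utrecht"}, {"id": 2, "city": "Delft"}]
result = merge_staged(target, stage_updates(updates, target, "id", ["city"]), "id", ["city"], t1)
```

The point of the NULL-keyed copy is that a single MERGE statement can both close the old version and insert its replacement, which is not possible with one source row per key.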
Raises:

| Type | Description |
|---|---|
| TypeError | If scd2_timestamp_col is not of date or timestamp type, or if the source DataFrame is missing any of the required merge columns. |
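The two TypeError conditions can be sketched as checks against a schema. Here the schema is a plain dictionary of column names to Spark type names ("date" and "timestamp" are the simple-string names of Spark's DateType and TimestampType), and the timestamp column is passed by name. Koheesio itself works with a Spark Column, so this is an illustration under those assumptions; `validate_scd2_inputs` is a hypothetical helper.

```python
from typing import Dict, List, Optional


def validate_scd2_inputs(schema: Dict[str, str], required_columns: List[str],
                         timestamp_col: Optional[str] = None) -> None:
    """Raise TypeError for the two conditions listed under Raises."""
    missing = [c for c in required_columns if c not in schema]
    if missing:
        raise TypeError(f"Source DataFrame is missing merge columns: {missing}")
    # A missing timestamp column also fails here, since schema.get returns None.
    if timestamp_col is not None and schema.get(timestamp_col) not in ("date", "timestamp"):
        raise TypeError(f"{timestamp_col!r} must be of date or timestamp type")


schema = {"id": "int", "city": "string", "load_ts": "timestamp"}
validate_scd2_inputs(schema, ["id", "city"], "load_ts")  # passes silently
```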
Source code in src/koheesio/spark/writers/delta/scd.py