Delta

Read data from a Delta table and return a DataFrame or DataStream

Classes:

Name Description
DeltaTableReader

Reads data from a Delta table and returns a DataFrame

DeltaTableStreamReader

Reads data from a Delta table and returns a DataStream

koheesio.spark.readers.delta.STREAMING_ONLY_OPTIONS module-attribute #

STREAMING_ONLY_OPTIONS = [
    "ignore_deletes",
    "ignore_changes",
    "starting_version",
    "starting_timestamp",
    "schema_tracking_location",
]
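
The source does not show how this list is consumed. One plausible use, sketched here purely as an assumption, is to flag streaming-only fields that were set on a batch reader. The helper name `find_streaming_only_conflicts` and the plain-dict input are hypothetical and not part of koheesio. Note that `get_options` forwards `startingVersion` and `startingTimestamp` in batch mode as well, so a real check may need to be narrower than this one.

```python
from typing import Any, Dict, List

# Copied from the module attribute documented above.
STREAMING_ONLY_OPTIONS = [
    "ignore_deletes",
    "ignore_changes",
    "starting_version",
    "starting_timestamp",
    "schema_tracking_location",
]


def find_streaming_only_conflicts(streaming: bool, fields: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list streaming-only fields set on a batch reader."""
    if streaming:
        return []  # every listed option is allowed in streaming mode
    # A field counts as "set" when it is neither None nor False (the documented defaults).
    return [name for name in STREAMING_ONLY_OPTIONS if fields.get(name) not in (None, False)]


conflicts = find_streaming_only_conflicts(
    streaming=False,
    fields={"ignore_deletes": True, "starting_version": None},
)
print(conflicts)
```

Returning the offending names, rather than raising immediately, leaves it to the caller to decide whether a conflict is an error or only a warning.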

koheesio.spark.readers.delta.STREAMING_SCHEMA_WARNING module-attribute #

STREAMING_SCHEMA_WARNING = "\nImportant!\nAlthough you can start the streaming source from a specified version or timestamp, the schema of the streaming source is always the latest schema of the Delta table. You must ensure there is no incompatible schema change to the Delta table after the specified version or timestamp. Otherwise, the streaming source may return incorrect results when reading the data with an incorrect schema."

koheesio.spark.readers.delta.DeltaTableReader #

Reads data from a Delta table and returns a DataFrame. A Delta table can be read in batch or streaming mode. Reading the change data feed (CDF) is also supported in both batch and streaming mode.

Parameters:

Name Type Description Default
table Union[DeltaTableStep, str]

The table to read

required
filter_cond Optional[Union[Column, str]]

Filter condition to apply to the dataframe. Filters can be provided as Column or string expressions. For example: f.col('state') == 'Ohio', state = 'Ohio' or (col('col1') > 3) & (col('col2') < 9)

None
columns Optional[ListOfColumns]

Columns to select from the table. One or many columns can be provided as strings. For example: ['col1', 'col2'], ['col1'] or 'col1'

None
streaming Optional[bool]

Whether to read the table as a Stream or not

False
read_change_feed bool

readChangeFeed: The Change Data Feed (CDF) feature allows Delta tables to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records 'change events' for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated. See: https://docs.databricks.com/delta/delta-change-data-feed.html

False
starting_version Optional[str]

startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output.

None
starting_timestamp Optional[str]

startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. Provide either a timestamp string (e.g. 2019-01-01T00:00:00.000Z) or a date string (e.g. 2019-01-01).

None
ignore_deletes bool

ignoreDeletes: Ignore transactions that delete data at partition boundaries. Note: Only supported for streaming tables. For more info see https://docs.databricks.com/structured-streaming/delta-lake.html#ignore-updates-and-deletes

False
ignore_changes bool

ignoreChanges: Re-process updates if files had to be rewritten in the source table due to a data-changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, so downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes, so a stream that uses ignoreChanges is not disrupted by either deletions or updates to the source table.

False

columns class-attribute instance-attribute #

columns: Optional[ListOfColumns] = Field(
    default=None,
    description="Columns to select from the table. One or many columns can be provided as strings. For example: `['col1', 'col2']`, `['col1']` or `'col1'` ",
)

filter_cond class-attribute instance-attribute #

filter_cond: Optional[Union[Column, str]] = Field(
    default=None,
    alias="filterCondition",
    description="Filter condition to apply to the dataframe. Filters can be provided by using Column or string expressions For example: `f.col('state') == 'Ohio'`, `state = 'Ohio'` or  `(col('col1') > 3) & (col('col2') < 9)`",
)

ignore_changes class-attribute instance-attribute #

ignore_changes: bool = Field(
    default=False,
    alias="ignoreChanges",
    description="ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, therefore your downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes. Therefore if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.",
)

ignore_deletes class-attribute instance-attribute #

ignore_deletes: bool = Field(
    default=False,
    alias="ignoreDeletes",
    description="ignoreDeletes: Ignore transactions that delete data at partition boundaries. Note: Only supported for streaming tables. For more info see https://docs.databricks.com/structured-streaming/delta-lake.html#ignore-updates-and-deletes",
)

read_change_feed class-attribute instance-attribute #

read_change_feed: bool = Field(
    default=False,
    alias="readChangeFeed",
    description="Change Data Feed (CDF) feature allows Delta tables to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records 'change events' for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated. See: https://docs.databricks.com/delta/delta-change-data-feed.html",
)

reader property #

reader: Union[DataStreamReader, DataFrameReader]

Return the reader for the DeltaTableReader based on the streaming attribute

schema_tracking_location class-attribute instance-attribute #

schema_tracking_location: Optional[str] = Field(
    default=None,
    alias="schemaTrackingLocation",
    description="schemaTrackingLocation: Track the location of source schema. Note: Recommend to enable Delta reader version: 3 and writer version: 7 for this option. For more info see https://docs.delta.io/latest/delta-column-mapping.html"
    + STREAMING_SCHEMA_WARNING,
)

skip_change_commits class-attribute instance-attribute #

skip_change_commits: bool = Field(
    default=False,
    alias="skipChangeCommits",
    description="skipChangeCommits: Skip processing of change commits. Note: Only supported for streaming tables. (not supported in Open Source Delta Implementation). Prefer using skipChangeCommits over ignoreDeletes and ignoreChanges starting DBR12.1 and above. For more info see https://docs.databricks.com/structured-streaming/delta-lake.html#skip-change-commits",
)

starting_timestamp class-attribute instance-attribute #

starting_timestamp: Optional[str] = Field(
    default=None,
    alias="startingTimestamp",
    description="startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. Either provide a timestamp string (e.g. 2019-01-01T00:00:00.000Z) or a date string (e.g. 2019-01-01)"
    + STREAMING_SCHEMA_WARNING,
)

starting_version class-attribute instance-attribute #

starting_version: Optional[str] = Field(
    default=None,
    alias="startingVersion",
    description="startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output."
    + STREAMING_SCHEMA_WARNING,
)

streaming class-attribute instance-attribute #

streaming: Optional[bool] = Field(
    default=False,
    description="Whether to read the table as a Stream or not",
)

table class-attribute instance-attribute #

table: Union[DeltaTableStep, str] = Field(
    default=..., description="The table to read"
)

temp_view_name property #

temp_view_name

Get the temporary view name for the dataframe for SQL queries

view property #

view

Create a temporary view of the dataframe for SQL queries

execute #

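execute()

Read the table, apply the optional filter condition and column selection, and store the resulting DataFrame in self.output.df

Source code in src/koheesio/spark/readers/delta.py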
def execute(self):
    df = self.reader.table(self.table.table_name)
    if self.filter_cond is not None:
        df = df.filter(f.expr(self.filter_cond) if isinstance(self.filter_cond, str) else self.filter_cond)
    if self.columns is not None:
        df = df.select(*self.columns)
    self.output.df = df

get_options #

get_options() -> Dict[str, Any]

Get the options for the DeltaTableReader based on the streaming attribute

Source code in src/koheesio/spark/readers/delta.py
def get_options(self) -> Dict[str, Any]:
    """Get the options for the DeltaTableReader based on the `streaming` attribute"""
    options = {
        # Enable Change Data Feed (CDF) feature
        "readChangeFeed": self.read_change_feed,
        # Initial position, one of:
        "startingVersion": self.starting_version,
        "startingTimestamp": self.starting_timestamp,
    }

    # Streaming only options
    if self.streaming:
        options = {
            **options,
            # Ignore updates and deletes, one of:
            "ignoreDeletes": self.ignore_deletes,
            "ignoreChanges": self.ignore_changes,
            "skipChangeCommits": self.skip_change_commits,
            "schemaTrackingLocation": self.schema_tracking_location,
        }
    # Batch only options
    else:
        pass  # there are none... for now :)

    def normalize(v: Union[str, bool]):
        """normalize values"""
        # True becomes "true", False becomes "false"
        v = str(v).lower() if isinstance(v, bool) else v
        return v

    # Any options with `value == None` are filtered out
    return {k: normalize(v) for k, v in options.items() if v is not None}
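
The filtering and normalization at the end of `get_options` can be tried without Spark. The following is a minimal standalone sketch, not the library code: `build_reader_options` is a hypothetical stand-in that takes a reduced set of arguments and applies the same two rules (drop `None` values, render booleans as lowercase strings).

```python
from typing import Any, Dict, Optional


def build_reader_options(
    streaming: bool,
    read_change_feed: bool = False,
    starting_version: Optional[str] = None,
    ignore_deletes: bool = False,
) -> Dict[str, Any]:
    """Hypothetical stand-in mirroring the filtering and normalization in get_options."""
    options: Dict[str, Any] = {
        "readChangeFeed": read_change_feed,
        "startingVersion": starting_version,
    }
    if streaming:
        # Streaming-only option, added only when reading as a stream.
        options["ignoreDeletes"] = ignore_deletes
    # Drop None values; render booleans as lowercase strings.
    return {
        key: str(value).lower() if isinstance(value, bool) else value
        for key, value in options.items()
        if value is not None
    }


batch = build_reader_options(streaming=False, starting_version="5")
stream = build_reader_options(streaming=True, ignore_deletes=True)
print(batch)   # {'readChangeFeed': 'false', 'startingVersion': '5'}
print(stream)  # {'readChangeFeed': 'false', 'ignoreDeletes': 'true'}
```

Only `None` is filtered out, so a boolean left at its default of `False` is still passed along as the string `"false"`.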

set_temp_view_name #

set_temp_view_name()

Set a temporary view name for the dataframe for SQL queries

Source code in src/koheesio/spark/readers/delta.py
@model_validator(mode="after")
def set_temp_view_name(self):
    """Set a temporary view name for the dataframe for SQL queries"""
    table_name = self.table.table
    vw_name = get_random_string(prefix=f"tmp_{table_name}")
    self.__temp_view_name__ = vw_name
    return self
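
The source does not show the behavior of `get_random_string` beyond its `prefix` argument. As a rough illustration of the naming scheme, the sketch below uses a hypothetical stand-in, `random_view_name`. The underscore separator and the 8-character suffix are assumptions, and only the `tmp_{table_name}` prefix comes from the validator above.

```python
import uuid


def random_view_name(table_name: str) -> str:
    """Hypothetical stand-in for get_random_string(prefix=f"tmp_{table_name}")."""
    # Assumption: separator and suffix length are illustrative, not the library's.
    suffix = uuid.uuid4().hex[:8]
    return f"tmp_{table_name}_{suffix}"


first = random_view_name("orders")
second = random_view_name("orders")
print(first, second)
```

A random suffix lets several readers of the same table each register a temporary view without reusing a name.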

koheesio.spark.readers.delta.DeltaTableStreamReader #

Reads data from a Delta table and returns a DataStream

streaming class-attribute instance-attribute #

streaming: bool = True