Delta
Read data from a Delta table and return a DataFrame or DataStream
Classes:
Name | Description |
---|---|
DeltaTableReader | Reads data from a Delta table and returns a DataFrame |
DeltaTableStreamReader | Reads data from a Delta table and returns a DataStream |
koheesio.spark.readers.delta.STREAMING_ONLY_OPTIONS
module-attribute
STREAMING_ONLY_OPTIONS = [
"ignore_deletes",
"ignore_changes",
"starting_version",
"starting_timestamp",
"schema_tracking_location",
]
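One way a list like this might be used is to flag streaming-only settings that were supplied for a batch read. The following is a minimal sketch under that assumption; `streaming_only_in_use` is a hypothetical helper, not part of koheesio, and the library's own handling may differ.

```python
from typing import Any, Dict, List

# Copied from the module attribute above.
STREAMING_ONLY_OPTIONS = [
    "ignore_deletes",
    "ignore_changes",
    "starting_version",
    "starting_timestamp",
    "schema_tracking_location",
]


def streaming_only_in_use(settings: Dict[str, Any], streaming: bool) -> List[str]:
    """Return the streaming-only settings that are set although streaming is off.

    Hypothetical helper for illustration; None, False and "" count as unset.
    """
    if streaming:
        return []
    return [name for name in STREAMING_ONLY_OPTIONS if settings.get(name)]
```

A caller could log or raise on a non-empty result before building the batch reader.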
koheesio.spark.readers.delta.STREAMING_SCHEMA_WARNING
module-attribute
STREAMING_SCHEMA_WARNING = "\nImportant!\nAlthough you can start the streaming source from a specified version or timestamp, the schema of the streaming source is always the latest schema of the Delta table. You must ensure there is no incompatible schema change to the Delta table after the specified version or timestamp. Otherwise, the streaming source may return incorrect results when reading the data with an incorrect schema."
koheesio.spark.readers.delta.DeltaTableReader
Reads data from a Delta table and returns a DataFrame. A Delta table can be read in batch or streaming mode. Reading the change data feed (CDF) is also supported in both batch and streaming mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table | Union[DeltaTableStep, str] | The table to read. | required |
filter_cond | Optional[Union[Column, str]] | Filter condition to apply to the dataframe. Filters can be provided as Column or string expressions. For example: `f.col('state') == 'Ohio'`, `state = 'Ohio'` or `(col('col1') > 3) & (col('col2') < 9)` | None |
columns | Optional[ListOfColumns] | Columns to select from the table. One or many columns can be provided as strings. For example: `['col1', 'col2']`, `['col1']` or `'col1'` | None |
streaming | Optional[bool] | Whether to read the table as a stream. | False |
read_change_feed | bool | readChangeFeed: The Change Data Feed (CDF) feature allows Delta tables to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records 'change events' for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated. See: https://docs.databricks.com/delta/delta-change-data-feed.html | False |
starting_version | Optional[str] | startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output. | None |
starting_timestamp | Optional[str] | startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. Provide either a timestamp string (e.g. 2019-01-01T00:00:00.000Z) or a date string (e.g. 2019-01-01). | None |
ignore_deletes | bool | ignoreDeletes: Ignore transactions that delete data at partition boundaries. Note: Only supported for streaming tables. For more info see https://docs.databricks.com/structured-streaming/delta-lake.html#ignore-updates-and-deletes | False |
ignore_changes | bool | ignoreChanges: Re-process updates if files had to be rewritten in the source table due to a data-changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, so downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes, so a stream using ignoreChanges is not disrupted by either deletions or updates to the source table. | False |
columns
class-attribute
instance-attribute
columns: Optional[ListOfColumns] = Field(
default=None,
description="Columns to select from the table. One or many columns can be provided as strings. For example: `['col1', 'col2']`, `['col1']` or `'col1'` ",
)
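The description accepts either a single string or a list of strings. A minimal sketch of how such input could be normalized to a list follows; `normalize_columns` is a hypothetical helper and does not reproduce how `ListOfColumns` is implemented in koheesio.

```python
from typing import List, Optional, Union


def normalize_columns(columns: Optional[Union[str, List[str]]]) -> List[str]:
    """Turn None, a single column name, or a list of names into a list of names."""
    if columns is None:
        return []
    if isinstance(columns, str):
        # A bare string is treated as one column name, not as a sequence of characters.
        return [columns]
    return list(columns)
```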
filter_cond
class-attribute
instance-attribute
filter_cond: Optional[Union[Column, str]] = Field(
default=None,
alias="filterCondition",
description="Filter condition to apply to the dataframe. Filters can be provided by using Column or string expressions For example: `f.col('state') == 'Ohio'`, `state = 'Ohio'` or `(col('col1') > 3) & (col('col2') < 9)`",
)
ignore_changes
class-attribute
instance-attribute
ignore_changes: bool = Field(
default=False,
alias="ignoreChanges",
description="ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, therefore your downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes. Therefore if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.",
)
ignore_deletes
class-attribute
instance-attribute
ignore_deletes: bool = Field(
default=False,
alias="ignoreDeletes",
description="ignoreDeletes: Ignore transactions that delete data at partition boundaries. Note: Only supported for streaming tables. For more info see https://docs.databricks.com/structured-streaming/delta-lake.html#ignore-updates-and-deletes",
)
read_change_feed
class-attribute
instance-attribute
read_change_feed: bool = Field(
default=False,
alias="readChangeFeed",
description="Change Data Feed (CDF) feature allows Delta tables to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records 'change events' for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated. See: https://docs.databricks.com/delta/delta-change-data-feed.html",
)
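To make the 'change events' concrete, the sketch below replays a handful of CDF-style rows onto an in-memory dict. It assumes the `_change_type` metadata column with the values `insert`, `update_preimage`, `update_postimage` and `delete`, as used by Delta Lake's change data feed. The row shape (`id`, `name`) and the `apply_changes` helper are made up for illustration.

```python
from typing import Any, Dict, Iterable


def apply_changes(
    state: Dict[int, Dict[str, Any]], changes: Iterable[Dict[str, Any]]
) -> Dict[int, Dict[str, Any]]:
    """Replay change events, in the order given, onto a copy of `state` keyed by `id`."""
    result = dict(state)
    for row in changes:
        change_type = row["_change_type"]
        # Strip metadata columns (prefixed with "_") to recover the row data.
        data = {key: value for key, value in row.items() if not key.startswith("_")}
        if change_type in ("insert", "update_postimage"):
            result[data["id"]] = data
        elif change_type == "delete":
            result.pop(data["id"], None)
        # "update_preimage" carries the old values and needs no action here.
    return result


changes = [
    {"id": 1, "name": "a", "_change_type": "insert"},
    {"id": 1, "name": "a", "_change_type": "update_preimage"},
    {"id": 1, "name": "b", "_change_type": "update_postimage"},
    {"id": 2, "name": "c", "_change_type": "insert"},
    {"id": 2, "name": "c", "_change_type": "delete"},
]
final_state = apply_changes({}, changes)
```

In a real pipeline the same logic would typically be expressed as a merge into a target table rather than a Python loop.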
reader
property
reader: Union[DataStreamReader, DataFrameReader]
Return the reader for the DeltaTableReader based on the `streaming` attribute.
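The return type suggests a choice between PySpark's `SparkSession.read` (a `DataFrameReader`) and `SparkSession.readStream` (a `DataStreamReader`). A rough sketch of that selection is shown here, assuming this is essentially what the property does. `pick_reader` and `fake_spark` are hypothetical names, and the stand-in object only exists so the sketch runs without Spark installed.

```python
from types import SimpleNamespace
from typing import Any


def pick_reader(spark: Any, streaming: bool) -> Any:
    """Return the streaming reader when `streaming` is true, else the batch reader."""
    return spark.readStream if streaming else spark.read


# Stand-in for a SparkSession; a real session exposes the same two attributes.
fake_spark = SimpleNamespace(read="DataFrameReader", readStream="DataStreamReader")
batch_reader = pick_reader(fake_spark, streaming=False)
stream_reader = pick_reader(fake_spark, streaming=True)
```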
schema_tracking_location
class-attribute
instance-attribute
schema_tracking_location: Optional[str] = Field(
default=None,
alias="schemaTrackingLocation",
description="schemaTrackingLocation: Track the location of source schema. Note: Recommend to enable Delta reader version: 3 and writer version: 7 for this option. For more info see https://docs.delta.io/latest/delta-column-mapping.html"
+ STREAMING_SCHEMA_WARNING,
)
skip_change_commits
class-attribute
instance-attribute
skip_change_commits: bool = Field(
default=False,
alias="skipChangeCommits",
description="skipChangeCommits: Skip processing of change commits. Note: Only supported for streaming tables. (not supported in Open Source Delta Implementation). Prefer using skipChangeCommits over ignoreDeletes and ignoreChanges starting DBR12.1 and above. For more info see https://docs.databricks.com/structured-streaming/delta-lake.html#skip-change-commits",
)
starting_timestamp
class-attribute
instance-attribute
starting_timestamp: Optional[str] = Field(
default=None,
alias="startingTimestamp",
description="startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. Either provide a timestamp string (e.g. 2019-01-01T00:00:00.000Z) or a date string (e.g. 2019-01-01)"
+ STREAMING_SCHEMA_WARNING,
)
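The description names two accepted shapes: a timestamp string and a date string. The sketch below checks a value against just those two shapes before it is passed on. The format patterns are inferred from the examples in the description, `looks_like_starting_timestamp` is a hypothetical helper, and Delta itself may accept other formats.

```python
from datetime import datetime

# Patterns inferred from the two examples in the field description (an assumption).
_FORMATS = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%d")


def looks_like_starting_timestamp(value: str) -> bool:
    """Return True if `value` matches one of the two example shapes."""
    for fmt in _FORMATS:
        try:
            datetime.strptime(value, fmt)
            return True
        except ValueError:
            continue
    return False
```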
starting_version
class-attribute
instance-attribute
starting_version: Optional[str] = Field(
default=None,
alias="startingVersion",
description="startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output."
+ STREAMING_SCHEMA_WARNING,
)
streaming
class-attribute
instance-attribute
streaming: Optional[bool] = Field(
default=False,
description="Whether to read the table as a Stream or not",
)
table
class-attribute
instance-attribute
table: Union[DeltaTableStep, str] = Field(
default=..., description="The table to read"
)
temp_view_name
property
Get the temporary view name for the dataframe for SQL queries
execute
Source code in src/koheesio/spark/readers/delta.py
get_options
Get the options for the DeltaTableReader based on the `streaming` attribute.
Source code in src/koheesio/spark/readers/delta.py
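The source of `get_options` is not reproduced on this page. As an illustration only, the sketch below shows one plausible way to turn field values into reader options: map each field to the alias from its `Field` definition, skip unset values, and skip streaming-only fields in batch mode. `to_reader_options` and `ALIASES` are hypothetical, and the skipping rule is an assumption rather than the library's confirmed behavior.

```python
from typing import Any, Dict

# Streaming-only field names, copied from STREAMING_ONLY_OPTIONS on this page.
STREAMING_ONLY_OPTIONS = [
    "ignore_deletes",
    "ignore_changes",
    "starting_version",
    "starting_timestamp",
    "schema_tracking_location",
]

# Field name -> alias, taken from the Field definitions on this page.
ALIASES = {
    "read_change_feed": "readChangeFeed",
    "starting_version": "startingVersion",
    "starting_timestamp": "startingTimestamp",
    "ignore_deletes": "ignoreDeletes",
    "ignore_changes": "ignoreChanges",
    "schema_tracking_location": "schemaTrackingLocation",
}


def to_reader_options(values: Dict[str, Any], streaming: bool) -> Dict[str, Any]:
    """Build an options dict keyed by alias (illustrative, not koheesio's code)."""
    options: Dict[str, Any] = {}
    for name, alias in ALIASES.items():
        value = values.get(name)
        if value is None:
            continue  # unset fields are left out
        if not streaming and name in STREAMING_ONLY_OPTIONS:
            continue  # assumed: streaming-only fields are ignored for batch reads
        options[alias] = value
    return options
```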
set_temp_view_name
Set a temporary view name for the dataframe for SQL queries.