Delta
Module for creating and managing Delta tables.
koheesio.spark.delta.log (module-attribute)
log = get_logger(name=__name__, inherit_from_koheesio=True)
koheesio.spark.delta.DeltaTableStep
Class for creating and managing Delta tables.
DeltaTableStep aims to provide a simple interface to create and manage Delta tables. It is a wrapper around the Spark SQL API for Delta tables.
Example
from koheesio.spark.delta import DeltaTableStep

DeltaTableStep(
    table="my_table",
    database="my_database",
    catalog="my_catalog",
    create_if_not_exists=True,
    default_create_properties={
        "delta.randomizeFilePrefixes": "true",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.minReaderVersion": "2",
        "delta.minWriterVersion": "5",
    },
)
Methods:
Name | Description |
---|---|
get_persisted_properties | Get persisted properties of table. |
add_property | Alter table and set table property. |
add_properties | Alter table and add properties. |
execute | Nothing to execute on a Table. |
max_version_ts_of_last_execution | Max version timestamp of last execution. If no timestamp is found, returns 1900-01-01 00:00:00. Note: will raise an error if column |
Properties
- name -> str: Deprecated. Use .table_name instead.
- table_name -> str: Table name.
- dataframe -> DataFrame: Returns a DataFrame to be able to interact with this table.
- columns -> Optional[List[str]]: Returns all column names as a list.
- has_change_type -> bool: Checks if a column named _change_type is present in the table.
- exists -> bool: Check if table exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table | str | Table name. | required |
database | str | Database or Schema name. | None |
catalog | str | Catalog name. | None |
create_if_not_exists | bool | Force table creation if it doesn't exist. Note: Default properties will be applied to the table during CREATION. | False |
default_create_properties | Dict[str, str] | Default table properties to be applied during CREATION if create_if_not_exists is True. | {"delta.randomizeFilePrefixes": "true", "delta.checkpoint.writeStatsAsStruct": "true", "delta.minReaderVersion": "2", "delta.minWriterVersion": "5"} |
catalog (class-attribute, instance-attribute)
catalog: Optional[str] = Field(
    default=None,
    description="Catalog name. Note: Can be ignored if using a SparkCatalog that does not support catalog notation (e.g. Hive)",
)
columns (property)
create_if_not_exists (class-attribute, instance-attribute)
create_if_not_exists: bool = Field(
    default=False,
    alias="force_creation",
    description="Force table creation if it doesn't exist. Note: Default properties will be applied to the table during CREATION.",
)
database (class-attribute, instance-attribute)
dataframe (property)
Returns a DataFrame to be able to interact with this table.
default_create_properties (class-attribute, instance-attribute)
default_create_properties: Dict[str, Union[str, bool, int]] = Field(
    default={
        "delta.randomizeFilePrefixes": "true",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.minReaderVersion": "2",
        "delta.minWriterVersion": "5",
    },
    description="Default table properties to be applied during CREATION if `create_if_not_exists` is True",
)
exists (property)
exists: bool
Check if table exists.
Depending on the value of the boolean flag create_if_not_exists, a different logging level is used.
has_change_type (property)
has_change_type: bool
Checks if a column named _change_type is present in the table.
is_cdf_active (property)
is_cdf_active: bool
Check if the Change Data Feed (CDF) property is set and activated.
Returns:
Type | Description |
---|---|
bool | True if the delta.enableChangeDataFeed property is set to 'true'. |
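As a rough illustration of this check, the sketch below evaluates a plain dictionary of table properties, such as the one get_persisted_properties returns. The function name is_cdf_enabled is hypothetical, and the exact comparison against the string 'true' is an assumption taken from the description above, not the library's code.

```python
from typing import Mapping


def is_cdf_enabled(properties: Mapping[str, str]) -> bool:
    """Hypothetical stand-in for is_cdf_active that works on a plain dict."""
    # Assumption: CDF counts as active only when the property is exactly the string "true".
    return properties.get("delta.enableChangeDataFeed") == "true"


print(is_cdf_enabled({"delta.enableChangeDataFeed": "true"}))  # True
print(is_cdf_enabled({"delta.minReaderVersion": "2"}))  # False
```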
table_name (property)
table_name: str
Fully qualified table name in the form of catalog.database.table.
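A minimal sketch of how such a name can be assembled from the table, database and catalog parameters. It assumes that parts left as None (for example, no catalog on a Hive metastore) are simply omitted; fully_qualified_name is a hypothetical helper, not part of koheesio.

```python
from typing import Optional


def fully_qualified_name(
    table: str, database: Optional[str] = None, catalog: Optional[str] = None
) -> str:
    """Hypothetical helper that joins catalog, database and table with dots."""
    # Assumption: empty or None parts are skipped rather than rendered.
    return ".".join(part for part in (catalog, database, table) if part)


print(fully_qualified_name("my_table", "my_database", "my_catalog"))  # my_catalog.my_database.my_table
print(fully_qualified_name("my_table", "my_database"))  # my_database.my_table
```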
add_properties
Alter table and add properties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
properties | Dict[str, Union[str, int, bool]] | Properties to be added to table. | required |
override | bool | Enable override of existing value for property in table. | False |
Source code in src/koheesio/spark/delta.py
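The override flag is easiest to read as a filter over the requested properties. The sketch below assumes that, with override=False, a key already persisted on the table keeps its current value while new keys are still added; select_properties_to_set is a hypothetical name, and this behavior is an interpretation of the parameter description rather than the library's code.

```python
from typing import Dict, Mapping, Union

PropertyValue = Union[str, int, bool]


def select_properties_to_set(
    persisted: Mapping[str, str],
    requested: Mapping[str, PropertyValue],
    override: bool = False,
) -> Dict[str, PropertyValue]:
    """Hypothetical helper: keep only the properties that would actually be written."""
    selected: Dict[str, PropertyValue] = {}
    for key, value in requested.items():
        if key in persisted and not override:
            # Assumption: the existing value wins when override is disabled.
            continue
        selected[key] = value
    return selected


persisted = {"delta.minReaderVersion": "2"}
requested = {"delta.minReaderVersion": "3", "delta.enableChangeDataFeed": True}
print(select_properties_to_set(persisted, requested))  # {'delta.enableChangeDataFeed': True}
```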
add_property
Alter table and set table property.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | str | Property key (name). | required |
value | Union[str, int, bool] | Property value. | required |
override | bool | Enable override of existing value for property in table. | False |
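Setting a single table property maps onto Spark SQL's ALTER TABLE ... SET TBLPROPERTIES statement. The sketch below only renders that statement as a string. It assumes boolean values are written as lowercase 'true'/'false' (the form used for Delta properties elsewhere on this page) and leaves the override check out; set_property_sql is a hypothetical name, and whether add_property issues exactly this SQL is an assumption.

```python
from typing import Union


def set_property_sql(table_name: str, key: str, value: Union[str, int, bool]) -> str:
    """Hypothetical builder for an ALTER TABLE ... SET TBLPROPERTIES statement."""
    # bool is checked first because bool is a subclass of int in Python.
    if isinstance(value, bool):
        rendered = "true" if value else "false"
    else:
        rendered = str(value)
    # Escape backslashes and single quotes so the value stays a valid Spark SQL string literal.
    rendered = rendered.replace("\\", "\\\\").replace("'", "\\'")
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES ('{key}' = '{rendered}')"


print(set_property_sql("my_catalog.my_database.my_table", "delta.enableChangeDataFeed", True))
# ALTER TABLE my_catalog.my_database.my_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
```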
describe_history
Get the latest limit rows from the Delta Log. The information is in reverse chronological order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
limit | Optional[int] | Number of rows to return. | None |
Returns:
Type | Description |
---|---|
Optional[DataFrame] | Delta Table's history as a DataFrame or None if the table does not exist. |
Examples:
Calling describe_history() would return the full history from the Delta Log; calling describe_history(limit=10) would return the last 10 operations from the Delta Log.
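Delta Lake exposes the same information through its DESCRIBE HISTORY SQL command, which accepts an optional LIMIT clause and returns the newest operations first. The sketch below only builds that statement as a string; describe_history_sql is a hypothetical name, and whether describe_history issues exactly this SQL is an assumption. With an active SparkSession, the string could be passed to spark.sql(...).

```python
from typing import Optional


def describe_history_sql(table_name: str, limit: Optional[int] = None) -> str:
    """Hypothetical builder for a Delta DESCRIBE HISTORY statement."""
    statement = f"DESCRIBE HISTORY {table_name}"
    if limit is not None:
        if limit < 1:
            raise ValueError("limit must be a positive integer")
        # History rows come back newest first, so LIMIT keeps the latest operations.
        statement += f" LIMIT {limit}"
    return statement


print(describe_history_sql("my_catalog.my_database.my_table", limit=10))
# DESCRIBE HISTORY my_catalog.my_database.my_table LIMIT 10
```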
execute
get_column_type
Get the type of a specific column in the table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column | str | Column name. | required |
Returns:
Type | Description |
---|---|
Optional[DataType] | Column type. |
get_persisted_properties
Get persisted properties of table.
Returns:
Type | Description |
---|---|
Dict[str, str] | Persisted properties as a dictionary. |
koheesio.spark.delta.StaleDataCheckStep
Determines if the data inside the Delta table is stale based on the elapsed time since the last modification and, optionally, based on the current weekday.
The staleness interval is specified as a timedelta object. If refresh_day_num is provided, it adds an extra condition to mark the data as stale if the current day matches the specified weekday.
The date of the last modification of the table is taken from the Delta Log.
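The two conditions can be sketched in plain Python. This is a simplified model, not the step's implementation: the last modification time is passed in directly instead of being read from the Delta Log, is_data_stale is a hypothetical function name, and the "if it has not already" nuance of refresh_day_num is not modeled. Python's datetime.weekday() uses the same 0=Monday to 6=Sunday numbering and returns 4 for Friday, January 31st, 2025.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_data_stale(
    last_modified: datetime,
    now: datetime,
    interval: timedelta,
    refresh_day_num: Optional[int] = None,
) -> bool:
    """Hypothetical restatement of the staleness rule, without Spark or the Delta Log."""
    # Time condition: an elapsed time equal to the interval already counts as stale.
    stale_by_time = now - last_modified >= interval
    # Weekday condition: datetime.weekday() returns 0 for Monday through 6 for Sunday.
    stale_by_day = refresh_day_num is not None and now.weekday() == refresh_day_num
    return stale_by_time or stale_by_day


now = datetime(2025, 1, 31, 12, 0, 0)  # a Friday, so now.weekday() == 4
last_modified = datetime(2025, 1, 28, 11, 0, 0)
print(is_data_stale(last_modified, now, timedelta(days=3)))  # True
print(is_data_stale(last_modified, now, timedelta(days=3, hours=2)))  # False
print(is_data_stale(last_modified, now, timedelta(days=5), refresh_day_num=4))  # True
```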
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table | Union[DeltaTableStep, str] | The table to check for stale data. | required |
interval | timedelta | The interval to consider data stale. Users can pass a | required |
refresh_day_num | int | The weekday number (0=Monday, 6=Sunday) on which data should be refreshed if it has not already been refreshed. Enforces a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds. | None |
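The period limit that applies when refresh_day_num is set can be expressed as a simple check. This sketch assumes the check raises ValueError, as the Raises table lists; check_interval and MAX_INTERVAL_WITH_REFRESH_DAY are hypothetical names. The 0 to 6 range of refresh_day_num itself is enforced separately by Field(ge=0, le=6), as the attribute definition shows.

```python
from datetime import timedelta
from typing import Optional

# Upper bound quoted in the refresh_day_num description.
MAX_INTERVAL_WITH_REFRESH_DAY = timedelta(days=6, hours=23, minutes=59, seconds=59)


def check_interval(interval: timedelta, refresh_day_num: Optional[int] = None) -> timedelta:
    """Hypothetical validator: reject overly long intervals when a refresh day is set."""
    if refresh_day_num is not None and interval > MAX_INTERVAL_WITH_REFRESH_DAY:
        raise ValueError(
            f"interval {interval} exceeds {MAX_INTERVAL_WITH_REFRESH_DAY} "
            "while refresh_day_num is set"
        )
    return interval


print(check_interval(timedelta(days=5), refresh_day_num=4))  # 5 days, 0:00:00
```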
Examples:
Assume now is January 31st, 2025 (Friday) 12:00:00 and the last modification dates in the history are shown alongside the examples. In each example, table is the DeltaTableStep or table name being checked.
Example 1: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day threshold:
from datetime import timedelta

from koheesio.spark.delta import StaleDataCheckStep

is_stale = (
    StaleDataCheckStep(table=table, interval=timedelta(days=3))
    .execute()
    .is_data_stale
)
print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which is more than 3 days.
Example 2: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 1-hour threshold:
is_stale = (
    StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=1))
    .execute()
    .is_data_stale
)
print(is_stale)  # True, as the last modification was 3 days and 1 hour ago which equals the threshold.
Example 3: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 2-hour threshold:
is_stale = (
    StaleDataCheckStep(table=table, interval=timedelta(days=3, hours=2))
    .execute()
    .is_data_stale
)
print(is_stale)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.
Example 4: Same as example 3 but with the interval defined as an ISO-8601 string:
is_stale = (
    StaleDataCheckStep(table=table, interval="P3DT2H")
    .execute()
    .is_data_stale
)
print(is_stale)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.
Example 5: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 4 (Friday):
is_stale = (
    StaleDataCheckStep(table=table, interval=timedelta(days=5), refresh_day_num=4)
    .execute()
    .is_data_stale
)
print(is_stale)  # True: 3 days and 1 hour is less than 5 days, but refresh_day_num matches the current day.
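Example 4 passes the interval as the ISO-8601 duration string "P3DT2H" (3 days and 2 hours). Because interval is declared as a timedelta Field, the string is presumably converted by pydantic, which accepts ISO-8601 durations for timedelta fields. The sketch below shows what that conversion means for the day and time subset of the format (PnDTnHnMnS); parse_iso_duration is a hypothetical helper, not the parser koheesio or pydantic uses, and it deliberately rejects years, months and weeks.

```python
import re
from datetime import timedelta

# Day/time subset of ISO-8601 durations only: PnDTnHnMnS (no years, months or weeks).
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+(?:\.\d+)?)S)?)?$"
)


def parse_iso_duration(value: str) -> timedelta:
    """Hypothetical parser turning strings such as 'P3DT2H' into a timedelta."""
    match = _DURATION.match(value)
    # Reject non-matching strings and empty designators such as "P", "PT" or "P1DT".
    if match is None or value.endswith(("P", "T")):
        raise ValueError(f"Unsupported ISO-8601 duration: {value!r}")
    parts = {name: float(num) for name, num in match.groupdict().items() if num is not None}
    return timedelta(**parts)


print(parse_iso_duration("P3DT2H"))  # 3 days, 2:00:00
```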
Returns:
Type | Description |
---|---|
bool | True if data is considered stale by exceeding the defined time limits or if the current day equals refresh_day_num. |
Raises:
Type | Description |
---|---|
ValueError | If the total period exceeds 7 days when refresh_day_num is set. |
ValidationError | If |
interval (class-attribute, instance-attribute)
interval: timedelta = Field(
    ..., description="The interval to consider data stale."
)
refresh_day_num (class-attribute, instance-attribute)
refresh_day_num: Optional[int] = Field(
    default=None,
    description="The weekday number on which data should be refreshed.",
    ge=0,
    le=6,
)
table (class-attribute, instance-attribute)
table: Union[DeltaTableStep, str] = Field(
    ..., description="The table to check for stale data."
)
Output
execute
execute() -> Output