
Delta

Module for creating and managing Delta tables.

koheesio.spark.delta.log module-attribute #

log = get_logger(name=__name__, inherit_from_koheesio=True)

koheesio.spark.delta.DeltaTableStep #

Class for creating and managing Delta tables.

DeltaTableStep aims to provide a simple interface for creating and managing Delta tables. It is a wrapper around the Spark SQL API for Delta tables.

Example
from koheesio.spark.delta import DeltaTableStep

DeltaTableStep(
    table="my_table",
    database="my_database",
    catalog="my_catalog",
    create_if_not_exists=True,
    default_create_properties={
        "delta.randomizeFilePrefixes": "true",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.minReaderVersion": "2",
        "delta.minWriterVersion": "5",
    },
)

Methods:

| Name | Description |
|------|-------------|
| get_persisted_properties | Get persisted properties of table. |
| add_property | Alter table and set table property. |
| add_properties | Alter table and add properties. |
| execute | Nothing to execute on a Table. |
| max_version_ts_of_last_execution | Max version timestamp of last execution. If no timestamp is found, returns 1900-01-01 00:00:00. Note: raises an error if the column VERSION_TIMESTAMP does not exist. |

Properties
  • name -> str Deprecated. Use .table_name instead.
  • table_name -> str Table name.
  • dataframe -> DataFrame Returns a DataFrame to be able to interact with this table.
  • columns -> Optional[List[str]] Returns all column names as a list.
  • has_change_type -> bool Checks if a column named _change_type is present in the table.
  • exists -> bool Check if table exists.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | Table name. | required |
| database | str | Database or Schema name. | None |
| catalog | str | Catalog name. | None |
| create_if_not_exists | bool | Force table creation if it doesn't exist. Note: default properties will be applied to the table during creation. | False |
| default_create_properties | Dict[str, str] | Default table properties to be applied during creation if `create_if_not_exists` is True. | {"delta.randomizeFilePrefixes": "true", "delta.checkpoint.writeStatsAsStruct": "true", "delta.minReaderVersion": "2", "delta.minWriterVersion": "5"} |

catalog class-attribute instance-attribute #

catalog: Optional[str] = Field(
    default=None,
    description="Catalog name. Note: Can be ignored if using a SparkCatalog that does not support catalog notation (e.g. Hive)",
)

columns property #

columns: Optional[List[str]]

Returns all column names as a list.

Example

DeltaTableStep(...).columns
Would, for example, return `['age', 'name']` if the table has columns `age` and `name`.

create_if_not_exists class-attribute instance-attribute #

create_if_not_exists: bool = Field(
    default=False,
    alias="force_creation",
    description="Force table creation if it doesn't exist.Note: Default properties will be applied to the table during CREATION.",
)

database class-attribute instance-attribute #

database: Optional[str] = Field(
    default=None, description="Database or Schema name."
)

dataframe property #

dataframe: DataFrame

Returns a DataFrame to be able to interact with this table.

default_create_properties class-attribute instance-attribute #

default_create_properties: Dict[
    str, Union[str, bool, int]
] = Field(
    default={
        "delta.randomizeFilePrefixes": "true",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.minReaderVersion": "2",
        "delta.minWriterVersion": "5",
    },
    description="Default table properties to be applied during CREATION if `create_if_not_exists` True",
)

exists property #

exists: bool

Check if the table exists. Depending on the value of the boolean flag `create_if_not_exists`, a different logging level is used.

has_change_type property #

has_change_type: bool

Checks if a column named `_change_type` is present in the table.

is_cdf_active property #

is_cdf_active: bool

Check if the CDF property is set and activated.

Returns:

| Type | Description |
|------|-------------|
| bool | True if the `delta.enableChangeDataFeed` property is set to 'true'. |
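
A minimal sketch of how this flag might pair with `add_property` to enable the Change Data Feed (the table and database names below are hypothetical):

```python
from koheesio.spark.delta import DeltaTableStep

dt = DeltaTableStep(table="my_table", database="my_database")

# add_property lowercases booleans, so True is persisted as the
# string "true" that Delta expects.
dt.add_property(key="delta.enableChangeDataFeed", value=True, override=True)

assert dt.is_cdf_active
```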

table instance-attribute #

table: str

table_name property #

table_name: str

Fully qualified table name in the form of `catalog.database.table`.
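
For example, given hypothetical catalog, database and table names:

```python
dt = DeltaTableStep(
    table="my_table", database="my_database", catalog="my_catalog"
)
dt.table_name  # -> "my_catalog.my_database.my_table"
```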

add_properties #

add_properties(
    properties: Dict[str, Union[str, bool, int]],
    override: bool = False,
) -> None

Alter table and add properties.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| properties | Dict[str, Union[str, int, bool]] | Properties to be added to table. | required |
| override | bool | Enable override of existing value for property in table. | False |
Source code in src/koheesio/spark/delta.py
def add_properties(self, properties: Dict[str, Union[str, bool, int]], override: bool = False) -> None:
    """Alter table and add properties.

    Parameters
    ----------
    properties : Dict[str, Union[str, int, bool]]
        Properties to be added to table.
    override : bool, optional, default=False
        Enable override of existing value for property in table.

    """
    for k, v in properties.items():
        v_str = str(v) if not isinstance(v, bool) else str(v).lower()
        self.add_property(key=k, value=v_str, override=override)
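
A brief usage sketch (the table name is hypothetical; the properties are the same Delta table properties used in the defaults above):

```python
dt = DeltaTableStep(table="my_table")

# Booleans are lowercased to "true"/"false", ints are stringified,
# then each pair is forwarded to add_property.
dt.add_properties(
    {"delta.randomizeFilePrefixes": True, "delta.minReaderVersion": 2},
    override=False,  # keep any existing values
)
```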

add_property #

add_property(
    key: str,
    value: Union[str, int, bool],
    override: bool = False,
) -> None

Alter table and set table property.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| key | str | Property key (name). | required |
| value | Union[str, int, bool] | Property value. | required |
| override | bool | Enable override of existing value for property in table. | False |
Source code in src/koheesio/spark/delta.py
def add_property(self, key: str, value: Union[str, int, bool], override: bool = False) -> None:
    """Alter table and set table property.

    Parameters
    ----------
    key: str
        Property key (name).
    value: Union[str, int, bool]
        Property value.
    override: bool
        Enable override of existing value for property in table.

    """
    persisted_properties = self.get_persisted_properties()
    v_str = str(value) if not isinstance(value, bool) else str(value).lower()

    def _alter_table() -> None:
        property_pair = f"'{key}'='{v_str}'"

        try:
            # noinspection SqlNoDataSourceInspection
            self.spark.sql(f"ALTER TABLE {self.table_name} SET TBLPROPERTIES ({property_pair})")
            self.log.debug(f"Table `{self.table_name}` has been altered. Property `{property_pair}` added.")
        except Py4JJavaError as e:
            msg = f"Property `{key}` can not be applied to table `{self.table_name}`. Exception: {e}"
            self.log.warning(msg)
            warnings.warn(msg)

    if self.exists:
        if key in persisted_properties and persisted_properties[key] != v_str:
            if override:
                self.log.debug(
                    f"Property `{key}` presents in `{self.table_name}` and has value `{persisted_properties[key]}`."
                    f"Override is enabled. The value will be changed to `{v_str}`."
                )
                _alter_table()
            else:
                self.log.debug(
                    f"Skipping adding property `{key}`, because it is already set "
                    f"for table `{self.table_name}` to `{v_str}`. To override it, provide override=True"
                )
        else:
            _alter_table()
    else:
        self.default_create_properties[key] = v_str
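
Note from the source above: when the table does not exist yet, no ALTER TABLE is issued; the property is recorded in `default_create_properties` so it can be applied at creation time. A minimal sketch (the table name and `delta.appendOnly` are purely illustrative):

```python
dt = DeltaTableStep(table="my_table", create_if_not_exists=True)

# If `my_table` does not exist, this only records the property for
# later creation; otherwise the table is altered in place.
dt.add_property(key="delta.appendOnly", value=True)
```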

describe_history #

describe_history(
    limit: Optional[int] = None,
) -> Optional[DataFrame]

Get the latest `limit` rows from the Delta Log. The information is in reverse chronological order.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| limit | Optional[int] | Number of rows to return. | None |

Returns:

| Type | Description |
|------|-------------|
| Optional[DataFrame] | Delta Table's history as a DataFrame, or None if the table does not exist. |

Examples:

DeltaTableStep(...).describe_history()
Would return the full history from a Delta Log.

DeltaTableStep(...).describe_history(limit=10)
Would return the last 10 operations from the Delta Log.

Source code in src/koheesio/spark/delta.py
def describe_history(self, limit: Optional[int] = None) -> Optional[DataFrame]:
    """
    Get the latest `limit` rows from the Delta Log.
    The information is in reverse chronological order.

    Parameters
    ----------
    limit : Optional[int]
        Number of rows to return.

    Returns
    -------
    Optional[DataFrame]
        Delta Table's history as a DataFrame or None if the table does not exist.

    Examples
    -------
    ```python
    DeltaTableStep(...).describe_history()
    ```
    Would return the full history from a Delta Log.

    ```python
    DeltaTableStep(...).describe_history(limit=10)
    ```
    Would return the last 10 operations from the Delta Log.
    """
    if self.exists:
        history_df = self.spark.sql(f"DESCRIBE HISTORY {self.table_name}")
        history_df = history_df.orderBy("version", ascending=False)
        if limit:
            history_df = history_df.limit(limit)
        return history_df
    else:
        self.log.warning(f"Table `{self.table_name}` does not exist.")
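
Because the history is ordered by version in descending order, the first row is the most recent operation. A sketch of inspecting it (table name hypothetical):

```python
history_df = DeltaTableStep(table="my_table").describe_history(limit=1)

# describe_history returns None when the table does not exist.
if history_df is not None:
    history_df.select("version", "timestamp", "operation").show()
```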

execute #

execute() -> None

Nothing to execute on a Table

Source code in src/koheesio/spark/delta.py
def execute(self) -> None:
    """Nothing to execute on a Table"""

get_column_type #

get_column_type(column: str) -> Optional[DataType]

Get the type of a specific column in the table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| column | str | Column name. | required |

Returns:

| Type | Description |
|------|-------------|
| Optional[DataType] | Column type, or None if the column does not exist. |

Source code in src/koheesio/spark/delta.py
def get_column_type(self, column: str) -> Optional[DataType]:
    """Get the type of a specific column in the table.

    Parameters
    ----------
    column : str
        Column name.

    Returns
    -------
    Optional[DataType]
        Column type.
    """
    return self.dataframe.schema[column].dataType if self.columns and column in self.columns else None
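
A usage sketch, branching on a column's Spark type (table and column names hypothetical):

```python
from pyspark.sql.types import StringType

dt = DeltaTableStep(table="my_table")

# Returns None for unknown columns, so the isinstance check is safe.
if isinstance(dt.get_column_type("name"), StringType):
    ...  # string-specific handling
```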

get_persisted_properties #

get_persisted_properties() -> Dict[str, str]

Get persisted properties of table.

Returns:

| Type | Description |
|------|-------------|
| Dict[str, str] | Persisted properties as a dictionary. |

Source code in src/koheesio/spark/delta.py
def get_persisted_properties(self) -> Dict[str, str]:
    """Get persisted properties of table.

    Returns
    -------
    Dict[str, str]
        Persisted properties as a dictionary.
    """
    persisted_properties = {}
    raw_options = self.spark.sql(f"SHOW TBLPROPERTIES {self.table_name}").collect()

    for ro in raw_options:
        key, value = ro.asDict().values()
        persisted_properties[key] = value

    return persisted_properties
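
For example, to check a persisted property before deciding to alter the table (table name hypothetical):

```python
props = DeltaTableStep(table="my_table").get_persisted_properties()

# Values are returned as strings, exactly as SHOW TBLPROPERTIES reports them.
if props.get("delta.enableChangeDataFeed") != "true":
    ...  # e.g. enable CDF via add_property
```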

koheesio.spark.delta.StaleDataCheckStep #

Determines whether the data inside the Delta table is stale, based on the elapsed time since the last modification and, optionally, on the current weekday.

The staleness interval is specified as a timedelta object. If refresh_day_num is provided, an extra condition marks the data as stale whenever the current day matches the specified weekday.

The date of the last modification of the table is taken from the Delta Log.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | Union[DeltaTableStep, str] | The table to check for stale data. | required |
| interval | timedelta | The interval to consider data stale. Users can pass a timedelta object or an ISO-8601 compliant string representing the interval; for example, P1W3DT2H30M is equivalent to timedelta(weeks=1, days=3, hours=2, minutes=30). | required |
| refresh_day_num | Optional[int] | The weekday number (0=Monday, 6=Sunday) on which data should be refreshed if it has not already. Enforces a maximum period limit of 6 days, 23 hours, 59 minutes and 59 seconds. | None |

Examples:

Assume now is January 31st, 2025 (Friday) 12:00:00 and the last modification dates in the history are shown alongside the examples.

Example 1: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day threshold:

is_stale = (
    StaleDataCheckStep(table=table, interval=timedelta(days=3))
    .execute()
    .is_data_stale
)
print(
    is_stale
)  # True, as the last modification was 3 days and 1 hour ago which is more than 3 days.

Example 2: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 1-hour threshold:

is_stale = (
    StaleDataCheckStep(
        table=table, interval=timedelta(days=3, hours=1)
    )
    .execute()
    .is_data_stale
)
print(
    is_stale
)  # True, as the last modification was 3 days and 1 hour ago which is the same as the threshold.

Example 3: Last modified on January 28th, 2025, 11:00:00 checking with a 3-day and 2-hour threshold:

is_stale = (
    StaleDataCheckStep(
        table=table, interval=timedelta(days=3, hours=2)
    )
    .execute()
    .is_data_stale
)
print(
    is_stale
)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.

Example 4: Same as example 3 but with the interval defined as an ISO-8601 string:

is_stale = (
    StaleDataCheckStep(table=table, interval="P3DT2H")
    .execute()
    .is_data_stale
)
print(
    is_stale
)  # False, as the last modification was 3 days and 1 hour ago which is less than 3 days and 2 hours.

Example 5: Last modified on January 28th, 2025, 11:00:00 checking with a 5-day threshold and refresh_day_num = 5 (Friday):

is_stale = (
    StaleDataCheckStep(
        table=table, interval=timedelta(days=5), refresh_day_num=5
    )
    .execute()
    .is_data_stale
)
print(
    is_stale
)  # True, 3 days and 1 hour is less than 5 days but refresh_day_num is the same as the current day.

Returns:

| Type | Description |
|------|-------------|
| bool | True if data is considered stale by exceeding the defined time limits or if the current day equals refresh_day_num; False if the conditions are not met. |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the total period exceeds 7 days when refresh_day_num is set. |
| ValidationError | If refresh_day_num is not between 0 and 6. |

interval class-attribute instance-attribute #

interval: timedelta = Field(
    ..., description="The interval to consider data stale."
)

refresh_day_num class-attribute instance-attribute #

refresh_day_num: Optional[int] = Field(
    default=None,
    description="The weekday number on which data should be refreshed.",
    ge=0,
    le=6,
)

table class-attribute instance-attribute #

table: Union[DeltaTableStep, str] = Field(
    ..., description="The table to check for stale data."
)

Output #

Output class for StaleDataCheckStep.

is_data_stale class-attribute instance-attribute #

is_data_stale: bool = Field(
    ...,
    description="Boolean flag indicating whether data in the table is stale or not",
)

execute #

execute() -> Output

Source code in src/koheesio/spark/delta.py
def execute(self) -> Output:
    # Get the history of the Delta table
    history_df = self.table.describe_history()

    if not history_df:
        log.debug(f"No history found for `{self.table.table_name}`.")
        self.output.is_data_stale = True  # Consider data stale if the table does not exist
        return self.output

    modification_operations = [
        "WRITE",
        "MERGE",
        "DELETE",
        "UPDATE",
        "REPLACE TABLE AS SELECT",
        "CREATE TABLE AS SELECT",
        "TRUNCATE",
        "RESTORE",
    ]

    # Filter the history to data modification operations only
    history_df = history_df.filter(history_df["operation"].isin(modification_operations))

    # Get the last modification operation's timestamp
    last_modification = history_df.select("timestamp").first()

    if not last_modification:
        log.debug(f"No modification operation found in the history for `{self.table.table_name}`.")
        self.output.is_data_stale = True
        return self.output

    current_time = datetime.now()
    last_modification_timestamp = last_modification["timestamp"]

    cut_off_date = current_time - self.interval

    log.debug(f"Last modification timestamp: {last_modification_timestamp}, cut-off date: {cut_off_date}")

    is_stale_by_time = last_modification_timestamp <= cut_off_date

    if self.refresh_day_num is not None:
        current_day_of_week = current_time.weekday()
        log.debug(f"Current day of the week: {current_day_of_week}, refresh day: {self.refresh_day_num}")

        is_appropriate_day_for_refresh = current_day_of_week == self.refresh_day_num
        self.output.is_data_stale = is_stale_by_time or is_appropriate_day_for_refresh
        return self.output

    self.output.is_data_stale = is_stale_by_time
    return self.output
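
Putting it together, a call site might gate a refresh on the step's output; a sketch with hypothetical names:

```python
from datetime import timedelta

check = StaleDataCheckStep(
    table="my_database.my_table",
    interval=timedelta(days=1),
    refresh_day_num=0,  # additionally force a refresh on Mondays
)

if check.execute().is_data_stale:
    ...  # trigger the refresh job
```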