Delta

Module for creating and managing Delta tables.

koheesio.spark.delta.DeltaTableStep #

Class for creating and managing Delta tables.

DeltaTableStep aims to provide a simple interface to create and manage Delta tables. It is a wrapper around the Spark SQL API for Delta tables.

Example
from koheesio.spark.delta import DeltaTableStep

DeltaTableStep(
    table="my_table",
    database="my_database",
    catalog="my_catalog",
    create_if_not_exists=True,
    default_create_properties={
        "delta.randomizeFilePrefixes": "true",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.minReaderVersion": "2",
        "delta.minWriterVersion": "5",
    },
)

Methods:

| Name | Description |
|------|-------------|
| `get_persisted_properties` | Get persisted properties of table. |
| `add_property` | Alter table and set table property. |
| `add_properties` | Alter table and add properties. |
| `get_column_type` | Get the type of a column in the table. |
| `execute` | Nothing to execute on a Table. |
| `max_version_ts_of_last_execution` | Max version timestamp of last execution. If no timestamp is found, returns 1900-01-01 00:00:00. Note: raises an error if column VERSION_TIMESTAMP does not exist. |

Properties
  • name -> str Deprecated. Use .table_name instead.
  • table_name -> str Fully qualified table name.
  • dataframe -> DataFrame Returns a DataFrame for interacting with this table.
  • columns -> Optional[List[str]] Returns all column names as a list.
  • has_change_type -> bool Checks if a column named _change_type is present in the table.
  • is_cdf_active -> bool Checks if the CDF property is set and activated.
  • exists -> bool Checks if the table exists.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | Table name. | required |
| database | str | Database or Schema name. | None |
| catalog | str | Catalog name. | None |
| create_if_not_exists | bool | Force table creation if it doesn't exist. Note: default properties will be applied to the table during creation. | False |
| default_create_properties | Dict[str, str] | Default table properties to be applied during creation if `create_if_not_exists` is True. | {"delta.randomizeFilePrefixes": "true", "delta.checkpoint.writeStatsAsStruct": "true", "delta.minReaderVersion": "2", "delta.minWriterVersion": "5"} |

catalog class-attribute instance-attribute #

catalog: Optional[str] = Field(
    default=None,
    description="Catalog name. Note: Can be ignored if using a SparkCatalog that does not support catalog notation (e.g. Hive)",
)

columns property #

columns: Optional[List[str]]

Returns all column names as a list.

Example

DeltaTableStep(...).columns
For example, this would return ['age', 'name'] if the table has columns age and name.

create_if_not_exists class-attribute instance-attribute #

create_if_not_exists: bool = Field(
    default=False,
    alias="force_creation",
    description="Force table creation if it doesn't exist.Note: Default properties will be applied to the table during CREATION.",
)

database class-attribute instance-attribute #

database: Optional[str] = Field(
    default=None, description="Database or Schema name."
)

dataframe property #

dataframe: DataFrame

Returns a DataFrame for interacting with this table.
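
Example

A minimal sketch, assuming a running Spark session and an existing table (my_table and my_db are placeholder names):

df = DeltaTableStep(table="my_table", database="my_db").dataframe
df.select("age", "name").show()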

default_create_properties class-attribute instance-attribute #

default_create_properties: Dict[
    str, Union[str, bool, int]
] = Field(
    default={
        "delta.randomizeFilePrefixes": "true",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.minReaderVersion": "2",
        "delta.minWriterVersion": "5",
    },
    description="Default table properties to be applied during CREATION if `create_if_not_exists` True",
)

exists property #

exists: bool

Check if the table exists.
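
Example

A common pattern, sketched with placeholder names:

dt = DeltaTableStep(table="my_table", database="my_db")
if dt.exists:
    row_count = dt.dataframe.count()  # safe to read only once the table exists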

has_change_type property #

has_change_type: bool

Checks if a column named _change_type is present in the table.

is_cdf_active property #

is_cdf_active: bool

Check if the Change Data Feed (CDF) property is set and activated.

Returns:

| Type | Description |
|------|-------------|
| bool | True if the delta.enableChangeDataFeed property is set to 'true'. |
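
Example

One way to activate Change Data Feed and verify it, sketched with placeholder names (delta.enableChangeDataFeed is the standard Delta Lake property):

dt = DeltaTableStep(table="my_table", database="my_db")
dt.add_property(key="delta.enableChangeDataFeed", value=True)  # booleans are lowercased to 'true'
assert dt.is_cdf_active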

table instance-attribute #

table: str

table_name property #

table_name: str

Fully qualified table name in the form of catalog.database.table.
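
Example

DeltaTableStep(table="my_table", database="my_db", catalog="my_catalog").table_name
Would return 'my_catalog.my_db.my_table', given placeholder names and the form described above.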

add_properties #

add_properties(
    properties: Dict[str, Union[str, bool, int]],
    override: bool = False,
)

Alter table and add properties.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| properties | Dict[str, Union[str, int, bool]] | Properties to be added to table. | required |
| override | bool | Enable override of existing value for property in table. | False |
Source code in src/koheesio/spark/delta.py
def add_properties(self, properties: Dict[str, Union[str, bool, int]], override: bool = False):
    """Alter table and add properties.

    Parameters
    ----------
    properties : Dict[str, Union[str, int, bool]]
        Properties to be added to table.
    override : bool, optional, default=False
        Enable override of existing value for property in table.

    """
    for k, v in properties.items():
        v_str = str(v) if not isinstance(v, bool) else str(v).lower()
        self.add_property(key=k, value=v_str, override=override)

add_property #

add_property(
    key: str,
    value: Union[str, int, bool],
    override: bool = False,
)

Alter table and set table property.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| key | str | Property key (name). | required |
| value | Union[str, int, bool] | Property value. | required |
| override | bool | Enable override of existing value for property in table. | False |
Source code in src/koheesio/spark/delta.py
def add_property(self, key: str, value: Union[str, int, bool], override: bool = False):
    """Alter table and set table property.

    Parameters
    ----------
    key: str
        Property key(name).
    value: Union[str, int, bool]
        Property value.
    override: bool
        Enable override of existing value for property in table.

    """
    persisted_properties = self.get_persisted_properties()
    v_str = str(value) if not isinstance(value, bool) else str(value).lower()

    def _alter_table() -> None:
        property_pair = f"'{key}'='{v_str}'"

        try:
            # noinspection SqlNoDataSourceInspection
            self.spark.sql(f"ALTER TABLE {self.table_name} SET TBLPROPERTIES ({property_pair})")
            self.log.debug(f"Table `{self.table_name}` has been altered. Property `{property_pair}` added.")
        except Py4JJavaError as e:
            msg = f"Property `{key}` can not be applied to table `{self.table_name}`. Exception: {e}"
            self.log.warning(msg)
            warnings.warn(msg)

    if self.exists:
        if key in persisted_properties and persisted_properties[key] != v_str:
            if override:
                self.log.debug(
                    f"Property `{key}` is present in `{self.table_name}` and has value "
                    f"`{persisted_properties[key]}`. Override is enabled. The value will be changed to `{v_str}`."
                )
                _alter_table()
            else:
                self.log.debug(
                    f"Skipping adding property `{key}`, because it is already set "
                    f"for table `{self.table_name}` to `{persisted_properties[key]}`. To override it, provide override=True."
                )
        else:
            _alter_table()
    else:
        self.default_create_properties[key] = v_str

execute #

execute()

Nothing to execute on a Table.

Source code in src/koheesio/spark/delta.py
def execute(self):
    """Nothing to execute on a Table"""

get_column_type #

get_column_type(column: str) -> Optional[DataType]

Get the type of a column in the table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| column | str | Column name. | required |

Returns:

| Type | Description |
|------|-------------|
| Optional[DataType] | Column type. |

Source code in src/koheesio/spark/delta.py
def get_column_type(self, column: str) -> Optional[DataType]:
    """Get the type of a column in the table.

    Parameters
    ----------
    column : str
        Column name.

    Returns
    -------
    Optional[DataType]
        Column type.
    """
    return self.dataframe.schema[column].dataType if self.columns and column in self.columns else None
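
An example sketch, assuming the table has an integer column named age (placeholder names):

dt = DeltaTableStep(table="my_table", database="my_db")
age_type = dt.get_column_type("age")  # e.g. IntegerType()
missing = dt.get_column_type("no_such_column")  # None, since the column is not in dt.columns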

get_persisted_properties #

get_persisted_properties() -> Dict[str, str]

Get persisted properties of table.

Returns:

| Type | Description |
|------|-------------|
| Dict[str, str] | Persisted properties as a dictionary. |

Source code in src/koheesio/spark/delta.py
def get_persisted_properties(self) -> Dict[str, str]:
    """Get persisted properties of table.

    Returns
    -------
    Dict[str, str]
        Persisted properties as a dictionary.
    """
    persisted_properties = {}
    raw_options = self.spark.sql(f"SHOW TBLPROPERTIES {self.table_name}").collect()

    for ro in raw_options:
        key, value = ro.asDict().values()
        persisted_properties[key] = value

    return persisted_properties
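
A minimal usage sketch (placeholder table name; the returned values depend on what has been set on the table):

props = DeltaTableStep(table="my_table", database="my_db").get_persisted_properties()
props.get("delta.minReaderVersion")  # e.g. '2' if set at creation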