
Snowflake

Snowflake steps and tasks for Koheesio

Every class in this module is a subclass of Step or Task and is used to perform operations on Snowflake.

Notes

Every Step in this module is based on SnowflakeBaseModel. The following parameters are available for every Step.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| url | str | Hostname for the Snowflake account, e.g. `<account>.snowflakecomputing.com`. Alias for sfURL. | required |
| user | str | Login name for the Snowflake user. Alias for sfUser. | required |
| password | SecretStr | Password for the Snowflake user. Alias for sfPassword. | required |
| database | str | The database to use for the session after connecting. Alias for sfDatabase. | required |
| sfSchema | str | The schema to use for the session after connecting. Alias for schema ("schema" is a reserved name in Pydantic, so sfSchema is used as the main name instead). | required |
| role | str | The default security role to use for the session after connecting. Alias for sfRole. | required |
| warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse. | required |
| authenticator | Optional[str] | Authenticator for the Snowflake user, e.g. "okta.com". | None |
| options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | {"sfCompress": "on", "continue_on_error": "off"} |
| format | str | The default "snowflake" format can be used natively in Databricks; in other environments, use net.snowflake.spark.snowflake and make sure the required JARs are installed. | "snowflake" |

koheesio.spark.snowflake.AddColumn #

Add an empty column to a Snowflake table with given name and DataType

Example
AddColumn(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    col="MY_COL",
    dataType=StringType(),
).execute()

column class-attribute instance-attribute #

column: str = Field(
    default=..., description="The name of the new column"
)

table class-attribute instance-attribute #

table: str = Field(
    default=...,
    description="The name of the Snowflake table",
)

type class-attribute instance-attribute #

type: DataType = Field(
    default=...,
    description="The DataType represented as a Spark DataType",
)

Output #

Output class for AddColumn

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="Query that was executed to add the column",
)

execute #

execute()
Source code in src/koheesio/spark/snowflake.py
def execute(self):
    query = f"ALTER TABLE {self.table} ADD COLUMN {self.column} {map_spark_type(self.type)}".upper()
    self.output.query = query
    RunQuery(**self.get_options(), query=query).execute()

koheesio.spark.snowflake.CreateOrReplaceTableFromDataFrame #

Create (or Replace) a Snowflake table which has the same schema as a Spark DataFrame

Can be used as any Transformation. The DataFrame is however left unchanged, and only used for determining the schema of the Snowflake Table that is to be created (or replaced).

Example
CreateOrReplaceTableFromDataFrame(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    df=df,
).execute()

Or, as a Transformation:

CreateOrReplaceTableFromDataFrame(
    ...
    table="MY_TABLE",
).transform(df)

table class-attribute instance-attribute #

table: str = Field(
    default=...,
    alias="table_name",
    description="The name of the (new) table",
)

Output #

Output class for CreateOrReplaceTableFromDataFrame

input_schema class-attribute instance-attribute #

input_schema: StructType = Field(
    default=...,
    description="The original schema from the input DataFrame",
)

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="Query that was executed to create the table",
)

snowflake_schema class-attribute instance-attribute #

snowflake_schema: str = Field(
    default=...,
    description="Derived Snowflake table schema based on the input DataFrame",
)

execute #

execute()
Source code in src/koheesio/spark/snowflake.py
def execute(self):
    self.output.df = self.df

    input_schema = self.df.schema
    self.output.input_schema = input_schema

    snowflake_schema = ", ".join([f"{c.name} {map_spark_type(c.dataType)}" for c in input_schema])
    self.output.snowflake_schema = snowflake_schema

    table_name = f"{self.database}.{self.sfSchema}.{self.table}"
    query = f"CREATE OR REPLACE TABLE {table_name} ({snowflake_schema})"
    self.output.query = query

    RunQuery(**self.get_options(), query=query).execute()

koheesio.spark.snowflake.DbTableQuery #

Read table from Snowflake using the dbtable option instead of query

Example
DbTableQuery(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="user",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="db.schema.table",
).execute().df

dbtable class-attribute instance-attribute #

dbtable: str = Field(
    default=...,
    alias="table",
    description="The name of the table",
)

koheesio.spark.snowflake.GetTableSchema #

Get the schema from a Snowflake table as a Spark Schema

Notes
  • This Step will execute a SELECT * FROM <table> LIMIT 1 query to get the schema of the table.
  • The schema will be stored in the table_schema attribute of the output.
  • table_schema is used as the attribute name to avoid conflicts with the schema attribute of Pydantic's BaseModel.
Example
schema = (
    GetTableSchema(
        database="MY_DB",
        schema_="MY_SCHEMA",
        warehouse="MY_WH",
        user="gid.account@nike.com",
        password="super-secret-password",
        role="APPLICATION.SNOWFLAKE.ADMIN",
        table="MY_TABLE",
    )
    .execute()
    .table_schema
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The Snowflake table name"
)

Output #

Output class for GetTableSchema

table_schema class-attribute instance-attribute #

table_schema: StructType = Field(
    default=...,
    serialization_alias="schema",
    description="The Spark Schema",
)

execute #

execute() -> Output
Source code in src/koheesio/spark/snowflake.py
def execute(self) -> Output:
    query = f"SELECT * FROM {self.table} LIMIT 1"  # nosec B608: hardcoded_sql_expressions
    df = Query(**self.get_options(), query=query).execute().df
    self.output.table_schema = df.schema

koheesio.spark.snowflake.GrantPrivilegesOnFullyQualifiedObject #

Grant Snowflake privileges to a set of roles on a fully qualified object, i.e. database.schema.object_name

This class is a subclass of GrantPrivilegesOnObject and is used to grant privileges on a fully qualified object. The advantage of using this class is that you can set the database, schema, and object separately, and the object name is then automatically expanded to the fully qualified form database.schema.object_name.

Example
GrantPrivilegesOnFullyQualifiedObject(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    ...
    object="MY_TABLE",
    type="TABLE",
    ...
)

In this example, the object name will be set to be fully qualified, i.e. MY_DB.MY_SCHEMA.MY_TABLE. If you were to use GrantPrivilegesOnObject instead, you would have to set the object name to be fully qualified yourself.

set_object_name #

set_object_name()

Set the object name to be fully qualified, i.e. database.schema.object_name

Source code in src/koheesio/spark/snowflake.py
@model_validator(mode="after")
def set_object_name(self):
    """Set the object name to be fully qualified, i.e. database.schema.object_name"""
    # database, schema, obj_name
    db = self.database
    schema = self.model_dump()["sfSchema"]  # since "schema" is a reserved name
    obj_name = self.object

    self.object = f"{db}.{schema}.{obj_name}"

    return self

koheesio.spark.snowflake.GrantPrivilegesOnObject #

A wrapper on Snowflake GRANT privileges

With this Step, you can grant Snowflake privileges to a set of roles on a table, a view, or an object

See Also

https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| warehouse | str | The name of the warehouse. Alias for sfWarehouse. | required |
| user | str | The username. Alias for sfUser. | required |
| password | SecretStr | The password. Alias for sfPassword. | required |
| role | str | The role name. | required |
| object | str | The name of the object to grant privileges on. | required |
| type | str | The type of object to grant privileges on, e.g. TABLE, VIEW. | required |
| privileges | Union[conlist(str, min_length=1), str] | The Privilege/Permission or list of Privileges/Permissions to grant on the given object. | required |
| roles | Union[conlist(str, min_length=1), str] | The Role or list of Roles to grant the privileges to. | required |
Example
GrantPrivilegesOnObject(
    object="MY_TABLE",
    type="TABLE",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    permissions=["SELECT", "INSERT"],
).execute()

In this example, the APPLICATION.SNOWFLAKE.ADMIN role will be granted SELECT and INSERT privileges on the MY_TABLE table using the MY_WH warehouse.

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    description="The name of the object to grant privileges on",
)

privileges class-attribute instance-attribute #

privileges: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="permissions",
    description="The Privilege/Permission or list of Privileges/Permissions to grant on the given object. See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html",
)

roles class-attribute instance-attribute #

roles: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="role",
    validation_alias="roles",
    description="The Role or list of Roles to grant the privileges to",
)

type class-attribute instance-attribute #

type: str = Field(
    default=...,
    description="The type of object to grant privileges on, e.g. TABLE, VIEW",
)

Output #

Output class for GrantPrivilegesOnObject

query class-attribute instance-attribute #

query: conlist(str, min_length=1) = Field(
    default=...,
    description="Query that was executed to grant privileges",
    validate_default=False,
)

execute #

execute()
Source code in src/koheesio/spark/snowflake.py
def execute(self):
    self.output.query = []
    roles = self.roles

    for role in roles:
        query = self.get_query(role)
        self.output.query.append(query)
        RunQuery(**self.get_options(), query=query).execute()

get_query #

get_query(role: str)

Build the GRANT query

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| role | str | The role name | required |

Returns:

| Name | Type | Description |
|------|------|-------------|
| query | str | The query that performs the grant |

Source code in src/koheesio/spark/snowflake.py
def get_query(self, role: str):
    """Build the GRANT query

    Parameters
    ----------
    role: str
        The role name

    Returns
    -------
    query : str
        The Query that performs the grant
    """
    query = f"GRANT {','.join(self.privileges)} ON {self.type} {self.object} TO ROLE {role}".upper()
    return query

set_roles_privileges #

set_roles_privileges(values)

Coerce roles and privileges to be lists if they are not already.

Source code in src/koheesio/spark/snowflake.py
@model_validator(mode="before")
def set_roles_privileges(cls, values):
    """Coerce roles and privileges to be lists if they are not already."""
    roles_value = values.get("roles") or values.get("role")
    privileges_value = values.get("privileges")

    if not (roles_value and privileges_value):
        raise ValueError("You have to specify roles AND privileges when using 'GrantPrivilegesOnObject'.")

    # coerce values to be lists
    values["roles"] = [roles_value] if isinstance(roles_value, str) else roles_value
    values["role"] = values["roles"][0]  # hack to keep the validator happy
    values["privileges"] = [privileges_value] if isinstance(privileges_value, str) else privileges_value

    return values

validate_object_and_object_type #

validate_object_and_object_type()

Validate that the object and type are set.

Source code in src/koheesio/spark/snowflake.py
@model_validator(mode="after")
def validate_object_and_object_type(self):
    """Validate that the object and type are set."""
    object_value = self.object
    if not object_value:
        raise ValueError("You must provide an `object`, this should be the name of the object. ")

    object_type = self.type
    if not object_type:
        raise ValueError(
            "You must provide a `type`, e.g. TABLE, VIEW, DATABASE. "
            "See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html"
        )

    return self

koheesio.spark.snowflake.GrantPrivilegesOnTable #

Grant Snowflake privileges to a set of roles on a table

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="table",
    description="The name of the Table to grant Privileges on. This should be just the name of the table; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'TABLE'

koheesio.spark.snowflake.GrantPrivilegesOnView #

Grant Snowflake privileges to a set of roles on a view

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="view",
    description="The name of the View to grant Privileges on. This should be just the name of the view; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'VIEW'

koheesio.spark.snowflake.Query #

Query data from Snowflake and return the result as a DataFrame

Example
Query(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="SELECT * FROM MY_TABLE",
).execute().df

query class-attribute instance-attribute #

query: str = Field(
    default=..., description="The query to run"
)

get_options #

get_options()

add query to options

Source code in src/koheesio/spark/snowflake.py
def get_options(self):
    """add query to options"""
    options = super().get_options()
    options["query"] = self.query
    return options

validate_query #

validate_query(query)

Replace escape characters

Source code in src/koheesio/spark/snowflake.py
@field_validator("query")
def validate_query(cls, query):
    """Replace escape characters"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    return query

koheesio.spark.snowflake.RunQuery #

Run a query on Snowflake that does not return a result, e.g. create table statement

This is a wrapper around 'net.snowflake.spark.snowflake.Utils.runQuery' on the JVM

Example
RunQuery(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="account",
    password="***",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="CREATE TABLE test (col1 string)",
).execute()

query class-attribute instance-attribute #

query: str = Field(
    default=..., description="The query to run", alias="sql"
)

execute #

execute() -> None
Source code in src/koheesio/spark/snowflake.py
def execute(self) -> None:
    if not self.query:
        self.log.warning("Empty string given as query input, skipping execution")
        return
    # noinspection PyProtectedMember
    self.spark._jvm.net.snowflake.spark.snowflake.Utils.runQuery(self.get_options(), self.query)

get_options #

get_options()
Source code in src/koheesio/spark/snowflake.py
def get_options(self):
    # Executing the RunQuery without `host` option in Databricks throws:
    # An error occurred while calling z:net.snowflake.spark.snowflake.Utils.runQuery.
    # : java.util.NoSuchElementException: key not found: host
    options = super().get_options()
    options["host"] = options["sfURL"]
    return options

validate_query #

validate_query(query)

Replace escape characters

Source code in src/koheesio/spark/snowflake.py
@field_validator("query")
def validate_query(cls, query):
    """Replace escape characters"""
    return query.replace("\\n", "\n").replace("\\t", "\t").strip()

koheesio.spark.snowflake.SnowflakeBaseModel #

BaseModel for setting up Snowflake Driver options.

Notes

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| url | str | Hostname for the Snowflake account, e.g. `<account>.snowflakecomputing.com`. Alias for sfURL. | required |
| user | str | Login name for the Snowflake user. Alias for sfUser. | required |
| password | SecretStr | Password for the Snowflake user. Alias for sfPassword. | required |
| database | str | The database to use for the session after connecting. Alias for sfDatabase. | required |
| sfSchema | str | The schema to use for the session after connecting. Alias for schema ("schema" is a reserved name in Pydantic, so sfSchema is used as the main name instead). | required |
| role | str | The default security role to use for the session after connecting. Alias for sfRole. | required |
| warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse. | required |
| authenticator | Optional[str] | Authenticator for the Snowflake user, e.g. "okta.com". | None |
| options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | {"sfCompress": "on", "continue_on_error": "off"} |
| format | str | The default "snowflake" format can be used natively in Databricks; in other environments, use net.snowflake.spark.snowflake and make sure the required JARs are installed. | "snowflake" |

authenticator class-attribute instance-attribute #

authenticator: Optional[str] = Field(
    default=None,
    description="Authenticator for the Snowflake user",
    examples=["okta.com"],
)

database class-attribute instance-attribute #

database: str = Field(
    default=...,
    alias="sfDatabase",
    description="The database to use for the session after connecting",
)

format class-attribute instance-attribute #

format: str = Field(
    default="snowflake",
    description="The default `snowflake` format can be used natively in Databricks, use `net.snowflake.spark.snowflake` in other environments and make sure to install required JARs.",
)

options class-attribute instance-attribute #

options: Optional[Dict[str, Any]] = Field(
    default={
        "sfCompress": "on",
        "continue_on_error": "off",
    },
    description="Extra options to pass to the Snowflake connector",
)

password class-attribute instance-attribute #

password: SecretStr = Field(
    default=...,
    alias="sfPassword",
    description="Password for the Snowflake user",
)

role class-attribute instance-attribute #

role: str = Field(
    default=...,
    alias="sfRole",
    description="The default security role to use for the session after connecting",
)

sfSchema class-attribute instance-attribute #

sfSchema: str = Field(
    default=...,
    alias="schema",
    description="The schema to use for the session after connecting",
)

url class-attribute instance-attribute #

url: str = Field(
    default=...,
    alias="sfURL",
    description="Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com",
    examples=["example.snowflakecomputing.com"],
)

user class-attribute instance-attribute #

user: str = Field(
    default=...,
    alias="sfUser",
    description="Login name for the Snowflake user",
)

warehouse class-attribute instance-attribute #

warehouse: str = Field(
    default=...,
    alias="sfWarehouse",
    description="The default virtual warehouse to use for the session after connecting",
)

get_options #

get_options()

Get the sfOptions as a dictionary.

Source code in src/koheesio/spark/snowflake.py
def get_options(self):
    """Get the sfOptions as a dictionary."""
    return {
        key: value
        for key, value in {
            "sfURL": self.url,
            "sfUser": self.user,
            "sfPassword": self.password.get_secret_value(),
            "authenticator": self.authenticator,
            "sfDatabase": self.database,
            "sfSchema": self.sfSchema,
            "sfRole": self.role,
            "sfWarehouse": self.warehouse,
            **self.options,
        }.items()
        if value is not None
    }
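
As an illustrative sketch (not taken from the source), the dictionary returned by get_options can be passed directly to the Spark Snowflake connector. Here `step` is assumed to be any configured SnowflakeStep and `spark` an active SparkSession:

# `step` and `spark` are assumed to exist; get_options() supplies sfURL, sfUser, sfPassword, etc.
df = (
    spark.read.format(step.format)
    .options(**step.get_options())
    .option("query", "SELECT CURRENT_DATE")
    .load()
)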

koheesio.spark.snowflake.SnowflakeReader #

Wrapper around JdbcReader for Snowflake.

Example
sr = SnowflakeReader(
    url="foo.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="db",
    schema="schema",
)
df = sr.read()
Notes

driver class-attribute instance-attribute #

driver: Optional[str] = None

koheesio.spark.snowflake.SnowflakeStep #

Expands the SnowflakeBaseModel so that it can be used as a Step
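
A hedged sketch of what a custom subclass could look like (the RowCount class and its fields are illustrative, not part of the module):

from pydantic import Field

from koheesio.spark.snowflake import Query, SnowflakeStep


class RowCount(SnowflakeStep):
    """Hypothetical Step that counts the rows of a Snowflake table."""

    table: str = Field(default=..., description="The table to count rows for")

    class Output(SnowflakeStep.Output):
        count: int = Field(default=..., description="Number of rows in the table")

    def execute(self):
        # Reuse the shared Snowflake options from SnowflakeBaseModel
        query = f"SELECT COUNT(*) AS CNT FROM {self.table}"
        df = Query(**self.get_options(), query=query).execute().df
        self.output.count = df.collect()[0][0]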

koheesio.spark.snowflake.SnowflakeTableStep #

Expands the SnowflakeStep, adding a 'table' parameter

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The name of the table"
)

get_options #

get_options()
Source code in src/koheesio/spark/snowflake.py
def get_options(self):
    options = super().get_options()
    options["table"] = self.table
    return options

koheesio.spark.snowflake.SnowflakeTransformation #

Adds Snowflake parameters to the Transformation class
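
A hedged sketch of a custom transformation built on this class (the subclass below is illustrative, not part of the module):

from pyspark.sql import functions as f

from koheesio.spark.snowflake import SnowflakeTransformation


class AddSourceDatabaseColumn(SnowflakeTransformation):
    """Hypothetical transformation that records the configured Snowflake database on each row."""

    def execute(self):
        self.output.df = self.df.withColumn("source_database", f.lit(self.database))

Like any Transformation, it can then be applied with .transform(df).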

koheesio.spark.snowflake.SnowflakeWriter #

Class for writing to Snowflake
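
A minimal usage sketch (all values are placeholders; df is an existing Spark DataFrame, and insert_type defaults to append):

SnowflakeWriter(
    url="<account>.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="MY_DB",
    sfSchema="MY_SCHEMA",
    role="MY_ROLE",
    warehouse="MY_WH",
    table="MY_TABLE",
    df=df,
).execute()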

See Also

insert_type class-attribute instance-attribute #

insert_type: Optional[BatchOutputMode] = Field(
    APPEND,
    alias="mode",
    description="The insertion type, append or overwrite",
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="Target table name"
)

execute #

execute()

Write to Snowflake

Source code in src/koheesio/spark/snowflake.py
def execute(self):
    """Write to Snowflake"""
    self.log.debug(f"writing to {self.table} with mode {self.insert_type}")
    self.df.write.format(self.format).options(**self.get_options()).option("dbtable", self.table).mode(
        self.insert_type
    ).save()

koheesio.spark.snowflake.SyncTableAndDataFrameSchema #

Sync the schemas of a Snowflake table and a DataFrame. This will add NULL columns for the columns that are not in both and perform type casts where needed.

The Snowflake table will take priority in case of type conflicts.
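
A usage sketch with placeholder credentials; the synced DataFrame is available on the output, and setting dry_run=True only reports the differences:

synced_df = (
    SyncTableAndDataFrameSchema(
        url="<account>.snowflakecomputing.com",
        user="YOUR_USERNAME",
        password="***",
        database="MY_DB",
        sfSchema="MY_SCHEMA",
        role="MY_ROLE",
        warehouse="MY_WH",
        table="MY_TABLE",
        df=df,
    )
    .execute()
    .df
)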

df class-attribute instance-attribute #

df: DataFrame = Field(
    default=..., description="The Spark DataFrame"
)

dry_run class-attribute instance-attribute #

dry_run: Optional[bool] = Field(
    default=False,
    description="Only show schema differences, do not apply changes",
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The table name"
)

Output #

Output class for SyncTableAndDataFrameSchema

new_df_schema class-attribute instance-attribute #

new_df_schema: StructType = Field(
    default=..., description="New DataFrame schema"
)

new_sf_schema class-attribute instance-attribute #

new_sf_schema: StructType = Field(
    default=..., description="New Snowflake schema"
)

original_df_schema class-attribute instance-attribute #

original_df_schema: StructType = Field(
    default=..., description="Original DataFrame schema"
)

original_sf_schema class-attribute instance-attribute #

original_sf_schema: StructType = Field(
    default=..., description="Original Snowflake schema"
)

sf_table_altered class-attribute instance-attribute #

sf_table_altered: bool = Field(
    default=False,
    description="Flag to indicate whether Snowflake schema has been altered",
)

execute #

execute()
Source code in src/koheesio/spark/snowflake.py
def execute(self):
    self.log.warning("Snowflake table will always take a priority in case of data type conflicts!")

    # spark side
    df_schema = self.df.schema
    self.output.original_df_schema = deepcopy(df_schema)  # using deepcopy to avoid storing in place changes
    df_cols = [c.name.lower() for c in df_schema]

    # snowflake side
    sf_schema = GetTableSchema(**self.get_options(), table=self.table).execute().table_schema
    self.output.original_sf_schema = sf_schema
    sf_cols = [c.name.lower() for c in sf_schema]

    if self.dry_run:
        # Display differences between Spark DataFrame and Snowflake schemas
        # and provide dummy values that are expected as class outputs.
        self.log.warning(f"Columns to be added to Snowflake table: {set(df_cols) - set(sf_cols)}")
        self.log.warning(f"Columns to be added to Spark DataFrame: {set(sf_cols) - set(df_cols)}")

        self.output.new_df_schema = t.StructType()
        self.output.new_sf_schema = t.StructType()
        self.output.df = self.df
        self.output.sf_table_altered = False

    else:
        # Add columns to SnowFlake table that exist in DataFrame
        for df_column in df_schema:
            if df_column.name.lower() not in sf_cols:
                AddColumn(
                    **self.get_options(),
                    table=self.table,
                    column=df_column.name,
                    type=df_column.dataType,
                ).execute()
                self.output.sf_table_altered = True

        if self.output.sf_table_altered:
            sf_schema = GetTableSchema(**self.get_options(), table=self.table).execute().table_schema
            sf_cols = [c.name.lower() for c in sf_schema]

        self.output.new_sf_schema = sf_schema

        # Add NULL columns to the DataFrame if they exist in SnowFlake but not in the df
        df = self.df
        for sf_col in self.output.original_sf_schema:
            sf_col_name = sf_col.name.lower()
            if sf_col_name not in df_cols:
                sf_col_type = sf_col.dataType
                df = df.withColumn(sf_col_name, f.lit(None).cast(sf_col_type))

        # Put DataFrame columns in the same order as the Snowflake table
        df = df.select(*sf_cols)

        self.output.df = df
        self.output.new_df_schema = df.schema

koheesio.spark.snowflake.SynchronizeDeltaToSnowflakeTask #

Synchronize a Delta table to a Snowflake table

  • Overwrite - only in batch mode
  • Append - supports batch and streaming mode
  • Merge - only in streaming mode
Example
SynchronizeDeltaToSnowflakeTask(
    url="acme.snowflakecomputing.com",
    user="admin",
    role="ADMIN",
    warehouse="SF_WAREHOUSE",
    database="SF_DATABASE",
    schema="SF_SCHEMA",
    source_table=DeltaTableStep(...),
    target_table="my_sf_table",
    key_columns=[
        "id",
    ],
    streaming=False,
).run()

checkpoint_location class-attribute instance-attribute #

checkpoint_location: Optional[str] = Field(
    default=None, description="Checkpoint location to use"
)

enable_deletion class-attribute instance-attribute #

enable_deletion: Optional[bool] = Field(
    default=False,
    description="In case of merge synchronisation_mode add deletion statement in merge query.",
)

key_columns class-attribute instance-attribute #

key_columns: Optional[List[str]] = Field(
    default_factory=list,
    description="Key columns on which merge statements will be MERGE statement will be applied.",
)

non_key_columns property #

non_key_columns: List[str]

Columns of source table that aren't part of the (composite) primary key

persist_staging class-attribute instance-attribute #

persist_staging: Optional[bool] = Field(
    default=False,
    description="In case of debugging, set `persist_staging` to True to retain the staging table for inspection after synchronization.",
)

reader property #

reader

DeltaTable reader

Returns:
DeltaTableReader that will yield the source delta table

schema_tracking_location class-attribute instance-attribute #

schema_tracking_location: Optional[str] = Field(
    default=None,
    description="Schema tracking location to use. Info: https://docs.delta.io/latest/delta-streaming.html#-schema-tracking",
)

source_table class-attribute instance-attribute #

source_table: DeltaTableStep = Field(
    default=...,
    description="Source delta table to synchronize",
)

staging_table property #

staging_table

Intermediate table on snowflake where staging results are stored

staging_table_name class-attribute instance-attribute #

staging_table_name: Optional[str] = Field(
    default=None,
    alias="staging_table",
    description="Optional snowflake staging name",
    validate_default=False,
)

streaming class-attribute instance-attribute #

streaming: Optional[bool] = Field(
    default=False,
    description="Should synchronisation happen in streaming or in batch mode. Streaming is supported in 'APPEND' and 'MERGE' mode. Batch is supported in 'OVERWRITE' and 'APPEND' mode.",
)

synchronisation_mode class-attribute instance-attribute #

synchronisation_mode: BatchOutputMode = Field(
    default=MERGE,
    description="Determines if synchronisation will 'overwrite' any existing table, 'append' new rows or 'merge' with existing rows.",
)

target_table class-attribute instance-attribute #

target_table: str = Field(
    default=...,
    description="Target table in snowflake to synchronize to",
)

writer property #

Writer to persist to snowflake

Depending on the configured options, this returns a SnowflakeWriter or a ForEachBatchStreamWriter:

  • OVERWRITE/APPEND mode yields SnowflakeWriter
  • MERGE mode yields ForEachBatchStreamWriter

Returns:

| Type | Description |
|------|-------------|
| Union[ForEachBatchStreamWriter, SnowflakeWriter] | The writer used to persist to Snowflake |

writer_ class-attribute instance-attribute #

drop_table #

drop_table(snowflake_table)

Drop a given snowflake table

Source code in src/koheesio/spark/snowflake.py
def drop_table(self, snowflake_table):
    """Drop a given snowflake table"""
    self.log.warning(f"Dropping table {snowflake_table} from snowflake")
    drop_table_query = f"""DROP TABLE IF EXISTS {snowflake_table}"""
    query_executor = RunQuery(**self.get_options(), query=drop_table_query)
    query_executor.execute()

execute #

execute() -> None
Source code in src/koheesio/spark/snowflake.py
def execute(self) -> None:
    # extract
    df = self.extract()
    self.output.source_df = df

    # synchronize
    self.output.target_df = df
    self.load(df)
    if not self.persist_staging:
        # If it's a streaming job, await for termination before dropping staging table
        if self.streaming:
            self.writer.await_termination()
        self.drop_table(self.staging_table)

extract #

extract() -> DataFrame

Extract source table

Source code in src/koheesio/spark/snowflake.py
def extract(self) -> DataFrame:
    """
    Extract source table
    """
    if self.synchronisation_mode == BatchOutputMode.MERGE:
        if not self.source_table.is_cdf_active:
            raise RuntimeError(
                f"Source table {self.source_table.table_name} does not have CDF enabled. "
                f"Set TBLPROPERTIES ('delta.enableChangeDataFeed' = true) to enable. "
                f"Current properties = {self.source_table_properties}"
            )

    df = self.reader.read()
    self.output.source_df = df
    return df

load #

load(df) -> DataFrame

Load source table into snowflake

Source code in src/koheesio/spark/snowflake.py
def load(self, df) -> DataFrame:
    """Load source table into snowflake"""
    if self.synchronisation_mode == BatchOutputMode.MERGE:
        self.log.info(f"Truncating staging table {self.staging_table}")
        self.truncate_table(self.staging_table)
    self.writer.write(df)
    self.output.target_df = df
    return df

run #

run()

alias of execute

Source code in src/koheesio/spark/snowflake.py
def run(self):
    """alias of execute"""
    return self.execute()

truncate_table #

truncate_table(snowflake_table)

Truncate a given snowflake table

Source code in src/koheesio/spark/snowflake.py
def truncate_table(self, snowflake_table):
    """Truncate a given snowflake table"""
    truncate_query = f"""TRUNCATE TABLE IF EXISTS {snowflake_table}"""
    query_executor = RunQuery(
        **self.get_options(),
        query=truncate_query,
    )
    query_executor.execute()

koheesio.spark.snowflake.TableExists #

Check if the table exists in Snowflake by using INFORMATION_SCHEMA.

Example
k = TableExists(
    url="foo.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="db",
    schema="schema",
    table="table",
)
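
The result is then available on the output, e.g. continuing the example above:

table_exists = k.execute().exists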

Output #

Output class for TableExists

exists class-attribute instance-attribute #

exists: bool = Field(
    default=...,
    description="Whether or not the table exists",
)

execute #

execute()
Source code in src/koheesio/spark/snowflake.py
def execute(self):
    query = (
        dedent(
            # Force upper case, due to case-sensitivity of where clause
            f"""
        SELECT *
        FROM INFORMATION_SCHEMA.TABLES
        WHERE TABLE_CATALOG     = '{self.database}'
          AND TABLE_SCHEMA      = '{self.sfSchema}'
          AND TABLE_TYPE        = 'BASE TABLE'
          AND upper(TABLE_NAME) = '{self.table.upper()}'
        """  # nosec B608: hardcoded_sql_expressions
        )
        .upper()
        .strip()
    )

    self.log.debug(f"Query that was executed to check if the table exists:\n{query}")

    df = Query(**self.get_options(), query=query).read()

    exists = df.count() > 0
    self.log.info(f"Table {self.table} {'exists' if exists else 'does not exist'}")
    self.output.exists = exists

koheesio.spark.snowflake.TagSnowflakeQuery #

Provides a Snowflake query tag pre-action that can be used to easily find queries through the Snowflake history search and to group them for debugging and cost tracking purposes.

Takes in query tag attributes as kwargs, plus an additional Snowflake options dict that may contain another set of pre-actions to be applied to a query; in that case the existing pre-actions are not dropped, and the query tag pre-action is added to them.

The passed Snowflake options dictionary is not modified in place; instead, a new dictionary containing the updated pre-actions is returned.

Notes

See this article for explanation: https://select.dev/posts/snowflake-query-tags

Arbitrary tags can be applied, such as team, dataset names, business capability, etc.

Example
query_tag = TagSnowflakeQuery(
    options={"preactions": ...},
    task_name="cleanse_task",
    pipeline_name="ingestion-pipeline",
    etl_date="2022-01-01",
    pipeline_execution_time="2022-01-01T00:00:00",
    task_execution_time="2022-01-01T01:00:00",
    environment="dev",
    trace_id="e0fdec43-a045-46e5-9705-acd4f3f96045",
    span_id="cb89abea-1c12-471f-8b12-546d2d66f6cb",
).execute().options

options class-attribute instance-attribute #

options: Dict = Field(
    default_factory=dict,
    description="Additional Snowflake options, optionally containing additional preactions",
)

Output #

Output class for TagSnowflakeQuery

options class-attribute instance-attribute #

options: Dict = Field(
    default=...,
    description="Copy of provided SF options, with added query tag preaction",
)

execute #

execute()

Add query tag preaction to Snowflake options

Source code in src/koheesio/spark/snowflake.py
def execute(self):
    """Add query tag preaction to Snowflake options"""
    tag_json = json.dumps(self.extra_params, indent=4, sort_keys=True)
    tag_preaction = f"ALTER SESSION SET QUERY_TAG = '{tag_json}';"
    preactions = self.options.get("preactions", "")
    preactions = f"{preactions}\n{tag_preaction}".strip()
    updated_options = dict(self.options)
    updated_options["preactions"] = preactions
    self.output.options = updated_options

koheesio.spark.snowflake.map_spark_type #

map_spark_type(spark_type: DataType)

Translates Spark DataFrame Schema type to SnowFlake type

| Basic Types | Snowflake Type |
|-------------|----------------|
| StringType  | STRING         |
| NullType    | STRING         |
| BooleanType | BOOLEAN        |

| Numeric Types | Snowflake Type |
|---------------|----------------|
| LongType      | BIGINT         |
| IntegerType   | INT            |
| ShortType     | SMALLINT       |
| DoubleType    | DOUBLE         |
| FloatType     | FLOAT          |
| NumericType   | FLOAT          |
| ByteType      | BINARY         |

| Date / Time Types | Snowflake Type |
|-------------------|----------------|
| DateType          | DATE           |
| TimestampType     | TIMESTAMP      |

| Advanced Types | Snowflake Type |
|----------------|----------------|
| DecimalType    | DECIMAL        |
| MapType        | VARIANT        |
| ArrayType      | VARIANT        |
| StructType     | VARIANT        |

References
  • Spark SQL DataTypes: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
  • Snowflake DataTypes: https://docs.snowflake.com/en/sql-reference/data-types.html

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark_type | DataType | DataType taken out of the StructField | required |

Returns:

| Type | Description |
|------|-------------|
| str | The Snowflake data type |
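
A small illustrative sketch of the mapping, assuming pyspark is available:

from pyspark.sql import types as t

from koheesio.spark.snowflake import map_spark_type

assert map_spark_type(t.StringType()) == "STRING"                 # basic type
assert map_spark_type(t.LongType()) == "BIGINT"                   # numeric type
assert map_spark_type(t.DecimalType(10, 2)) == "DECIMAL(10,2)"    # precision and scale preserved
assert map_spark_type(t.ArrayType(t.StringType())) == "VARIANT"   # complex types become VARIANT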

Source code in src/koheesio/spark/snowflake.py
def map_spark_type(spark_type: t.DataType):
    """
    Translates Spark DataFrame Schema type to SnowFlake type

    | Basic Types       | Snowflake Type |
    |-------------------|----------------|
    | StringType        | STRING         |
    | NullType          | STRING         |
    | BooleanType       | BOOLEAN        |

    | Numeric Types     | Snowflake Type |
    |-------------------|----------------|
    | LongType          | BIGINT         |
    | IntegerType       | INT            |
    | ShortType         | SMALLINT       |
    | DoubleType        | DOUBLE         |
    | FloatType         | FLOAT          |
    | NumericType       | FLOAT          |
    | ByteType          | BINARY         |

    | Date / Time Types | Snowflake Type |
    |-------------------|----------------|
    | DateType          | DATE           |
    | TimestampType     | TIMESTAMP      |

    | Advanced Types    | Snowflake Type |
    |-------------------|----------------|
    | DecimalType       | DECIMAL        |
    | MapType           | VARIANT        |
    | ArrayType         | VARIANT        |
    | StructType        | VARIANT        |

    References
    ----------
    - Spark SQL DataTypes: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
    - Snowflake DataTypes: https://docs.snowflake.com/en/sql-reference/data-types.html

    Parameters
    ----------
    spark_type : pyspark.sql.types.DataType
        DataType taken out of the StructField

    Returns
    -------
    str
        The Snowflake data type
    """
    # StructField means that the entire Field was passed, we need to extract just the dataType before continuing
    if isinstance(spark_type, t.StructField):
        spark_type = spark_type.dataType

    # Check if the type is DayTimeIntervalType
    if isinstance(spark_type, t.DayTimeIntervalType):
        warn(
            "DayTimeIntervalType is being converted to STRING. "
            "Consider converting to a more supported date/time/timestamp type in Snowflake."
        )

    # fmt: off
    # noinspection PyUnresolvedReferences
    data_type_map = {
        # Basic Types
        t.StringType: "STRING",
        t.NullType: "STRING",
        t.BooleanType: "BOOLEAN",

        # Numeric Types
        t.LongType: "BIGINT",
        t.IntegerType: "INT",
        t.ShortType: "SMALLINT",
        t.DoubleType: "DOUBLE",
        t.FloatType: "FLOAT",
        t.NumericType: "FLOAT",
        t.ByteType: "BINARY",
        t.BinaryType: "VARBINARY",

        # Date / Time Types
        t.DateType: "DATE",
        t.TimestampType: "TIMESTAMP",
        t.DayTimeIntervalType: "STRING",

        # Advanced Types
        t.DecimalType:
            f"DECIMAL({spark_type.precision},{spark_type.scale})"  # pylint: disable=no-member
            if isinstance(spark_type, t.DecimalType) else "DECIMAL(38,0)",
        t.MapType: "VARIANT",
        t.ArrayType: "VARIANT",
        t.StructType: "VARIANT",
    }
    return data_type_map.get(type(spark_type), 'STRING')