Snowflake

Snowflake steps and tasks for Koheesio

koheesio.spark.snowflake.AddColumn #

Add an empty column to a Snowflake table with a given name and DataType

Example
AddColumn(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    col="MY_COL",
    dataType=StringType(),
).execute()

account class-attribute instance-attribute #

account: str = Field(
    default=..., description="The Snowflake account"
)

column class-attribute instance-attribute #

column: str = Field(
    default=..., description="The name of the new column"
)

table class-attribute instance-attribute #

table: str = Field(
    default=...,
    description="The name of the Snowflake table",
)

type class-attribute instance-attribute #

type: DataType = Field(
    default=...,
    description="The DataType represented as a Spark DataType",
)

Output #

Output class for AddColumn

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="Query that was executed to add the column",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    query = f"ALTER TABLE {self.table} ADD COLUMN {self.column} {map_spark_type(self.type)}".upper()
    self.output.query = query
    SnowflakeRunQueryPython(**self.get_options(), query=query).execute()
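
For example, adding a column MY_COL with StringType() to MY_TABLE would produce a query along the lines of ALTER TABLE MY_TABLE ADD COLUMN MY_COL STRING (assuming map_spark_type maps StringType to Snowflake's STRING; the implementation upper-cases the query before running it).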

koheesio.spark.snowflake.CreateOrReplaceTableFromDataFrame #

Create (or Replace) a Snowflake table which has the same schema as a Spark DataFrame

Can be used like any other Transformation. The DataFrame is, however, left unchanged; it is only used to determine the schema of the Snowflake table that is to be created (or replaced).

Example
CreateOrReplaceTableFromDataFrame(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    df=df,
).execute()

Or, as a Transformation:

CreateOrReplaceTableFromDataFrame(
    ...
    table="MY_TABLE",
).transform(df)

account class-attribute instance-attribute #

account: str = Field(
    default=..., description="The Snowflake account"
)

table class-attribute instance-attribute #

table: str = Field(
    default=...,
    alias="table_name",
    description="The name of the (new) table",
)

Output #

Output class for CreateOrReplaceTableFromDataFrame

input_schema class-attribute instance-attribute #

input_schema: StructType = Field(
    default=...,
    description="The original schema from the input DataFrame",
)

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="Query that was executed to create the table",
)

snowflake_schema class-attribute instance-attribute #

snowflake_schema: str = Field(
    default=...,
    description="Derived Snowflake table schema based on the input DataFrame",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    self.output.df = self.df

    input_schema = self.df.schema
    self.output.input_schema = input_schema

    snowflake_schema = ", ".join([f"{c.name} {map_spark_type(c.dataType)}" for c in input_schema])
    self.output.snowflake_schema = snowflake_schema

    table_name = f"{self.database}.{self.sfSchema}.{self.table}"
    query = f"CREATE OR REPLACE TABLE {table_name} ({snowflake_schema})"
    self.output.query = query

    SnowflakeRunQueryPython(**self.get_options(), query=query).execute()

koheesio.spark.snowflake.DbTableQuery #

Read a table from Snowflake using the dbtable option instead of a query

Example
DbTableQuery(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="user",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="db.schema.table",
).execute().df

dbtable class-attribute instance-attribute #

dbtable: str = Field(
    default=...,
    alias="table",
    description="The name of the table",
)

koheesio.spark.snowflake.GetTableSchema #

Get the schema from a Snowflake table as a Spark Schema

Notes
  • This Step will execute a SELECT * FROM <table> LIMIT 1 query to get the schema of the table.
  • The schema will be stored in the table_schema attribute of the output.
  • table_schema is used as the attribute name to avoid conflicts with the schema attribute of Pydantic's BaseModel.
Example
schema = (
    GetTableSchema(
        database="MY_DB",
        schema_="MY_SCHEMA",
        warehouse="MY_WH",
        user="gid.account@abc.com",
        password="super-secret-password",
        role="APPLICATION.SNOWFLAKE.ADMIN",
        table="MY_TABLE",
    )
    .execute()
    .table_schema
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The Snowflake table name"
)

Output #

Output class for GetTableSchema

table_schema class-attribute instance-attribute #

table_schema: StructType = Field(
    default=...,
    serialization_alias="schema",
    description="The Spark Schema",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    query = f"SELECT * FROM {self.table} LIMIT 1"  # nosec B608: hardcoded_sql_expressions
    df = Query(**self.get_options(), query=query).execute().df
    self.output.table_schema = df.schema

koheesio.spark.snowflake.GrantPrivilegesOnFullyQualifiedObject #

Grant Snowflake privileges to a set of roles on a fully qualified object, i.e. database.schema.object_name

This class is a subclass of GrantPrivilegesOnObject and is used to grant privileges on a fully qualified object. The advantage of using this class is that the object name is made fully qualified for you: you set the database, schema, and object separately, and the object name is assembled as database.schema.object_name.

Example
GrantPrivilegesOnFullyQualifiedObject(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    ...
    object="MY_TABLE",
    type="TABLE",
    ...
)

In this example, the object name will be set to be fully qualified, i.e. MY_DB.MY_SCHEMA.MY_TABLE. If you were to use GrantPrivilegesOnObject instead, you would have to set the object name to be fully qualified yourself.

set_object_name #

set_object_name() -> (
    "GrantPrivilegesOnFullyQualifiedObject"
)

Set the object name to be fully qualified, i.e. database.schema.object_name

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="after")
def set_object_name(self) -> "GrantPrivilegesOnFullyQualifiedObject":
    """Set the object name to be fully qualified, i.e. database.schema.object_name"""
    # database, schema, obj_name
    db = self.database
    schema = self.model_dump()["sfSchema"]  # since "schema" is a reserved name
    obj_name = self.object

    self.object = f"{db}.{schema}.{obj_name}"

    return self

koheesio.spark.snowflake.GrantPrivilegesOnObject #

A wrapper around Snowflake GRANT privileges

With this Step, you can grant Snowflake privileges to a set of roles on a table, a view, or an object

See Also

https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html

Parameters:

  • account (str, required): Snowflake Account Name.
  • warehouse (str, required): The name of the warehouse. Alias for sfWarehouse.
  • user (str, required): The username. Alias for sfUser.
  • password (SecretStr, required): The password. Alias for sfPassword.
  • role (str, required): The role name.
  • object (str, required): The name of the object to grant privileges on.
  • type (str, required): The type of object to grant privileges on, e.g. TABLE, VIEW.
  • privileges (Union[conlist(str, min_length=1), str], required): The Privilege/Permission or list of Privileges/Permissions to grant on the given object.
  • roles (Union[conlist(str, min_length=1), str], required): The Role or list of Roles to grant the privileges to.
Example
GrantPrivilegesOnObject(
    object="MY_TABLE",
    type="TABLE",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    permissions=["SELECT", "INSERT"],
).execute()

In this example, the APPLICATION.SNOWFLAKE.ADMIN role will be granted SELECT and INSERT privileges on the MY_TABLE table using the MY_WH warehouse.

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    description="The name of the object to grant privileges on",
)

privileges class-attribute instance-attribute #

privileges: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="permissions",
    description="The Privilege/Permission or list of Privileges/Permissions to grant on the given object. See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html",
)

query class-attribute instance-attribute #

query: str = (
    "GRANT {privileges} ON {type} {object} TO ROLE {role}"
)

roles class-attribute instance-attribute #

roles: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="role",
    validation_alias="roles",
    description="The Role or list of Roles to grant the privileges to",
)

type class-attribute instance-attribute #

type: str = Field(
    default=...,
    description="The type of object to grant privileges on, e.g. TABLE, VIEW",
)

Output #

Output class for GrantPrivilegesOnObject

query class-attribute instance-attribute #

query: conlist(str, min_length=1) = Field(
    default=...,
    description="Query that was executed to grant privileges",
    validate_default=False,
)

execute #

execute() -> None
Source code in src/koheesio/integrations/snowflake/__init__.py
def execute(self) -> None:
    self.output.query = []
    roles = self.roles

    for role in roles:
        query = self.get_query(role)
        self.output.query.append(query)

        # Create a new instance of SnowflakeRunQueryPython with the current query
        instance = SnowflakeRunQueryPython.from_step(self, query=query)
        instance.execute()
        print(f"{instance.output = }")
        self.output.results.extend(instance.output.results)

get_query #

get_query(role: str) -> str

Build the GRANT query

Parameters:

  • role (str, required): The role name

Returns:

  • query (str): The Query that performs the grant

Source code in src/koheesio/integrations/snowflake/__init__.py
def get_query(self, role: str) -> str:
    """Build the GRANT query

    Parameters
    ----------
    role: str
        The role name

    Returns
    -------
    query : str
        The Query that performs the grant
    """
    query = self.query.format(
        privileges=",".join(self.privileges),
        type=self.type,
        object=self.object,
        role=role,
    )
    return query
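
As a rough illustration (a sketch only; the connection parameters are omitted and the role name is a placeholder), the query built from the template above would look like this:

grant = GrantPrivilegesOnObject(
    ...,
    object="MY_DB.MY_SCHEMA.MY_TABLE",
    type="TABLE",
    privileges=["SELECT", "INSERT"],
    roles="MY_READER_ROLE",
)
grant.get_query("MY_READER_ROLE")
# -> "GRANT SELECT,INSERT ON TABLE MY_DB.MY_SCHEMA.MY_TABLE TO ROLE MY_READER_ROLE"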

set_roles_privileges #

set_roles_privileges(values: dict) -> dict

Coerce roles and privileges to be lists if they are not already.

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="before")
def set_roles_privileges(cls, values: dict) -> dict:
    """Coerce roles and privileges to be lists if they are not already."""
    roles_value = values.get("roles") or values.get("role")
    privileges_value = values.get("privileges")

    if not (roles_value and privileges_value):
        raise ValueError("You have to specify roles AND privileges when using 'GrantPrivilegesOnObject'.")

    # coerce values to be lists
    values["roles"] = [roles_value] if isinstance(roles_value, str) else roles_value
    values["role"] = values["roles"][0]  # hack to keep the validator happy
    values["privileges"] = [privileges_value] if isinstance(privileges_value, str) else privileges_value

    return values

validate_object_and_object_type #

validate_object_and_object_type() -> (
    "GrantPrivilegesOnObject"
)

Validate that the object and type are set.

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="after")
def validate_object_and_object_type(self) -> "GrantPrivilegesOnObject":
    """Validate that the object and type are set."""
    object_value = self.object
    if not object_value:
        raise ValueError("You must provide an `object`, this should be the name of the object. ")

    object_type = self.type
    if not object_type:
        raise ValueError(
            "You must provide a `type`, e.g. TABLE, VIEW, DATABASE. "
            "See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html"
        )

    return self

koheesio.spark.snowflake.GrantPrivilegesOnTable #

Grant Snowflake privileges to a set of roles on a table
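
Example (a minimal sketch, using the same connection parameters as the other examples on this page; the table name and permissions are placeholders):

GrantPrivilegesOnTable(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    permissions=["SELECT", "INSERT"],
).execute()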

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="table",
    description="The name of the Table to grant Privileges on. This should be just the name of the table; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'TABLE'

koheesio.spark.snowflake.GrantPrivilegesOnView #

Grant Snowflake privileges to a set of roles on a view
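
Example (a sketch along the same lines; the view name and the two reader roles are placeholders):

GrantPrivilegesOnView(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    view="MY_VIEW",
    permissions="SELECT",
    roles=["READER_ROLE_A", "READER_ROLE_B"],
).execute()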

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="view",
    description="The name of the View to grant Privileges on. This should be just the name of the view; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'VIEW'

koheesio.spark.snowflake.Query #

Query data from Snowflake and return the result as a DataFrame

Example
Query(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="SELECT * FROM MY_TABLE",
).execute().df

query class-attribute instance-attribute #

query: str = Field(
    default=..., description="The query to run"
)

get_options #

get_options(
    by_alias: bool = True, include: Set[str] = None
) -> Dict[str, Any]

add query to options

Source code in src/koheesio/integrations/spark/snowflake.py
def get_options(self, by_alias: bool = True, include: Set[str] = None) -> Dict[str, Any]:
    """add query to options"""
    options = super().get_options(by_alias)
    options["query"] = self.query
    return options

validate_query #

validate_query(query: str) -> str

Replace escape characters

Source code in src/koheesio/integrations/spark/snowflake.py
@field_validator("query")
def validate_query(cls, query: str) -> str:
    """Replace escape characters"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    return query

koheesio.spark.snowflake.RunQuery #

Run a query on Snowflake that does not return a result, e.g. a CREATE TABLE statement

This is a wrapper around 'net.snowflake.spark.snowflake.Utils.runQuery' on the JVM

Example
RunQuery(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="account",
    password="***",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="CREATE TABLE test (col1 string)",
).execute()

query class-attribute instance-attribute #

query: str = Field(
    default=..., description="The query to run", alias="sql"
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> RunQuery.Output:
    # Executing the RunQuery without `host` option raises the following error:
    #   An error occurred while calling z:net.snowflake.spark.snowflake.Utils.runQuery.
    #   : java.util.NoSuchElementException: key not found: host
    options = self.get_options()
    options["host"] = self.url
    # noinspection PyProtectedMember
    self.spark._jvm.net.snowflake.spark.snowflake.Utils.runQuery(options, self.query)

validate_query #

validate_query(query: str) -> str

Replace escape characters, strip whitespace, ensure it is not empty

Source code in src/koheesio/integrations/spark/snowflake.py
@field_validator("query")
def validate_query(cls, query: str) -> str:
    """Replace escape characters, strip whitespace, ensure it is not empty"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    if not query:
        raise ValueError("Query cannot be empty")
    return query

validate_spark_and_deprecate #

validate_spark_and_deprecate() -> RunQuery

If we do not have a spark session with a JVM, we can not use spark to run the query

Source code in src/koheesio/integrations/spark/snowflake.py
@model_validator(mode="after")
def validate_spark_and_deprecate(self) -> RunQuery:
    """If we do not have a spark session with a JVM, we can not use spark to run the query"""
    warn(
        "The RunQuery class is deprecated and will be removed in a future release. "
        "Please use the Python connector for Snowflake instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    if not hasattr(self.spark, "_jvm"):
        raise RuntimeError(
            "Your Spark session does not have a JVM and cannot run Snowflake query using RunQuery implementation. "
            "Please update your code to use python connector for Snowflake."
        )
    return self

koheesio.spark.snowflake.SnowflakeBaseModel #

BaseModel for setting up Snowflake Driver options.

Parameters:

  • url (str, required): Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com. Alias for sfURL.
  • user (str, required): Login name for the Snowflake user. Alias for sfUser.
  • password (SecretStr, required): Password for the Snowflake user. Alias for sfPassword.
  • role (str, required): The default security role to use for the session after connecting. Alias for sfRole.
  • warehouse (str, required): The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse.
  • authenticator (Optional[str], default=None): Authenticator for the Snowflake user. Example: "okta.com".
  • database (Optional[str], default=None): The database to use for the session after connecting. Alias for sfDatabase.
  • sfSchema (Optional[str], default=None): The schema to use for the session after connecting. Alias for schema ("schema" is a reserved name in Pydantic, so sfSchema is used as the main name instead).
  • options (Optional[Dict[str, Any]], default={"sfCompress": "on", "continue_on_error": "off"}): Extra options to pass to the Snowflake connector.

authenticator class-attribute instance-attribute #

authenticator: Optional[str] = Field(
    default=None,
    description="Authenticator for the Snowflake user",
    examples=["okta.com"],
)

database class-attribute instance-attribute #

database: Optional[str] = Field(
    default=None,
    alias="sfDatabase",
    description="The database to use for the session after connecting",
)

options class-attribute instance-attribute #

options: Optional[Dict[str, Any]] = Field(
    default={
        "sfCompress": "on",
        "continue_on_error": "off",
    },
    description="Extra options to pass to the Snowflake connector",
)

password class-attribute instance-attribute #

password: SecretStr = Field(
    default=...,
    alias="sfPassword",
    description="Password for the Snowflake user",
)

role class-attribute instance-attribute #

role: str = Field(
    default=...,
    alias="sfRole",
    description="The default security role to use for the session after connecting",
)

sfSchema class-attribute instance-attribute #

sfSchema: Optional[str] = Field(
    default=...,
    alias="schema",
    description="The schema to use for the session after connecting",
)

url class-attribute instance-attribute #

url: str = Field(
    default=...,
    alias="sfURL",
    description="Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com",
    examples=["example.snowflakecomputing.com"],
)

user class-attribute instance-attribute #

user: str = Field(
    default=...,
    alias="sfUser",
    description="Login name for the Snowflake user",
)

warehouse class-attribute instance-attribute #

warehouse: str = Field(
    default=...,
    alias="sfWarehouse",
    description="The default virtual warehouse to use for the session after connecting",
)

get_options #

get_options(
    by_alias: bool = True,
    include: Optional[Set[str]] = None,
) -> Dict[str, Any]

Get the sfOptions as a dictionary.

Note
  • Any parameters that are None are excluded from the output dictionary.
  • sfSchema and password are handled separately.
  • The values from both 'options' and 'params' (kwargs / extra params) are included as is.
  • Koheesio specific fields are excluded by default (i.e. name, description, format).

Parameters:

  • by_alias (bool, default=True): Whether to use the alias names or not, e.g. sfURL instead of url.
  • include (Optional[Set[str]], default=None): Set of keys to include in the output dictionary. When None is provided, all fields will be returned. Note: be sure to include all the keys you need.
Source code in src/koheesio/integrations/snowflake/__init__.py
def get_options(self, by_alias: bool = True, include: Optional[Set[str]] = None) -> Dict[str, Any]:
    """Get the sfOptions as a dictionary.

    Note
    ----
    - Any parameters that are `None` are excluded from the output dictionary.
    - `sfSchema` and `password` are handled separately.
    - The values from both 'options' and 'params' (kwargs / extra params) are included as is.
    - Koheesio specific fields are excluded by default (i.e. `name`, `description`, `format`).

    Parameters
    ----------
    by_alias : bool, optional, default=True
        Whether to use the alias names or not. E.g. `sfURL` instead of `url`
    include : Optional[Set[str]], optional, default=None
        Set of keys to include in the output dictionary. When None is provided, all fields will be returned.
        Note: be sure to include all the keys you need.
    """
    exclude_set = {
        # Exclude koheesio specific fields
        "name",
        "description",
        # options and params are separately implemented
        "params",
        "options",
        # schema and password have to be handled separately
        "sfSchema",
        "password",
    } - (include or set())

    fields = self.model_dump(
        by_alias=by_alias,
        exclude_none=True,
        exclude=exclude_set,
    )

    # handle schema and password
    fields.update(
        {
            "sfSchema" if by_alias else "schema": self.sfSchema,
            "sfPassword" if by_alias else "password": self.password.get_secret_value(),
        }
    )

    # handle include
    if include:
        # user specified filter
        fields = {key: value for key, value in fields.items() if key in include}
    else:
        # default filter
        include = {"options", "params"}

    # handle options
    if "options" in include:
        options = fields.pop("options", self.options)
        fields.update(**options)

    # handle params
    if "params" in include:
        params = fields.pop("params", self.params)
        fields.update(**params)

    return {key: value for key, value in fields.items() if value}
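
For illustration, a rough sketch of what the returned dictionary looks like for a Query step (the role, warehouse, database, and schema values are placeholders; the exact keys depend on the class and on which fields are set, and class-specific fields such as format may also appear):

step = Query(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    role="MY_ROLE",
    warehouse="MY_WH",
    database="MY_DB",
    schema="MY_SCHEMA",
    query="SELECT 1",
)
step.get_options()
# roughly:
# {
#     "sfURL": "example.snowflakecomputing.com",
#     "sfUser": "YOUR_USERNAME",
#     "sfRole": "MY_ROLE",
#     "sfWarehouse": "MY_WH",
#     "sfDatabase": "MY_DB",
#     "sfSchema": "MY_SCHEMA",
#     "sfPassword": "***",
#     "sfCompress": "on",
#     "continue_on_error": "off",
#     "query": "SELECT 1",
# }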

koheesio.spark.snowflake.SnowflakeReader #

Wrapper around JdbcReader for Snowflake.

Example
sr = SnowflakeReader(
    url="foo.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="db",
    schema="schema",
)
df = sr.read()

driver class-attribute instance-attribute #

driver: Optional[str] = None

format class-attribute instance-attribute #

format: str = Field(
    default="snowflake",
    description="The format to use when writing to Snowflake",
)

execute #

execute() -> Output

Read from Snowflake

Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> SparkStep.Output:
    """Read from Snowflake"""
    super().execute()

koheesio.spark.snowflake.SnowflakeStep #

Expands the SnowflakeBaseModel so that it can be used as a Step

koheesio.spark.snowflake.SnowflakeTableStep #

Expands the SnowflakeStep, adding a 'table' parameter

full_name property #

full_name: str

Returns the full name of the Snowflake table, based on the schema and database parameters.

Returns:

  • str: The complete Snowflake table name (database.schema.table)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The name of the table"
)

koheesio.spark.snowflake.SnowflakeTransformation #

Adds Snowflake parameters to the Transformation class

koheesio.spark.snowflake.SnowflakeWriter #

Class for writing to Snowflake
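
Example (a minimal sketch; connection parameters as elsewhere on this page, and df is an existing Spark DataFrame):

SnowflakeWriter(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    df=df,
    # insert_type defaults to APPEND; pass mode=... with a BatchOutputMode to override
).execute()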

format class-attribute instance-attribute #

format: str = Field(
    "snowflake",
    description="The format to use when writing to Snowflake",
)

insert_type class-attribute instance-attribute #

insert_type: Optional[BatchOutputMode] = Field(
    APPEND,
    alias="mode",
    description="The insertion type, append or overwrite",
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="Target table name"
)

execute #

execute() -> Output

Write to Snowflake

Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> SnowflakeWriter.Output:
    """Write to Snowflake"""
    self.log.debug(f"writing to {self.table} with mode {self.insert_type}")
    self.df.write.format(self.format).options(**self.get_options()).option("dbtable", self.table).mode(
        self.insert_type
    ).save()

koheesio.spark.snowflake.SyncTableAndDataFrameSchema #

Sync the schemas of a Snowflake table and a DataFrame. This will add NULL columns for the columns that are not present in both, and perform type casts where needed.

The Snowflake table will take priority in case of type conflicts.
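
Example (a sketch; connection parameters as in the other examples, and df is an existing Spark DataFrame):

output = SyncTableAndDataFrameSchema(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    df=df,
    dry_run=True,  # only log the schema differences, do not alter the table or the DataFrame
).execute()
output.df  # the aligned DataFrame; unchanged when dry_run=True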

df class-attribute instance-attribute #

df: DataFrame = Field(
    default=..., description="The Spark DataFrame"
)

dry_run class-attribute instance-attribute #

dry_run: bool = Field(
    default=False,
    description="Only show schema differences, do not apply changes",
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The table name"
)

Output #

Output class for SyncTableAndDataFrameSchema

new_df_schema class-attribute instance-attribute #

new_df_schema: StructType = Field(
    default=..., description="New DataFrame schema"
)

new_sf_schema class-attribute instance-attribute #

new_sf_schema: StructType = Field(
    default=..., description="New Snowflake schema"
)

original_df_schema class-attribute instance-attribute #

original_df_schema: StructType = Field(
    default=..., description="Original DataFrame schema"
)

original_sf_schema class-attribute instance-attribute #

original_sf_schema: StructType = Field(
    default=..., description="Original Snowflake schema"
)

sf_table_altered class-attribute instance-attribute #

sf_table_altered: bool = Field(
    default=False,
    description="Flag to indicate whether Snowflake schema has been altered",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    self.log.warning("Snowflake table will always take a priority in case of data type conflicts!")

    # spark side
    df_schema = self.df.schema
    self.output.original_df_schema = deepcopy(df_schema)  # using deepcopy to avoid storing in place changes
    df_cols = {c.name.lower() for c in df_schema}

    # snowflake side
    _options = {**self.get_options(), "table": self.table}
    sf_schema = GetTableSchema(**_options).execute().table_schema
    self.output.original_sf_schema = sf_schema
    sf_cols = {c.name.lower() for c in sf_schema}

    if self.dry_run:
        # Display differences between Spark DataFrame and Snowflake schemas
        # and provide dummy values that are expected as class outputs.
        _sf_diff = df_cols - sf_cols
        self.log.warning(f"Columns to be added to Snowflake table: {set(df_cols) - set(sf_cols)}")
        _df_diff = sf_cols - df_cols
        self.log.warning(f"Columns to be added to Spark DataFrame: {set(sf_cols) - set(df_cols)}")

        self.output.new_df_schema = t.StructType()
        self.output.new_sf_schema = t.StructType()
        self.output.df = self.df
        self.output.sf_table_altered = False

    else:
        # Add columns to SnowFlake table that exist in DataFrame
        for df_column in df_schema:
            if df_column.name.lower() not in sf_cols:
                AddColumn(
                    **self.get_options(),
                    table=self.table,
                    column=df_column.name,
                    type=df_column.dataType,
                ).execute()
                self.output.sf_table_altered = True

        if self.output.sf_table_altered:
            sf_schema = GetTableSchema(**self.get_options(), table=self.table).execute().table_schema
            sf_cols = {c.name.lower() for c in sf_schema}

        self.output.new_sf_schema = sf_schema

        # Add NULL columns to the DataFrame if they exist in SnowFlake but not in the df
        df = self.df
        for sf_col in self.output.original_sf_schema:
            sf_col_name = sf_col.name.lower()
            if sf_col_name not in df_cols:
                sf_col_type = sf_col.dataType
                df = df.withColumn(sf_col_name, f.lit(None).cast(sf_col_type))  # type: ignore

        # Put DataFrame columns in the same order as the Snowflake table
        df = df.select(*sf_cols)

        self.output.df = df
        self.output.new_df_schema = df.schema

koheesio.spark.snowflake.SynchronizeDeltaToSnowflakeTask #

Synchronize a Delta table to a Snowflake table

  • Overwrite - only in batch mode
  • Append - supports batch and streaming mode
  • Merge - only in streaming mode
Example
SynchronizeDeltaToSnowflakeTask(
    account="acme",
    url="acme.snowflakecomputing.com",
    user="admin",
    role="ADMIN",
    warehouse="SF_WAREHOUSE",
    database="SF_DATABASE",
    schema="SF_SCHEMA",
    source_table=DeltaTableStep(...),
    target_table="my_sf_table",
    key_columns=[
        "id",
    ],
    streaming=False,
).run()

account class-attribute instance-attribute #

account: Optional[str] = Field(
    default=None,
    description="The Snowflake account to connect to. If not provided, the `truncate_table` and `drop_table` methods will fail.",
)

checkpoint_location class-attribute instance-attribute #

checkpoint_location: Optional[str] = Field(
    default=None, description="Checkpoint location to use"
)

enable_deletion class-attribute instance-attribute #

enable_deletion: bool = Field(
    default=False,
    description="In case of merge synchronisation_mode add deletion statement in merge query.",
)

key_columns class-attribute instance-attribute #

key_columns: Optional[List[str]] = Field(
    default_factory=list,
    description="Key columns on which merge statements will be MERGE statement will be applied.",
)

non_key_columns property #

non_key_columns: List[str]

Columns of source table that aren't part of the (composite) primary key

persist_staging class-attribute instance-attribute #

persist_staging: bool = Field(
    default=False,
    description="In case of debugging, set `persist_staging` to True to retain the staging table for inspection after synchronization.",
)

reader property #

DeltaTable reader

Returns:

  • DeltaTableReader: The reader that will yield the source Delta table

schema_tracking_location class-attribute instance-attribute #

schema_tracking_location: Optional[str] = Field(
    default=None,
    description="Schema tracking location to use. Info: https://docs.delta.io/latest/delta-streaming.html#-schema-tracking",
)

source_table class-attribute instance-attribute #

source_table: DeltaTableStep = Field(
    default=...,
    description="Source delta table to synchronize",
)

staging_table property #

staging_table: str

Intermediate table on snowflake where staging results are stored

staging_table_name class-attribute instance-attribute #

staging_table_name: Optional[str] = Field(
    default=None,
    alias="staging_table",
    description="Optional snowflake staging name",
    validate_default=False,
)

streaming class-attribute instance-attribute #

streaming: bool = Field(
    default=False,
    description="Should synchronisation happen in streaming or in batch mode. Streaming is supported in 'APPEND' and 'MERGE' mode. Batch is supported in 'OVERWRITE' and 'APPEND' mode.",
)

synchronisation_mode class-attribute instance-attribute #

synchronisation_mode: BatchOutputMode = Field(
    default=MERGE,
    description="Determines if synchronisation will 'overwrite' any existing table, 'append' new rows or 'merge' with existing rows.",
)

target_table class-attribute instance-attribute #

target_table: str = Field(
    default=...,
    description="Target table in snowflake to synchronize to",
)

writer property #

Writer used to persist to Snowflake

Depending on the configured options, this returns either a SnowflakeWriter or a ForEachBatchStreamWriter:
  • OVERWRITE/APPEND mode yields SnowflakeWriter
  • MERGE mode yields ForEachBatchStreamWriter

Returns:

  • Union[ForEachBatchStreamWriter, SnowflakeWriter]

writer_ class-attribute instance-attribute #

drop_table #

drop_table(snowflake_table: str) -> None

Drop a given snowflake table

Source code in src/koheesio/integrations/spark/snowflake.py
def drop_table(self, snowflake_table: str) -> None:
    """Drop a given snowflake table"""
    self.log.warning(f"Dropping table {snowflake_table} from snowflake")
    drop_table_query = f"""DROP TABLE IF EXISTS {snowflake_table}"""  # nosec B608: hardcoded_sql_expressions
    query_executor = SnowflakeRunQueryPython(**self.get_options(), query=drop_table_query)
    query_executor.execute()

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> SynchronizeDeltaToSnowflakeTask.Output:
    # extract
    df = self.extract()
    self.output.source_df = df

    # synchronize
    self.output.target_df = df
    self.load(df)
    if not self.persist_staging:
        # If it's a streaming job, await for termination before dropping staging table
        if self.streaming:
            self.writer.await_termination()  # type: ignore
        self.drop_table(self.staging_table)

extract #

extract() -> DataFrame

Extract source table

Source code in src/koheesio/integrations/spark/snowflake.py
def extract(self) -> DataFrame:
    """
    Extract source table
    """
    if self.synchronisation_mode == BatchOutputMode.MERGE:
        if not self.source_table.is_cdf_active:
            raise RuntimeError(
                f"Source table {self.source_table.table_name} does not have CDF enabled. "
                f"Set TBLPROPERTIES ('delta.enableChangeDataFeed' = true) to enable. "
                f"Current properties = {self.source_table.get_persisted_properties()}"
            )

    df = self.reader.read()
    self.output.source_df = df
    return df

load #

load(df: DataFrame) -> DataFrame

Load source table into snowflake

Source code in src/koheesio/integrations/spark/snowflake.py
def load(self, df: DataFrame) -> DataFrame:
    """Load source table into snowflake"""
    if self.synchronisation_mode == BatchOutputMode.MERGE:
        self.log.info(f"Truncating staging table {self.staging_table}")  # type: ignore
        self.truncate_table(self.staging_table)
    self.writer.write(df)
    self.output.target_df = df
    return df

truncate_table #

truncate_table(snowflake_table: str) -> None

Truncate a given snowflake table

Source code in src/koheesio/integrations/spark/snowflake.py
def truncate_table(self, snowflake_table: str) -> None:
    """Truncate a given snowflake table"""
    truncate_query = f"""TRUNCATE TABLE IF EXISTS {snowflake_table}"""  # nosec B608: hardcoded_sql_expressions
    query_executor = SnowflakeRunQueryPython(
        **self.get_options(),
        query=truncate_query,
    )
    query_executor.execute()

koheesio.spark.snowflake.TableExists #

Check if the table exists in Snowflake by using INFORMATION_SCHEMA.

Example
k = TableExists(
    url="foo.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="db",
    schema="schema",
    table="table",
)
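
The result can then be read from the output, e.g.:

k.execute().exists  # True if the table exists, False otherwise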

Output #

Output class for TableExists

exists class-attribute instance-attribute #

exists: bool = Field(
    default=...,
    description="Whether or not the table exists",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    query = (
        dedent(
            # Force upper case, due to case-sensitivity of where clause
            f"""
            SELECT *
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_CATALOG     = '{self.database}'
              AND TABLE_SCHEMA      = '{self.sfSchema}'
              AND TABLE_TYPE        = 'BASE TABLE'
              AND upper(TABLE_NAME) = '{self.table.upper()}'
            """  # nosec B608: hardcoded_sql_expressions
        )
        .upper()
        .strip()
    )

    self.log.debug(f"Query that was executed to check if the table exists:\n{query}")

    df = Query(**self.get_options(), query=query).read()

    exists = df.count() > 0
    self.log.info(
        f"Table '{self.database}.{self.sfSchema}.{self.table}' {'exists' if exists else 'does not exist'}"
    )
    self.output.exists = exists