Skip to content

Snowflake

Snowflake steps and tasks for Koheesio

Every class in this module is a subclass of Step or Task and is used to perform operations on Snowflake.

Notes

Every Step in this module is based on SnowflakeBaseModel. The following parameters are available for every Step.

Parameters:

Name Type Description Default
url str

Hostname for the Snowflake account, e.g. .snowflakecomputing.com. Alias for sfURL.

required
user str

Login name for the Snowflake user. Alias for sfUser.

required
password SecretStr

Password for the Snowflake user. Alias for sfPassword.

required
database str

The database to use for the session after connecting. Alias for sfDatabase.

required
sfSchema str

The schema to use for the session after connecting. Alias for schema ("schema" is a reserved name in Pydantic, so we use sfSchema as main name instead).

required
role str

The default security role to use for the session after connecting. Alias for sfRole.

required
warehouse str

The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse.

required
authenticator Optional[str]

Authenticator for the Snowflake user. Example: "okta.com".

None
options Optional[Dict[str, Any]]

Extra options to pass to the Snowflake connector.

{"sfCompress": "on", "continue_on_error": "off"}
format str

The default snowflake format can be used natively in Databricks, use net.snowflake.spark.snowflake in other environments and make sure to install required JARs.

"snowflake"

koheesio.integrations.spark.snowflake.AddColumn #

Add an empty column to a Snowflake table with given name and DataType

Example
AddColumn(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    col="MY_COL",
    dataType=StringType(),
).execute()

account class-attribute instance-attribute #

account: str = Field(
    default=..., description="The Snowflake account"
)

column class-attribute instance-attribute #

column: str = Field(
    default=..., description="The name of the new column"
)

table class-attribute instance-attribute #

table: str = Field(
    default=...,
    description="The name of the Snowflake table",
)

type class-attribute instance-attribute #

type: DataType = Field(
    default=...,
    description="The DataType represented as a Spark DataType",
)

Output #

Output class for AddColumn

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="Query that was executed to add the column",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    query = f"ALTER TABLE {self.table} ADD COLUMN {self.column} {map_spark_type(self.type)}".upper()
    self.output.query = query
    SnowflakeRunQueryPython(**self.get_options(), query=query).execute()

koheesio.integrations.spark.snowflake.CreateOrReplaceTableFromDataFrame #

Create (or Replace) a Snowflake table which has the same schema as a Spark DataFrame

Can be used as any Transformation. The DataFrame is however left unchanged, and only used for determining the schema of the Snowflake Table that is to be created (or replaced).

Example
CreateOrReplaceTableFromDataFrame(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    df=df,
).execute()

Or, as a Transformation:

CreateOrReplaceTableFromDataFrame(
    ...
    table="MY_TABLE",
).transform(df)

account class-attribute instance-attribute #

account: str = Field(
    default=..., description="The Snowflake account"
)

table class-attribute instance-attribute #

table: str = Field(
    default=...,
    alias="table_name",
    description="The name of the (new) table",
)

Output #

Output class for CreateOrReplaceTableFromDataFrame

input_schema class-attribute instance-attribute #

input_schema: StructType = Field(
    default=...,
    description="The original schema from the input DataFrame",
)

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="Query that was executed to create the table",
)

snowflake_schema class-attribute instance-attribute #

snowflake_schema: str = Field(
    default=...,
    description="Derived Snowflake table schema based on the input DataFrame",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    self.output.df = self.df

    input_schema = self.df.schema
    self.output.input_schema = input_schema

    snowflake_schema = ", ".join([f"{c.name} {map_spark_type(c.dataType)}" for c in input_schema])
    self.output.snowflake_schema = snowflake_schema

    table_name = f"{self.database}.{self.sfSchema}.{self.table}"
    query = f"CREATE OR REPLACE TABLE {table_name} ({snowflake_schema})"
    self.output.query = query

    SnowflakeRunQueryPython(**self.get_options(), query=query).execute()

koheesio.integrations.spark.snowflake.DbTableQuery #

Read table from Snowflake using the dbtable option instead of query

Example
DbTableQuery(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="user",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="db.schema.table",
).execute().df

dbtable class-attribute instance-attribute #

dbtable: str = Field(
    default=...,
    alias="table",
    description="The name of the table",
)

koheesio.integrations.spark.snowflake.GetTableSchema #

Get the schema from a Snowflake table as a Spark Schema

Notes
  • This Step will execute a SELECT * FROM <table> LIMIT 1 query to get the schema of the table.
  • The schema will be stored in the table_schema attribute of the output.
  • table_schema is used as the attribute name to avoid conflicts with the schema attribute of Pydantic's BaseModel.
Example
schema = (
    GetTableSchema(
        database="MY_DB",
        schema_="MY_SCHEMA",
        warehouse="MY_WH",
        user="gid.account@abc.com",
        password="super-secret-password",
        role="APPLICATION.SNOWFLAKE.ADMIN",
        table="MY_TABLE",
    )
    .execute()
    .table_schema
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The Snowflake table name"
)

Output #

Output class for GetTableSchema

table_schema class-attribute instance-attribute #

table_schema: StructType = Field(
    default=...,
    serialization_alias="schema",
    description="The Spark Schema",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    query = f"SELECT * FROM {self.table} LIMIT 1"  # nosec B608: hardcoded_sql_expressions
    df = Query(**self.get_options(), query=query).execute().df
    self.output.table_schema = df.schema

koheesio.integrations.spark.snowflake.GrantPrivilegesOnFullyQualifiedObject #

Grant Snowflake privileges to a set of roles on a fully qualified object, i.e. database.schema.object_name

This class is a subclass of GrantPrivilegesOnObject and is used to grant privileges on a fully qualified object. The advantage of using this class is that it sets the object name to be fully qualified, i.e. database.schema.object_name.

Meaning, you can set the database, schema and object separately and the object name will be set to be fully qualified, i.e. database.schema.object_name.

Example
GrantPrivilegesOnFullyQualifiedObject(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    ...
    object="MY_TABLE",
    type="TABLE",
    ...
)

In this example, the object name will be set to be fully qualified, i.e. MY_DB.MY_SCHEMA.MY_TABLE. If you were to use GrantPrivilegesOnObject instead, you would have to set the object name to be fully qualified yourself.

set_object_name #

set_object_name() -> (
    "GrantPrivilegesOnFullyQualifiedObject"
)

Set the object name to be fully qualified, i.e. database.schema.object_name

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="after")
def set_object_name(self) -> "GrantPrivilegesOnFullyQualifiedObject":
    """Set the object name to be fully qualified, i.e. database.schema.object_name"""
    # database, schema, obj_name
    db = self.database
    schema = self.model_dump()["sfSchema"]  # since "schema" is a reserved name
    obj_name = self.object

    self.object = f"{db}.{schema}.{obj_name}"

    return self

koheesio.integrations.spark.snowflake.GrantPrivilegesOnObject #

A wrapper on Snowflake GRANT privileges

With this Step, you can grant Snowflake privileges to a set of roles on a table, a view, or an object

See Also

https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html

Parameters:

Name Type Description Default
account str

Snowflake Account Name.

required
warehouse str

The name of the warehouse. Alias for sfWarehouse

required
user str

The username. Alias for sfUser

required
password SecretStr

The password. Alias for sfPassword

required
role str

The role name

required
object str

The name of the object to grant privileges on

required
type str

The type of object to grant privileges on, e.g. TABLE, VIEW

required
privileges Union[conlist(str, min_length=1), str]

The Privilege/Permission or list of Privileges/Permissions to grant on the given object.

required
roles Union[conlist(str, min_length=1), str]

The Role or list of Roles to grant the privileges to

required
Example
GrantPermissionsOnTable(
    object="MY_TABLE",
    type="TABLE",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    permissions=["SELECT", "INSERT"],
).execute()

In this example, the APPLICATION.SNOWFLAKE.ADMIN role will be granted SELECT and INSERT privileges on the MY_TABLE table using the MY_WH warehouse.

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    description="The name of the object to grant privileges on",
)

privileges class-attribute instance-attribute #

privileges: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="permissions",
    description="The Privilege/Permission or list of Privileges/Permissions to grant on the given object. See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html",
)

query class-attribute instance-attribute #

query: str = (
    "GRANT {privileges} ON {type} {object} TO ROLE {role}"
)

roles class-attribute instance-attribute #

roles: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="role",
    validation_alias="roles",
    description="The Role or list of Roles to grant the privileges to",
)

type class-attribute instance-attribute #

type: str = Field(
    default=...,
    description="The type of object to grant privileges on, e.g. TABLE, VIEW",
)

Output #

Output class for GrantPrivilegesOnObject

query class-attribute instance-attribute #

query: conlist(str, min_length=1) = Field(
    default=...,
    description="Query that was executed to grant privileges",
    validate_default=False,
)

execute #

execute() -> None
Source code in src/koheesio/integrations/snowflake/__init__.py
def execute(self) -> None:
    self.output.query = []
    roles = self.roles

    for role in roles:
        query = self.get_query(role)
        self.output.query.append(query)

        # Create a new instance of SnowflakeRunQueryPython with the current query
        instance = SnowflakeRunQueryPython.from_step(self, query=query)
        instance.execute()
        print(f"{instance.output = }")
        self.output.results.extend(instance.output.results)

get_query #

get_query(role: str) -> str

Build the GRANT query

Parameters:

Name Type Description Default
role str

The role name

required

Returns:

Name Type Description
query str

The Query that performs the grant

Source code in src/koheesio/integrations/snowflake/__init__.py
def get_query(self, role: str) -> str:
    """Build the GRANT query

    Parameters
    ----------
    role: str
        The role name

    Returns
    -------
    query : str
        The Query that performs the grant
    """
    query = self.query.format(
        privileges=",".join(self.privileges),
        type=self.type,
        object=self.object,
        role=role,
    )
    return query

set_roles_privileges #

set_roles_privileges(values: dict) -> dict

Coerce roles and privileges to be lists if they are not already.

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="before")
def set_roles_privileges(cls, values: dict) -> dict:
    """Coerce roles and privileges to be lists if they are not already."""
    roles_value = values.get("roles") or values.get("role")
    privileges_value = values.get("privileges")

    if not (roles_value and privileges_value):
        raise ValueError("You have to specify roles AND privileges when using 'GrantPrivilegesOnObject'.")

    # coerce values to be lists
    values["roles"] = [roles_value] if isinstance(roles_value, str) else roles_value
    values["role"] = values["roles"][0]  # hack to keep the validator happy
    values["privileges"] = [privileges_value] if isinstance(privileges_value, str) else privileges_value

    return values

validate_object_and_object_type #

validate_object_and_object_type() -> (
    "GrantPrivilegesOnObject"
)

Validate that the object and type are set.

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="after")
def validate_object_and_object_type(self) -> "GrantPrivilegesOnObject":
    """Validate that the object and type are set."""
    object_value = self.object
    if not object_value:
        raise ValueError("You must provide an `object`, this should be the name of the object. ")

    object_type = self.type
    if not object_type:
        raise ValueError(
            "You must provide a `type`, e.g. TABLE, VIEW, DATABASE. "
            "See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html"
        )

    return self

koheesio.integrations.spark.snowflake.GrantPrivilegesOnTable #

Grant Snowflake privileges to a set of roles on a table

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="table",
    description="The name of the Table to grant Privileges on. This should be just the name of the table; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'TABLE'

koheesio.integrations.spark.snowflake.GrantPrivilegesOnView #

Grant Snowflake privileges to a set of roles on a view

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="view",
    description="The name of the View to grant Privileges on. This should be just the name of the view; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'VIEW'

koheesio.integrations.spark.snowflake.Query #

Query data from Snowflake and return the result as a DataFrame

Example
Query(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="SELECT * FROM MY_TABLE",
).execute().df

query class-attribute instance-attribute #

query: str = Field(
    default=..., description="The query to run"
)

get_options #

get_options(
    by_alias: bool = True, include: Set[str] = None
) -> Dict[str, Any]

add query to options

Source code in src/koheesio/integrations/spark/snowflake.py
def get_options(self, by_alias: bool = True, include: Set[str] = None) -> Dict[str, Any]:
    """add query to options"""
    options = super().get_options(by_alias)
    options["query"] = self.query
    return options

validate_query #

validate_query(query: str) -> str

Replace escape characters

Source code in src/koheesio/integrations/spark/snowflake.py
@field_validator("query")
def validate_query(cls, query: str) -> str:
    """Replace escape characters"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    return query

koheesio.integrations.spark.snowflake.RunQuery #

Run a query on Snowflake that does not return a result, e.g. create table statement

This is a wrapper around 'net.snowflake.spark.snowflake.Utils.runQuery' on the JVM

Example
RunQuery(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="account",
    password="***",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="CREATE TABLE test (col1 string)",
).execute()

query class-attribute instance-attribute #

query: str = Field(
    default=..., description="The query to run", alias="sql"
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> RunQuery.Output:
    # Executing the RunQuery without `host` option raises the following error:
    #   An error occurred while calling z:net.snowflake.spark.snowflake.Utils.runQuery.
    #   : java.util.NoSuchElementException: key not found: host
    options = self.get_options()
    options["host"] = self.url
    # noinspection PyProtectedMember
    self.spark._jvm.net.snowflake.spark.snowflake.Utils.runQuery(self.get_options(), self.query)

validate_query #

validate_query(query: str) -> str

Replace escape characters, strip whitespace, ensure it is not empty

Source code in src/koheesio/integrations/spark/snowflake.py
@field_validator("query")
def validate_query(cls, query: str) -> str:
    """Replace escape characters, strip whitespace, ensure it is not empty"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    if not query:
        raise ValueError("Query cannot be empty")
    return query

validate_spark_and_deprecate #

validate_spark_and_deprecate() -> RunQuery

If we do not have a spark session with a JVM, we can not use spark to run the query

Source code in src/koheesio/integrations/spark/snowflake.py
@model_validator(mode="after")
def validate_spark_and_deprecate(self) -> RunQuery:
    """If we do not have a spark session with a JVM, we can not use spark to run the query"""
    warn(
        "The RunQuery class is deprecated and will be removed in a future release. "
        "Please use the Python connector for Snowflake instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    if not hasattr(self.spark, "_jvm"):
        raise RuntimeError(
            "Your Spark session does not have a JVM and cannot run Snowflake query using RunQuery implementation. "
            "Please update your code to use python connector for Snowflake."
        )
    return self

koheesio.integrations.spark.snowflake.SnowflakeBaseModel #

BaseModel for setting up Snowflake Driver options.

Notes

Parameters:

Name Type Description Default
url str

Hostname for the Snowflake account, e.g. .snowflakecomputing.com. Alias for sfURL.

required
user str

Login name for the Snowflake user. Alias for sfUser.

required
password SecretStr

Password for the Snowflake user. Alias for sfPassword.

required
role str

The default security role to use for the session after connecting. Alias for sfRole.

required
warehouse str

The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse.

required
authenticator Optional[str]

Authenticator for the Snowflake user. Example: "okta.com".

None
database Optional[str]

The database to use for the session after connecting. Alias for sfDatabase.

None
sfSchema Optional[str]

The schema to use for the session after connecting. Alias for schema ("schema" is a reserved name in Pydantic, so we use sfSchema as main name instead).

None
options Optional[Dict[str, Any]]

Extra options to pass to the Snowflake connector.

{"sfCompress": "on", "continue_on_error": "off"}

authenticator class-attribute instance-attribute #

authenticator: Optional[str] = Field(
    default=None,
    description="Authenticator for the Snowflake user",
    examples=["okta.com"],
)

database class-attribute instance-attribute #

database: Optional[str] = Field(
    default=None,
    alias="sfDatabase",
    description="The database to use for the session after connecting",
)

options class-attribute instance-attribute #

options: Optional[Dict[str, Any]] = Field(
    default={
        "sfCompress": "on",
        "continue_on_error": "off",
    },
    description="Extra options to pass to the Snowflake connector",
)

password class-attribute instance-attribute #

password: SecretStr = Field(
    default=...,
    alias="sfPassword",
    description="Password for the Snowflake user",
)

role class-attribute instance-attribute #

role: str = Field(
    default=...,
    alias="sfRole",
    description="The default security role to use for the session after connecting",
)

sfSchema class-attribute instance-attribute #

sfSchema: Optional[str] = Field(
    default=...,
    alias="schema",
    description="The schema to use for the session after connecting",
)

url class-attribute instance-attribute #

url: str = Field(
    default=...,
    alias="sfURL",
    description="Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com",
    examples=["example.snowflakecomputing.com"],
)

user class-attribute instance-attribute #

user: str = Field(
    default=...,
    alias="sfUser",
    description="Login name for the Snowflake user",
)

warehouse class-attribute instance-attribute #

warehouse: str = Field(
    default=...,
    alias="sfWarehouse",
    description="The default virtual warehouse to use for the session after connecting",
)

get_options #

get_options(
    by_alias: bool = True,
    include: Optional[Set[str]] = None,
) -> Dict[str, Any]

Get the sfOptions as a dictionary.

Note
  • Any parameters that are None are excluded from the output dictionary.
  • sfSchema and password are handled separately.
  • The values from both 'options' and 'params' (kwargs / extra params) are included as is.
  • Koheesio specific fields are excluded by default (i.e. name, description, format).

Parameters:

Name Type Description Default
by_alias bool

Whether to use the alias names or not. E.g. sfURL instead of url

True
include Optional[Set[str]]

Set of keys to include in the output dictionary. When None is provided, all fields will be returned. Note: be sure to include all the keys you need.

None
Source code in src/koheesio/integrations/snowflake/__init__.py
def get_options(self, by_alias: bool = True, include: Optional[Set[str]] = None) -> Dict[str, Any]:
    """Get the sfOptions as a dictionary.

    Note
    ----
    - Any parameters that are `None` are excluded from the output dictionary.
    - `sfSchema` and `password` are handled separately.
    - The values from both 'options' and 'params' (kwargs / extra params) are included as is.
    - Koheesio specific fields are excluded by default (i.e. `name`, `description`, `format`).

    Parameters
    ----------
    by_alias : bool, optional, default=True
        Whether to use the alias names or not. E.g. `sfURL` instead of `url`
    include : Optional[Set[str]], optional, default=None
        Set of keys to include in the output dictionary. When None is provided, all fields will be returned.
        Note: be sure to include all the keys you need.
    """
    exclude_set = {
        # Exclude koheesio specific fields
        "name",
        "description",
        # options and params are separately implemented
        "params",
        "options",
        # schema and password have to be handled separately
        "sfSchema",
        "password",
    } - (include or set())

    fields = self.model_dump(
        by_alias=by_alias,
        exclude_none=True,
        exclude=exclude_set,
    )

    # handle schema and password
    fields.update(
        {
            "sfSchema" if by_alias else "schema": self.sfSchema,
            "sfPassword" if by_alias else "password": self.password.get_secret_value(),
        }
    )

    # handle include
    if include:
        # user specified filter
        fields = {key: value for key, value in fields.items() if key in include}
    else:
        # default filter
        include = {"options", "params"}

    # handle options
    if "options" in include:
        options = fields.pop("options", self.options)
        fields.update(**options)

    # handle params
    if "params" in include:
        params = fields.pop("params", self.params)
        fields.update(**params)

    return {key: value for key, value in fields.items() if value}

koheesio.integrations.spark.snowflake.SnowflakeReader #

Wrapper around JdbcReader for Snowflake.

Example
sr = SnowflakeReader(
    url="foo.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="db",
    schema="schema",
)
df = sr.read()
Notes

driver class-attribute instance-attribute #

driver: Optional[str] = None

format class-attribute instance-attribute #

format: str = Field(
    default="snowflake",
    description="The format to use when writing to Snowflake",
)

execute #

execute() -> Output

Read from Snowflake

Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> SparkStep.Output:
    """Read from Snowflake"""
    super().execute()

koheesio.integrations.spark.snowflake.SnowflakeRunQueryPython #

Run a query on Snowflake using the Python connector

Example
RunQueryPython(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="account",
    password="***",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="CREATE TABLE test (col1 string)",
).execute()

account class-attribute instance-attribute #

account: Optional[str] = Field(
    default=None,
    description="Snowflake Account Name",
    alias="account",
)

conn property #

conn: Generator

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="The query to run",
    alias="sql",
    serialization_alias="query",
)

Output #

Output class for RunQueryPython

results class-attribute instance-attribute #

results: List = Field(
    default_factory=list,
    description="The results of the query",
)

execute #

execute() -> None

Execute the query

Source code in src/koheesio/integrations/snowflake/__init__.py
def execute(self) -> None:
    """Execute the query"""
    with self.conn as conn:
        cursors = conn.execute_string(
            self.get_query(),
        )
        for cursor in cursors:
            self.log.debug(f"Cursor executed: {cursor}")
            self.output.results.extend(cursor.fetchall())

get_options #

get_options(
    by_alias: bool = False,
    include: Optional[Set[str]] = None,
) -> Dict[str, Any]
Source code in src/koheesio/integrations/snowflake/__init__.py
def get_options(self, by_alias: bool = False, include: Optional[Set[str]] = None) -> Dict[str, Any]:
    if include is None:
        include = {
            "account",
            "url",
            "authenticator",
            "user",
            "role",
            "warehouse",
            "database",
            "schema",
            "password",
        }
    return super().get_options(by_alias=by_alias, include=include)

get_query #

get_query() -> str

allows to customize the query

Source code in src/koheesio/integrations/snowflake/__init__.py
def get_query(self) -> str:
    """allows to customize the query"""
    return self.query

validate_query #

validate_query(query: str) -> str

Replace escape characters, strip whitespace, ensure it is not empty

Source code in src/koheesio/integrations/snowflake/__init__.py
@field_validator("query")
def validate_query(cls, query: str) -> str:
    """Replace escape characters, strip whitespace, ensure it is not empty"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    if not query:
        raise ValueError("Query cannot be empty")
    return query

koheesio.integrations.spark.snowflake.SnowflakeSparkStep #

Expands the SnowflakeBaseModel so that it can be used as a SparkStep

koheesio.integrations.spark.snowflake.SnowflakeStep #

Expands the SnowflakeBaseModel so that it can be used as a Step

koheesio.integrations.spark.snowflake.SnowflakeTableStep #

Expands the SnowflakeStep, adding a 'table' parameter

full_name property #

full_name: str

Returns the fullname of snowflake table based on schema and database parameters.

Returns:

Type Description
str

Snowflake Complete table name (database.schema.table)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The name of the table"
)

koheesio.integrations.spark.snowflake.SnowflakeTransformation #

Adds Snowflake parameters to the Transformation class

koheesio.integrations.spark.snowflake.SnowflakeWriter #

Class for writing to Snowflake

See Also

format class-attribute instance-attribute #

format: str = Field(
    "snowflake",
    description="The format to use when writing to Snowflake",
)

insert_type class-attribute instance-attribute #

insert_type: Optional[BatchOutputMode] = Field(
    APPEND,
    alias="mode",
    description="The insertion type, append or overwrite",
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="Target table name"
)

execute #

execute() -> Output

Write to Snowflake

Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> SnowflakeWriter.Output:
    """Write to Snowflake"""
    self.log.debug(f"writing to {self.table} with mode {self.insert_type}")
    self.df.write.format(self.format).options(**self.get_options()).option("dbtable", self.table).mode(
        self.insert_type
    ).save()

koheesio.integrations.spark.snowflake.SyncTableAndDataFrameSchema #

Sync the schema's of a Snowflake table and a DataFrame. This will add NULL columns for the columns that are not in both and perform type casts where needed.

The Snowflake table will take priority in case of type conflicts.

df class-attribute instance-attribute #

df: DataFrame = Field(
    default=..., description="The Spark DataFrame"
)

dry_run class-attribute instance-attribute #

dry_run: bool = Field(
    default=False,
    description="Only show schema differences, do not apply changes",
)

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The table name"
)

Output #

Output class for SyncTableAndDataFrameSchema

new_df_schema class-attribute instance-attribute #

new_df_schema: StructType = Field(
    default=..., description="New DataFrame schema"
)

new_sf_schema class-attribute instance-attribute #

new_sf_schema: StructType = Field(
    default=..., description="New Snowflake schema"
)

original_df_schema class-attribute instance-attribute #

original_df_schema: StructType = Field(
    default=..., description="Original DataFrame schema"
)

original_sf_schema class-attribute instance-attribute #

original_sf_schema: StructType = Field(
    default=..., description="Original Snowflake schema"
)

sf_table_altered class-attribute instance-attribute #

sf_table_altered: bool = Field(
    default=False,
    description="Flag to indicate whether Snowflake schema has been altered",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    self.log.warning("Snowflake table will always take a priority in case of data type conflicts!")

    # spark side
    df_schema = self.df.schema
    self.output.original_df_schema = deepcopy(df_schema)  # using deepcopy to avoid storing in place changes
    df_cols = {c.name.lower() for c in df_schema}

    # snowflake side
    _options = {**self.get_options(), "table": self.table}
    sf_schema = GetTableSchema(**_options).execute().table_schema
    self.output.original_sf_schema = sf_schema
    sf_cols = {c.name.lower() for c in sf_schema}

    if self.dry_run:
        # Display differences between Spark DataFrame and Snowflake schemas
        # and provide dummy values that are expected as class outputs.
        _sf_diff = df_cols - sf_cols
        self.log.warning(f"Columns to be added to Snowflake table: {set(df_cols) - set(sf_cols)}")
        _df_diff = sf_cols - df_cols
        self.log.warning(f"Columns to be added to Spark DataFrame: {set(sf_cols) - set(df_cols)}")

        self.output.new_df_schema = t.StructType()
        self.output.new_sf_schema = t.StructType()
        self.output.df = self.df
        self.output.sf_table_altered = False

    else:
        # Add columns to SnowFlake table that exist in DataFrame
        for df_column in df_schema:
            if df_column.name.lower() not in sf_cols:
                AddColumn(
                    **self.get_options(),
                    table=self.table,
                    column=df_column.name,
                    type=df_column.dataType,
                ).execute()
                self.output.sf_table_altered = True

        if self.output.sf_table_altered:
            sf_schema = GetTableSchema(**self.get_options(), table=self.table).execute().table_schema
            sf_cols = {c.name.lower() for c in sf_schema}

        self.output.new_sf_schema = sf_schema

        # Add NULL columns to the DataFrame if they exist in SnowFlake but not in the df
        df = self.df
        for sf_col in self.output.original_sf_schema:
            sf_col_name = sf_col.name.lower()
            if sf_col_name not in df_cols:
                sf_col_type = sf_col.dataType
                df = df.withColumn(sf_col_name, f.lit(None).cast(sf_col_type))  # type: ignore

        # Put DataFrame columns in the same order as the Snowflake table
        df = df.select(*sf_cols)

        self.output.df = df
        self.output.new_df_schema = df.schema

koheesio.integrations.spark.snowflake.SynchronizeDeltaToSnowflakeTask #

Synchronize a Delta table to a Snowflake table

  • Overwrite - only in batch mode
  • Append - supports batch and streaming mode
  • Merge - only in streaming mode
Example
SynchronizeDeltaToSnowflakeTask(
    account="acme",
    url="acme.snowflakecomputing.com",
    user="admin",
    role="ADMIN",
    warehouse="SF_WAREHOUSE",
    database="SF_DATABASE",
    schema="SF_SCHEMA",
    source_table=DeltaTableStep(...),
    target_table="my_sf_table",
    key_columns=[
        "id",
    ],
    streaming=False,
).run()

account class-attribute instance-attribute #

account: Optional[str] = Field(
    default=None,
    description="The Snowflake account to connect to. If not provided, the `truncate_table` and `drop_table` methods will fail.",
)

checkpoint_location class-attribute instance-attribute #

checkpoint_location: Optional[str] = Field(
    default=None, description="Checkpoint location to use"
)

enable_deletion class-attribute instance-attribute #

enable_deletion: bool = Field(
    default=False,
    description="In case of merge synchronisation_mode add deletion statement in merge query.",
)

key_columns class-attribute instance-attribute #

key_columns: Optional[List[str]] = Field(
    default_factory=list,
    description="Key columns on which merge statements will be MERGE statement will be applied.",
)

non_key_columns property #

non_key_columns: List[str]

Columns of source table that aren't part of the (composite) primary key

persist_staging class-attribute instance-attribute #

persist_staging: bool = Field(
    default=False,
    description="In case of debugging, set `persist_staging` to True to retain the staging table for inspection after synchronization.",
)

reader property #

DeltaTable reader

Returns:

DeltaTableReader DeltaTableReader the will yield source delta table

schema_tracking_location class-attribute instance-attribute #

schema_tracking_location: Optional[str] = Field(
    default=None,
    description="Schema tracking location to use. Info: https://docs.delta.io/latest/delta-streaming.html#-schema-tracking",
)

source_table class-attribute instance-attribute #

source_table: DeltaTableStep = Field(
    default=...,
    description="Source delta table to synchronize",
)

staging_table property #

staging_table: str

Intermediate table on snowflake where staging results are stored

staging_table_name class-attribute instance-attribute #

staging_table_name: Optional[str] = Field(
    default=None,
    alias="staging_table",
    description="Optional snowflake staging name",
    validate_default=False,
)

streaming class-attribute instance-attribute #

streaming: bool = Field(
    default=False,
    description="Should synchronisation happen in streaming or in batch mode. Streaming is supported in 'APPEND' and 'MERGE' mode. Batch is supported in 'OVERWRITE' and 'APPEND' mode.",
)

synchronisation_mode class-attribute instance-attribute #

synchronisation_mode: BatchOutputMode = Field(
    default=MERGE,
    description="Determines if synchronisation will 'overwrite' any existing table, 'append' new rows or 'merge' with existing rows.",
)

target_table class-attribute instance-attribute #

target_table: str = Field(
    default=...,
    description="Target table in snowflake to synchronize to",
)

writer property #

Writer to persist to snowflake

Depending on configured options, this returns an SnowflakeWriter or ForEachBatchStreamWriter: - OVERWRITE/APPEND mode yields SnowflakeWriter - MERGE mode yields ForEachBatchStreamWriter

Returns:

Type Description
Union[ForEachBatchStreamWriter, SnowflakeWriter]

writer_ class-attribute instance-attribute #

drop_table #

drop_table(snowflake_table: str) -> None

Drop a given snowflake table

Source code in src/koheesio/integrations/spark/snowflake.py
def drop_table(self, snowflake_table: str) -> None:
    """Drop a given snowflake table"""
    self.log.warning(f"Dropping table {snowflake_table} from snowflake")
    drop_table_query = f"""DROP TABLE IF EXISTS {snowflake_table}"""  # nosec B608: hardcoded_sql_expressions
    query_executor = SnowflakeRunQueryPython(**self.get_options(), query=drop_table_query)
    query_executor.execute()

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> SynchronizeDeltaToSnowflakeTask.Output:
    # extract
    df = self.extract()
    self.output.source_df = df

    # synchronize
    self.output.target_df = df
    self.load(df)
    if not self.persist_staging:
        # If it's a streaming job, await for termination before dropping staging table
        if self.streaming:
            self.writer.await_termination()  # type: ignore
        self.drop_table(self.staging_table)

extract #

extract() -> DataFrame

Extract source table

Source code in src/koheesio/integrations/spark/snowflake.py
def extract(self) -> DataFrame:
    """
    Extract source table
    """
    if self.synchronisation_mode == BatchOutputMode.MERGE:
        if not self.source_table.is_cdf_active:
            raise RuntimeError(
                f"Source table {self.source_table.table_name} does not have CDF enabled. "
                f"Set TBLPROPERTIES ('delta.enableChangeDataFeed' = true) to enable. "
                f"Current properties = {self.source_table.get_persisted_properties()}"
            )

    df = self.reader.read()
    self.output.source_df = df
    return df

load #

load(df: DataFrame) -> DataFrame

Load source table into snowflake

Source code in src/koheesio/integrations/spark/snowflake.py
def load(self, df: DataFrame) -> DataFrame:
    """Load source table into snowflake"""
    if self.synchronisation_mode == BatchOutputMode.MERGE:
        self.log.info(f"Truncating staging table {self.staging_table}")  # type: ignore
        self.truncate_table(self.staging_table)
    self.writer.write(df)
    self.output.target_df = df
    return df

truncate_table #

truncate_table(snowflake_table: str) -> None

Truncate a given snowflake table

Source code in src/koheesio/integrations/spark/snowflake.py
def truncate_table(self, snowflake_table: str) -> None:
    """Truncate a given snowflake table"""
    truncate_query = f"""TRUNCATE TABLE IF EXISTS {snowflake_table}"""  # nosec B608: hardcoded_sql_expressions
    query_executor = SnowflakeRunQueryPython(
        **self.get_options(),
        query=truncate_query,
    )
    query_executor.execute()

koheesio.integrations.spark.snowflake.TableExists #

Check if the table exists in Snowflake by using INFORMATION_SCHEMA.

Example
k = TableExists(
    url="foo.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="db",
    schema="schema",
    table="table",
)

Output #

Output class for TableExists

exists class-attribute instance-attribute #

exists: bool = Field(
    default=...,
    description="Whether or not the table exists",
)

execute #

execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    query = (
        dedent(
            # Force upper case, due to case-sensitivity of where clause
            f"""
            SELECT *
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_CATALOG     = '{self.database}'
              AND TABLE_SCHEMA      = '{self.sfSchema}'
              AND TABLE_TYPE        = 'BASE TABLE'
              AND upper(TABLE_NAME) = '{self.table.upper()}'
            """  # nosec B608: hardcoded_sql_expressions
        )
        .upper()
        .strip()
    )

    self.log.debug(f"Query that was executed to check if the table exists:\n{query}")

    df = Query(**self.get_options(), query=query).read()

    exists = df.count() > 0
    self.log.info(
        f"Table '{self.database}.{self.sfSchema}.{self.table}' {'exists' if exists else 'does not exist'}"
    )
    self.output.exists = exists

koheesio.integrations.spark.snowflake.TagSnowflakeQuery #

Provides Snowflake query tag pre-action that can be used to easily find queries through SF history search and further group them for debugging and cost tracking purposes.

Takes in query tag attributes as kwargs and additional Snowflake options dict that can optionally contain other set of pre-actions to be applied to a query, in that case existing pre-action aren't dropped, query tag pre-action will be added to them.

Passed Snowflake options dictionary is not modified in-place, instead anew dictionary containing updated pre-actions is returned.

Notes

See this article for explanation: https://select.dev/posts/snowflake-query-tags

Arbitrary tags can be applied, such as team, dataset names, business capability, etc.

Example
Using options parameter#

query_tag = (
    AddQueryTag(
        options={"preactions": "ALTER SESSION"},
        task_name="cleanse_task",
        pipeline_name="ingestion-pipeline",
        etl_date="2022-01-01",
        pipeline_execution_time="2022-01-01T00:00:00",
        task_execution_time="2022-01-01T01:00:00",
        environment="dev",
        trace_id="acd4f3f96045",
        span_id="546d2d66f6cb",
    )
    .execute()
    .options
)
In this example, the query tag pre-action will be added to the Snowflake options.

Using preactions parameter#

Instead of using options parameter, you can also use preactions parameter to provide existing preactions.

query_tag = AddQueryTag(
    preactions="ALTER SESSION"
    ...
).execute().options

The result will be the same as in the previous example.

Using get_options method#

The shorthand method get_options can be used to get the options dictionary.

query_tag = AddQueryTag(...).get_options()

options class-attribute instance-attribute #

options: Dict = Field(
    default_factory=dict,
    description="Additional Snowflake options, optionally containing additional preactions",
)

preactions class-attribute instance-attribute #

preactions: Optional[str] = Field(
    default="",
    description="Existing preactions from Snowflake options",
)

Output #

Output class for AddQueryTag

options class-attribute instance-attribute #

options: Dict = Field(
    default=...,
    description="Snowflake options dictionary with added query tag preaction",
)

execute #

execute() -> Output

Add query tag preaction to Snowflake options

Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> Output:
    """Add query tag preaction to Snowflake options"""
    tag_json = json.dumps(self.extra_params, indent=4, sort_keys=True)
    tag_preaction = f"ALTER SESSION SET QUERY_TAG = '{tag_json}';"
    preactions = self.options.get("preactions", self.preactions)
    # update options with new preactions
    self.output.options = {**self.options, "preactions": f"{preactions}\n{tag_preaction}".strip()}

get_options #

get_options() -> Dict

shorthand method to get the options dictionary

Functionally equivalent to running execute().options

Returns:

Type Description
Dict

Snowflake options dictionary with added query tag preaction

Source code in src/koheesio/integrations/spark/snowflake.py
def get_options(self) -> Dict:
    """shorthand method to get the options dictionary

    Functionally equivalent to running `execute().options`

    Returns
    -------
    Dict
        Snowflake options dictionary with added query tag preaction
    """
    return self.execute().options

koheesio.integrations.spark.snowflake.map_spark_type #

map_spark_type(spark_type: DataType) -> str

Translates Spark DataFrame Schema type to SnowFlake type

Basic Types Snowflake Type
StringType STRING
NullType STRING
BooleanType BOOLEAN
Numeric Types Snowflake Type
LongType BIGINT
IntegerType INT
ShortType SMALLINT
DoubleType DOUBLE
FloatType FLOAT
NumericType FLOAT
ByteType BINARY
Date / Time Types Snowflake Type
DateType DATE
TimestampType TIMESTAMP
Advanced Types Snowflake Type
DecimalType DECIMAL
MapType VARIANT
ArrayType VARIANT
StructType VARIANT
References

Parameters:

Name Type Description Default
spark_type DataType

DataType taken out of the StructField

required

Returns:

Type Description
str

The Snowflake data type

Source code in src/koheesio/integrations/spark/snowflake.py
def map_spark_type(spark_type: t.DataType) -> str:
    """
    Translates Spark DataFrame Schema type to SnowFlake type

    | Basic Types       | Snowflake Type |
    |-------------------|----------------|
    | StringType        | STRING         |
    | NullType          | STRING         |
    | BooleanType       | BOOLEAN        |

    | Numeric Types     | Snowflake Type |
    |-------------------|----------------|
    | LongType          | BIGINT         |
    | IntegerType       | INT            |
    | ShortType         | SMALLINT       |
    | DoubleType        | DOUBLE         |
    | FloatType         | FLOAT          |
    | NumericType       | FLOAT          |
    | ByteType          | BINARY         |

    | Date / Time Types | Snowflake Type |
    |-------------------|----------------|
    | DateType          | DATE           |
    | TimestampType     | TIMESTAMP      |

    | Advanced Types    | Snowflake Type |
    |-------------------|----------------|
    | DecimalType       | DECIMAL        |
    | MapType           | VARIANT        |
    | ArrayType         | VARIANT        |
    | StructType        | VARIANT        |

    References
    ----------
    - Spark SQL DataTypes: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
    - Snowflake DataTypes: https://docs.snowflake.com/en/sql-reference/data-types.html

    Parameters
    ----------
    spark_type : pyspark.sql.types.DataType
        DataType taken out of the StructField

    Returns
    -------
    str
        The Snowflake data type
    """
    # StructField means that the entire Field was passed, we need to extract just the dataType before continuing
    if isinstance(spark_type, t.StructField):
        spark_type = spark_type.dataType

    # Check if the type is DayTimeIntervalType
    if isinstance(spark_type, t.DayTimeIntervalType):
        warn(
            "DayTimeIntervalType is being converted to STRING. "
            "Consider converting to a more supported date/time/timestamp type in Snowflake."
        )

    # fmt: off
    # noinspection PyUnresolvedReferences
    data_type_map = {
        # Basic Types
        t.StringType: "STRING",
        t.NullType: "STRING",
        t.BooleanType: "BOOLEAN",

        # Numeric Types
        t.LongType: "BIGINT",
        t.IntegerType: "INT",
        t.ShortType: "SMALLINT",
        t.DoubleType: "DOUBLE",
        t.FloatType: "FLOAT",
        t.NumericType: "FLOAT",
        t.ByteType: "BINARY",
        t.BinaryType: "VARBINARY",

        # Date / Time Types
        t.DateType: "DATE",
        t.TimestampType: "TIMESTAMP",
        t.DayTimeIntervalType: "STRING",

        # Advanced Types
        t.DecimalType:
            f"DECIMAL({spark_type.precision},{spark_type.scale})"  # pylint: disable=no-member
            if isinstance(spark_type, t.DecimalType) else "DECIMAL(38,0)",
        t.MapType: "VARIANT",
        t.ArrayType: "VARIANT",
        t.StructType: "VARIANT",
    }
    return data_type_map.get(type(spark_type), 'STRING')

koheesio.integrations.spark.snowflake.safe_import_snowflake_connector #

safe_import_snowflake_connector() -> Optional[ModuleType]

Validate that the Snowflake connector is installed

Returns:

Type Description
Optional[ModuleType]

The Snowflake connector module if it is installed, otherwise None

Source code in src/koheesio/integrations/snowflake/__init__.py
def safe_import_snowflake_connector() -> Optional[ModuleType]:
    """Validate that the Snowflake connector is installed

    Returns
    -------
    Optional[ModuleType]
        The Snowflake connector module if it is installed, otherwise None
    """
    is_accessable_sf_conf_dir = __check_access_snowflake_config_dir()

    if not is_accessable_sf_conf_dir and on_databricks():
        snowflake_home: str = tempfile.mkdtemp(prefix="snowflake_tmp_", dir="/tmp")  # nosec B108:ignore bandit check for CWE-377
        os.environ["SNOWFLAKE_HOME"] = snowflake_home
        warn(f"Getting error for snowflake config directory. Going to use temp directory `{snowflake_home}` instead.")
    elif not is_accessable_sf_conf_dir:
        raise PermissionError("Snowflake configuration directory is not accessible. Please check the permissions.")

    try:
        # Keep the import here as it is perfroming resolution of snowflake configuration directory
        from snowflake import connector as snowflake_connector

        return snowflake_connector
    except (ImportError, ModuleNotFoundError):
        warn(
            "You need to have the `snowflake-connector-python` package installed to use the Snowflake steps that are"
            "based around SnowflakeRunQueryPython. You can install this in Koheesio by adding `koheesio[snowflake]` to "
            "your package dependencies.",
            UserWarning,
        )
        return None