Snowflake

Snowflake steps and tasks for Koheesio

Every class in this module is a subclass of Step or BaseModel and is used to perform operations on Snowflake.

Notes

Every Step in this module is based on SnowflakeBaseModel. The following parameters are available for every Step.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| url | str | Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com. Alias for sfURL. | required |
| user | str | Login name for the Snowflake user. Alias for sfUser. | required |
| password | SecretStr | Password for the Snowflake user. Alias for sfPassword. | required |
| database | str | The database to use for the session after connecting. Alias for sfDatabase. | required |
| sfSchema | str | The schema to use for the session after connecting. Alias for schema ("schema" is a reserved name in Pydantic, so sfSchema is used as the main name instead). | required |
| role | str | The default security role to use for the session after connecting. Alias for sfRole. | required |
| warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse. | required |
| authenticator | Optional[str] | Authenticator for the Snowflake user. Example: "okta.com". | None |
| options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | {"sfCompress": "on", "continue_on_error": "off"} |
| format | str | The default snowflake format can be used natively in Databricks. In other environments, use net.snowflake.spark.snowflake and make sure the required JARs are installed. | "snowflake" |

koheesio.integrations.snowflake.GrantPrivilegesOnFullyQualifiedObject #

Grant Snowflake privileges to a set of roles on a fully qualified object, i.e. database.schema.object_name

This class is a subclass of GrantPrivilegesOnObject. Its advantage is that you set the database, schema, and object separately, and the class builds the fully qualified object name (database.schema.object_name) for you.

Example
GrantPrivilegesOnFullyQualifiedObject(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    ...
    object="MY_TABLE",
    type="TABLE",
    ...
)

In this example, the object name becomes MY_DB.MY_SCHEMA.MY_TABLE. With GrantPrivilegesOnObject, you would have to supply the fully qualified name yourself.

set_object_name #

set_object_name() -> (
    "GrantPrivilegesOnFullyQualifiedObject"
)

Set the object name to be fully qualified, i.e. database.schema.object_name

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="after")
def set_object_name(self) -> "GrantPrivilegesOnFullyQualifiedObject":
    """Set the object name to be fully qualified, i.e. database.schema.object_name"""
    # database, schema, obj_name
    db = self.database
    schema = self.model_dump()["sfSchema"]  # since "schema" is a reserved name
    obj_name = self.object

    self.object = f"{db}.{schema}.{obj_name}"

    return self
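
The effect of this validator can be illustrated without Pydantic or Koheesio. The sketch below is a stand-in, not the library's implementation: QualifiedGrantTarget is a hypothetical dataclass, and its `__post_init__` plays the role of the `mode="after"` validator.

```python
from dataclasses import dataclass


@dataclass
class QualifiedGrantTarget:
    """Hypothetical stand-in for the step; not part of Koheesio."""

    database: str
    schema: str
    object: str

    def __post_init__(self) -> None:
        # Mirrors set_object_name: prefix the bare object name with database and schema.
        self.object = f"{self.database}.{self.schema}.{self.object}"


target = QualifiedGrantTarget(database="MY_DB", schema="MY_SCHEMA", object="MY_TABLE")
print(target.object)
```

Like the validator shown above, this sketch prefixes unconditionally, so the object should be passed as a bare name rather than an already qualified one.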

koheesio.integrations.snowflake.GrantPrivilegesOnObject #

A wrapper on Snowflake GRANT privileges

With this Step, you can grant Snowflake privileges to a set of roles on a table, a view, or an object

See Also

https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| account | str | Snowflake Account Name. | required |
| warehouse | str | The name of the warehouse. Alias for sfWarehouse | required |
| user | str | The username. Alias for sfUser | required |
| password | SecretStr | The password. Alias for sfPassword | required |
| role | str | The role name | required |
| object | str | The name of the object to grant privileges on | required |
| type | str | The type of object to grant privileges on, e.g. TABLE, VIEW | required |
| privileges | Union[conlist(str, min_length=1), str] | The Privilege/Permission or list of Privileges/Permissions to grant on the given object. | required |
| roles | Union[conlist(str, min_length=1), str] | The Role or list of Roles to grant the privileges to | required |

Example
GrantPrivilegesOnObject(
    object="MY_TABLE",
    type="TABLE",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    permissions=["SELECT", "INSERT"],
).execute()

In this example, the APPLICATION.SNOWFLAKE.ADMIN role will be granted SELECT and INSERT privileges on the MY_TABLE table using the MY_WH warehouse.

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    description="The name of the object to grant privileges on",
)

privileges class-attribute instance-attribute #

privileges: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="permissions",
    description="The Privilege/Permission or list of Privileges/Permissions to grant on the given object. See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html",
)

query class-attribute instance-attribute #

query: str = (
    "GRANT {privileges} ON {type} {object} TO ROLE {role}"
)

roles class-attribute instance-attribute #

roles: Union[conlist(str, min_length=1), str] = Field(
    default=...,
    alias="role",
    validation_alias="roles",
    description="The Role or list of Roles to grant the privileges to",
)

type class-attribute instance-attribute #

type: str = Field(
    default=...,
    description="The type of object to grant privileges on, e.g. TABLE, VIEW",
)

Output #

Output class for GrantPrivilegesOnObject

query class-attribute instance-attribute #

query: conlist(str, min_length=1) = Field(
    default=...,
    description="Query that was executed to grant privileges",
    validate_default=False,
)

execute #

execute() -> None

Source code in src/koheesio/integrations/snowflake/__init__.py
def execute(self) -> None:
    self.output.query = []
    roles = self.roles

    for role in roles:
        query = self.get_query(role)
        self.output.query.append(query)

        # Create a new instance of SnowflakeRunQueryPython with the current query
        instance = SnowflakeRunQueryPython.from_step(self, query=query)
        instance.execute()
        print(f"{instance.output = }")
        self.output.results.extend(instance.output.results)

get_query #

get_query(role: str) -> str

Build the GRANT query

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| role | str | The role name | required |

Returns:

| Name | Type | Description |
|---|---|---|
| query | str | The Query that performs the grant |

Source code in src/koheesio/integrations/snowflake/__init__.py
def get_query(self, role: str) -> str:
    """Build the GRANT query

    Parameters
    ----------
    role: str
        The role name

    Returns
    -------
    query : str
        The Query that performs the grant
    """
    query = self.query.format(
        privileges=",".join(self.privileges),
        type=self.type,
        object=self.object,
        role=role,
    )
    return query
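
To show what the template produces, here is a minimal sketch that formats the same query string once per role, as execute does when it loops over roles. The helper name build_grant_queries and the role names are illustrative assumptions and are not part of Koheesio.

```python
QUERY_TEMPLATE = "GRANT {privileges} ON {type} {object} TO ROLE {role}"


def build_grant_queries(privileges, object_type, object_name, roles):
    """Return one GRANT statement per role (hypothetical helper, not a Koheesio function)."""
    return [
        QUERY_TEMPLATE.format(
            privileges=",".join(privileges),  # privileges must be a list of strings here
            type=object_type,
            object=object_name,
            role=role,
        )
        for role in roles
    ]


queries = build_grant_queries(["SELECT", "INSERT"], "TABLE", "MY_TABLE", ["ROLE_A", "ROLE_B"])
for query in queries:
    print(query)
```

Because the privileges are combined with `",".join(...)`, a bare string would be split into single characters. This is why roles and privileges are coerced to lists before the query is built.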

set_roles_privileges #

set_roles_privileges(values: dict) -> dict

Coerce roles and privileges to be lists if they are not already.

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="before")
def set_roles_privileges(cls, values: dict) -> dict:
    """Coerce roles and privileges to be lists if they are not already."""
    roles_value = values.get("roles") or values.get("role")
    privileges_value = values.get("privileges")

    if not (roles_value and privileges_value):
        raise ValueError("You have to specify roles AND privileges when using 'GrantPrivilegesOnObject'.")

    # coerce values to be lists
    values["roles"] = [roles_value] if isinstance(roles_value, str) else roles_value
    values["role"] = values["roles"][0]  # hack to keep the validator happy
    values["privileges"] = [privileges_value] if isinstance(privileges_value, str) else privileges_value

    return values
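
The coercion step can be tried in isolation. The following is a simplified sketch of the same idea on a plain dictionary. The function names are assumptions made for illustration, and the "role" bookkeeping from the original validator is left out.

```python
from typing import List, Union


def as_list(value: Union[str, List[str]]) -> List[str]:
    """Wrap a single string in a list; return lists unchanged (hypothetical helper)."""
    return [value] if isinstance(value, str) else value


def coerce_roles_and_privileges(values: dict) -> dict:
    """Simplified version of the 'before' validator shown above."""
    roles = values.get("roles") or values.get("role")
    privileges = values.get("privileges")
    if not (roles and privileges):
        raise ValueError("roles and privileges are both required")
    return {**values, "roles": as_list(roles), "privileges": as_list(privileges)}


coerced = coerce_roles_and_privileges({"role": "ANALYST", "privileges": "SELECT"})
print(coerced["roles"], coerced["privileges"])
```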

validate_object_and_object_type #

validate_object_and_object_type() -> (
    "GrantPrivilegesOnObject"
)

Validate that the object and type are set.

Source code in src/koheesio/integrations/snowflake/__init__.py
@model_validator(mode="after")
def validate_object_and_object_type(self) -> "GrantPrivilegesOnObject":
    """Validate that the object and type are set."""
    object_value = self.object
    if not object_value:
        raise ValueError("You must provide an `object`, this should be the name of the object. ")

    object_type = self.type
    if not object_type:
        raise ValueError(
            "You must provide a `type`, e.g. TABLE, VIEW, DATABASE. "
            "See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html"
        )

    return self

koheesio.integrations.snowflake.GrantPrivilegesOnTable #

Grant Snowflake privileges to a set of roles on a table

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="table",
    description="The name of the Table to grant Privileges on. This should be just the name of the table; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'TABLE'

koheesio.integrations.snowflake.GrantPrivilegesOnView #

Grant Snowflake privileges to a set of roles on a view

object class-attribute instance-attribute #

object: str = Field(
    default=...,
    alias="view",
    description="The name of the View to grant Privileges on. This should be just the name of the view; so without Database and Schema, use sfDatabase/database and sfSchema/schema to set those instead.",
)

type class-attribute instance-attribute #

type: str = 'VIEW'

koheesio.integrations.snowflake.SnowflakeBaseModel #

BaseModel for setting up Snowflake Driver options.

Notes

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| url | str | Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com. Alias for sfURL. | required |
| user | str | Login name for the Snowflake user. Alias for sfUser. | required |
| password | SecretStr | Password for the Snowflake user. Alias for sfPassword. | required |
| role | str | The default security role to use for the session after connecting. Alias for sfRole. | required |
| warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse. | required |
| authenticator | Optional[str] | Authenticator for the Snowflake user. Example: "okta.com". | None |
| database | Optional[str] | The database to use for the session after connecting. Alias for sfDatabase. | None |
| sfSchema | Optional[str] | The schema to use for the session after connecting. Alias for schema ("schema" is a reserved name in Pydantic, so sfSchema is used as the main name instead). | None |
| options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | {"sfCompress": "on", "continue_on_error": "off"} |

authenticator class-attribute instance-attribute #

authenticator: Optional[str] = Field(
    default=None,
    description="Authenticator for the Snowflake user",
    examples=["okta.com"],
)

database class-attribute instance-attribute #

database: Optional[str] = Field(
    default=None,
    alias="sfDatabase",
    description="The database to use for the session after connecting",
)

options class-attribute instance-attribute #

options: Optional[Dict[str, Any]] = Field(
    default={
        "sfCompress": "on",
        "continue_on_error": "off",
    },
    description="Extra options to pass to the Snowflake connector",
)

password class-attribute instance-attribute #

password: SecretStr = Field(
    default=...,
    alias="sfPassword",
    description="Password for the Snowflake user",
)

role class-attribute instance-attribute #

role: str = Field(
    default=...,
    alias="sfRole",
    description="The default security role to use for the session after connecting",
)

sfSchema class-attribute instance-attribute #

sfSchema: Optional[str] = Field(
    default=...,
    alias="schema",
    description="The schema to use for the session after connecting",
)

url class-attribute instance-attribute #

url: str = Field(
    default=...,
    alias="sfURL",
    description="Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com",
    examples=["example.snowflakecomputing.com"],
)

user class-attribute instance-attribute #

user: str = Field(
    default=...,
    alias="sfUser",
    description="Login name for the Snowflake user",
)

warehouse class-attribute instance-attribute #

warehouse: str = Field(
    default=...,
    alias="sfWarehouse",
    description="The default virtual warehouse to use for the session after connecting",
)

get_options #

get_options(
    by_alias: bool = True,
    include: Optional[Set[str]] = None,
) -> Dict[str, Any]

Get the sfOptions as a dictionary.

Note
  • Any parameters that are None are excluded from the output dictionary.
  • sfSchema and password are handled separately.
  • The values from both 'options' and 'params' (kwargs / extra params) are included as is.
  • Koheesio specific fields are excluded by default (i.e. name, description, format).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| by_alias | bool | Whether to use the alias names or not. E.g. sfURL instead of url | True |
| include | Optional[Set[str]] | Set of keys to include in the output dictionary. When None is provided, all fields will be returned. Note: be sure to include all the keys you need. | None |

Source code in src/koheesio/integrations/snowflake/__init__.py
def get_options(self, by_alias: bool = True, include: Optional[Set[str]] = None) -> Dict[str, Any]:
    """Get the sfOptions as a dictionary.

    Note
    ----
    - Any parameters that are `None` are excluded from the output dictionary.
    - `sfSchema` and `password` are handled separately.
    - The values from both 'options' and 'params' (kwargs / extra params) are included as is.
    - Koheesio specific fields are excluded by default (i.e. `name`, `description`, `format`).

    Parameters
    ----------
    by_alias : bool, optional, default=True
        Whether to use the alias names or not. E.g. `sfURL` instead of `url`
    include : Optional[Set[str]], optional, default=None
        Set of keys to include in the output dictionary. When None is provided, all fields will be returned.
        Note: be sure to include all the keys you need.
    """
    exclude_set = {
        # Exclude koheesio specific fields
        "name",
        "description",
        # options and params are separately implemented
        "params",
        "options",
        # schema and password have to be handled separately
        "sfSchema",
        "password",
    } - (include or set())

    fields = self.model_dump(
        by_alias=by_alias,
        exclude_none=True,
        exclude=exclude_set,
    )

    # handle schema and password
    fields.update(
        {
            "sfSchema" if by_alias else "schema": self.sfSchema,
            "sfPassword" if by_alias else "password": self.password.get_secret_value(),
        }
    )

    # handle include
    if include:
        # user specified filter
        fields = {key: value for key, value in fields.items() if key in include}
    else:
        # default filter
        include = {"options", "params"}

    # handle options
    if "options" in include:
        options = fields.pop("options", self.options)
        fields.update(**options)

    # handle params
    if "params" in include:
        params = fields.pop("params", self.params)
        fields.update(**params)

    return {key: value for key, value in fields.items() if value}
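
The filtering and merging rules are easier to follow on plain dictionaries. The sketch below is a simplified, hypothetical stand-in for get_options: it skips the Pydantic dump and the schema and password handling, and only reproduces the include filter, the merge of options and params, and the final removal of falsy values.

```python
from typing import Any, Dict, Optional, Set


def merge_connection_options(
    fields: Dict[str, Any],
    options: Dict[str, Any],
    params: Dict[str, Any],
    include: Optional[Set[str]] = None,
) -> Dict[str, Any]:
    """Hypothetical, simplified stand-in for get_options working on plain dicts."""
    if include:
        # A user-supplied filter keeps only the requested keys.
        merged = {key: value for key, value in fields.items() if key in include}
    else:
        # Default behaviour: keep every field and fold in options and params.
        merged = dict(fields)
        include = {"options", "params"}

    if "options" in include:
        merged.update(options)
    if "params" in include:
        merged.update(params)

    # As in the source, falsy values (None, "", 0, False) are dropped at the end.
    return {key: value for key, value in merged.items() if value}


result = merge_connection_options(
    fields={"sfURL": "example.snowflakecomputing.com", "sfUser": "svc_user", "sfSchema": None},
    options={"sfCompress": "on", "continue_on_error": "off"},
    params={},
)
print(result)
```

One consequence worth noting: when a custom include set is passed, options and params are merged only if "options" or "params" are themselves part of that set.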

koheesio.integrations.snowflake.SnowflakeRunQueryPython #

Run a query on Snowflake using the Python connector

Example
SnowflakeRunQueryPython(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="account",
    password="***",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="CREATE TABLE test (col1 string)",
).execute()

account class-attribute instance-attribute #

account: Optional[str] = Field(
    default=None,
    description="Snowflake Account Name",
    alias="account",
)

conn property #

conn: Generator

query class-attribute instance-attribute #

query: str = Field(
    default=...,
    description="The query to run",
    alias="sql",
    serialization_alias="query",
)

Output #

Output class for RunQueryPython

results class-attribute instance-attribute #

results: List = Field(
    default_factory=list,
    description="The results of the query",
)

execute #

execute() -> None

Execute the query

Source code in src/koheesio/integrations/snowflake/__init__.py
def execute(self) -> None:
    """Execute the query"""
    with self.conn as conn:
        cursors = conn.execute_string(
            self.get_query(),
        )
        for cursor in cursors:
            self.log.debug(f"Cursor executed: {cursor}")
            self.output.results.extend(cursor.fetchall())

get_options #

get_options(
    by_alias: bool = False,
    include: Optional[Set[str]] = None,
) -> Dict[str, Any]

Source code in src/koheesio/integrations/snowflake/__init__.py
def get_options(self, by_alias: bool = False, include: Optional[Set[str]] = None) -> Dict[str, Any]:
    if include is None:
        include = {
            "account",
            "url",
            "authenticator",
            "user",
            "role",
            "warehouse",
            "database",
            "schema",
            "password",
        }
    return super().get_options(by_alias=by_alias, include=include)

get_query #

get_query() -> str

Returns the query to run. Subclasses can override this method to customize the query.

Source code in src/koheesio/integrations/snowflake/__init__.py
def get_query(self) -> str:
    """allows to customize the query"""
    return self.query

validate_query #

validate_query(query: str) -> str

Replace escape characters, strip whitespace, ensure it is not empty

Source code in src/koheesio/integrations/snowflake/__init__.py
@field_validator("query")
def validate_query(cls, query: str) -> str:
    """Replace escape characters, strip whitespace, ensure it is not empty"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    if not query:
        raise ValueError("Query cannot be empty")
    return query
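
The normalization can be tested outside the model. This sketch copies the validator body into a plain function (the name normalize_query is an assumption) and applies it to a string containing a literal backslash followed by n, as might arrive from a config file.

```python
def normalize_query(query: str) -> str:
    """Stand-alone copy of the validator logic (hypothetical function name)."""
    # Turn the two-character sequences backslash+n and backslash+t into a real newline and tab.
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    if not query:
        raise ValueError("Query cannot be empty")
    return query


raw = "  SELECT 1\\nFROM dual  "
normalized = normalize_query(raw)
print(repr(normalized))
```

A query that consists only of whitespace or escaped whitespace ends up empty after stripping and is rejected with a ValueError.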

koheesio.integrations.snowflake.SnowflakeStep #

Expands the SnowflakeBaseModel so that it can be used as a Step

koheesio.integrations.snowflake.SnowflakeTableStep #

Expands the SnowflakeStep, adding a 'table' parameter

full_name property #

full_name: str

Returns the full name of the Snowflake table, based on the schema and database parameters.

Returns:

| Type | Description |
|---|---|
| str | Complete Snowflake table name (database.schema.table) |

table class-attribute instance-attribute #

table: str = Field(
    default=..., description="The name of the table"
)

koheesio.integrations.snowflake.safe_import_snowflake_connector #

safe_import_snowflake_connector() -> Optional[ModuleType]

Validate that the Snowflake connector is installed

Returns:

| Type | Description |
|---|---|
| Optional[ModuleType] | The Snowflake connector module if it is installed, otherwise None |

Source code in src/koheesio/integrations/snowflake/__init__.py
def safe_import_snowflake_connector() -> Optional[ModuleType]:
    """Validate that the Snowflake connector is installed

    Returns
    -------
    Optional[ModuleType]
        The Snowflake connector module if it is installed, otherwise None
    """
    is_accessable_sf_conf_dir = __check_access_snowflake_config_dir()

    if not is_accessable_sf_conf_dir and on_databricks():
        snowflake_home: str = tempfile.mkdtemp(prefix="snowflake_tmp_", dir="/tmp")  # nosec B108:ignore bandit check for CWE-377
        os.environ["SNOWFLAKE_HOME"] = snowflake_home
        warn(f"Getting error for snowflake config directory. Going to use temp directory `{snowflake_home}` instead.")
    elif not is_accessable_sf_conf_dir:
        raise PermissionError("Snowflake configuration directory is not accessible. Please check the permissions.")

    try:
        # Keep the import here as it is performing resolution of snowflake configuration directory
        from snowflake import connector as snowflake_connector

        return snowflake_connector
    except (ImportError, ModuleNotFoundError):
        warn(
            "You need to have the `snowflake-connector-python` package installed to use the Snowflake steps that are "
            "based around SnowflakeRunQueryPython. You can install this in Koheesio by adding `koheesio[snowflake]` to "
            "your package dependencies.",
            UserWarning,
        )
        return None
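
The import half of this function follows a common optional-dependency pattern. Below is a generic sketch of that pattern using only the standard library. The helper safe_import and the module names passed to it are illustrative assumptions, and the configuration-directory check from the source is deliberately left out.

```python
import importlib
import warnings
from types import ModuleType
from typing import Optional


def safe_import(module_name: str, install_hint: str) -> Optional[ModuleType]:
    """Return the module if it can be imported, otherwise warn and return None.

    Hypothetical generic helper; not part of Koheesio.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        warnings.warn(f"Optional dependency `{module_name}` is missing. {install_hint}", UserWarning)
        return None


json_module = safe_import("json", "Part of the standard library.")

# Record the warning instead of printing it, so the behaviour can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    missing = safe_import("module_that_does_not_exist_xyz", "Install it to enable this feature.")

print(json_module is not None, missing is None, len(caught))
```

Since the function returns None instead of raising, callers need to check the result before using the module.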