Snowflake
Snowflake steps and tasks for Koheesio
Every class in this module is a subclass of Step or Task and is used to perform operations on Snowflake.
Notes
Every Step in this module is based on SnowflakeBaseModel. The following parameters are available for every Step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com. Alias for sfURL. | required |
user | str | Login name for the Snowflake user. Alias for sfUser. | required |
password | SecretStr | Password for the Snowflake user. Alias for sfPassword. | required |
database | str | The database to use for the session after connecting. Alias for sfDatabase. | required |
sfSchema | str | The schema to use for the session after connecting. Alias for schema. | required |
role | str | The default security role to use for the session after connecting. Alias for sfRole. | required |
warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse. | required |
authenticator | Optional[str] | Authenticator for the Snowflake user. Example: "okta.com". | None |
options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | {"sfCompress": "on", "continue_on_error": "off"} |
format | str | The data format to use; defaults to snowflake. | "snowflake" |
koheesio.integrations.spark.snowflake.AddColumn #
Add an empty column to a Snowflake table with given name and DataType
Example
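A minimal sketch of how this step can be called; the connection values below are placeholders, and passing the new column's Spark DataType via the type field is an assumption based on the attribute list that follows:
from pyspark.sql.types import StringType

AddColumn(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    account="my_account",
    table="MY_TABLE",
    column="MY_NEW_COLUMN",
    type=StringType(),  # Spark DataType of the new (empty) column
).execute()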
account
class-attribute
instance-attribute
#
account: str = Field(
default=..., description="The Snowflake account"
)
column
class-attribute
instance-attribute
#
column: str = Field(
default=..., description="The name of the new column"
)
table
class-attribute
instance-attribute
#
table: str = Field(
default=...,
description="The name of the Snowflake table",
)
type
class-attribute
instance-attribute
#
Output #
koheesio.integrations.spark.snowflake.CreateOrReplaceTableFromDataFrame #
Create (or Replace) a Snowflake table which has the same schema as a Spark DataFrame
Can be used like any other Transformation. The DataFrame is, however, left unchanged and is only used to determine the schema of the Snowflake table that is to be created (or replaced).
Example
CreateOrReplaceTableFromDataFrame(
database="MY_DB",
schema="MY_SCHEMA",
warehouse="MY_WH",
user="gid.account@abc.com",
password="super-secret-password",
role="APPLICATION.SNOWFLAKE.ADMIN",
table="MY_TABLE",
df=df,
).execute()
Or, as a Transformation:
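A sketch of the same step used as a Transformation, assuming the standard Koheesio transform(df) interface (the DataFrame passes through unchanged):
output_df = CreateOrReplaceTableFromDataFrame(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
).transform(df)
# output_df is the unchanged input DataFrame; only the Snowflake table is created or replaced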
account
class-attribute
instance-attribute
#
account: str = Field(
default=..., description="The Snowflake account"
)
table
class-attribute
instance-attribute
#
table: str = Field(
default=...,
alias="table_name",
description="The name of the (new) table",
)
Output #
Output class for CreateOrReplaceTableFromDataFrame
execute #
execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
koheesio.integrations.spark.snowflake.DbTableQuery #
Read a table from Snowflake using the dbtable option instead of a query.
Example
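A minimal sketch; the connection values are placeholders, and the table parameter is assumed to map to the dbtable option:
df = DbTableQuery(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    table="MY_DB.MY_SCHEMA.MY_TABLE",  # read via the dbtable option
).execute().df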
koheesio.integrations.spark.snowflake.GetTableSchema #
Get the schema from a Snowflake table as a Spark Schema
Notes
- This Step will execute a SELECT * FROM <table> LIMIT 1 query to get the schema of the table.
- The schema will be stored in the table_schema attribute of the output. table_schema is used as the attribute name to avoid conflicts with the schema attribute of Pydantic's BaseModel.
Example
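A minimal sketch using placeholder connection values; the schema is read from the table_schema output attribute described below:
table_schema = GetTableSchema(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    table="MY_TABLE",
).execute().table_schema
# table_schema is a pyspark.sql.types.StructType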
table
class-attribute
instance-attribute
#
table: str = Field(
default=..., description="The Snowflake table name"
)
Output #
Output class for GetTableSchema
table_schema
class-attribute
instance-attribute
#
table_schema: StructType = Field(
default=...,
serialization_alias="schema",
description="The Spark Schema",
)
koheesio.integrations.spark.snowflake.GrantPrivilegesOnFullyQualifiedObject #
Grant Snowflake privileges to a set of roles on a fully qualified object, i.e. database.schema.object_name
This class is a subclass of GrantPrivilegesOnObject and is used to grant privileges on a fully qualified object.
The advantage of using this class is that it sets the object name to be fully qualified: you can set the database, schema and object separately, and the object name will automatically be set to database.schema.object_name.
Example
GrantPrivilegesOnFullyQualifiedObject(
database="MY_DB",
schema="MY_SCHEMA",
warehouse="MY_WH",
...
object="MY_TABLE",
type="TABLE",
...
)
In this example, the object name will be set to be fully qualified, i.e. MY_DB.MY_SCHEMA.MY_TABLE.
If you were to use GrantPrivilegesOnObject instead, you would have to set the fully qualified object name yourself.
set_object_name #
Set the object name to be fully qualified, i.e. database.schema.object_name
Source code in src/koheesio/integrations/snowflake/__init__.py
koheesio.integrations.spark.snowflake.GrantPrivilegesOnObject #
A wrapper on Snowflake GRANT privileges
With this Step, you can grant Snowflake privileges to a set of roles on a table, a view, or an object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
account | str | Snowflake Account Name. | required |
warehouse | str | The name of the warehouse. Alias for sfWarehouse. | required |
user | str | The username. Alias for sfUser. | required |
password | SecretStr | The password. Alias for sfPassword. | required |
role | str | The role name | required |
object | str | The name of the object to grant privileges on | required |
type | str | The type of object to grant privileges on, e.g. TABLE, VIEW | required |
privileges | Union[conlist(str, min_length=1), str] | The Privilege/Permission or list of Privileges/Permissions to grant on the given object. | required |
roles | Union[conlist(str, min_length=1), str] | The Role or list of Roles to grant the privileges to | required |
Example
GrantPermissionsOnTable(
object="MY_TABLE",
type="TABLE",
warehouse="MY_WH",
user="gid.account@abc.com",
password=Secret("super-secret-password"),
role="APPLICATION.SNOWFLAKE.ADMIN",
permissions=["SELECT", "INSERT"],
).execute()
In this example, the APPLICATION.SNOWFLAKE.ADMIN role will be granted SELECT and INSERT privileges on the MY_TABLE table using the MY_WH warehouse.
object
class-attribute
instance-attribute
#
object: str = Field(
default=...,
description="The name of the object to grant privileges on",
)
privileges
class-attribute
instance-attribute
#
privileges: Union[conlist(str, min_length=1), str] = Field(
default=...,
alias="permissions",
description="The Privilege/Permission or list of Privileges/Permissions to grant on the given object. See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html",
)
query
class-attribute
instance-attribute
#
query: str = (
"GRANT {privileges} ON {type} {object} TO ROLE {role}"
)
roles
class-attribute
instance-attribute
#
roles: Union[conlist(str, min_length=1), str] = Field(
default=...,
alias="role",
validation_alias="roles",
description="The Role or list of Roles to grant the privileges to",
)
type
class-attribute
instance-attribute
#
type: str = Field(
default=...,
description="The type of object to grant privileges on, e.g. TABLE, VIEW",
)
Output #
execute #
Source code in src/koheesio/integrations/snowflake/__init__.py
get_query #
Build the GRANT query
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role | str | The role name | required |
Returns:
Name | Type | Description |
---|---|---|
query | str | The Query that performs the grant |
Source code in src/koheesio/integrations/snowflake/__init__.py
set_roles_privileges #
Coerce roles and privileges to be lists if they are not already.
Source code in src/koheesio/integrations/snowflake/__init__.py
validate_object_and_object_type #
Validate that the object and type are set.
Source code in src/koheesio/integrations/snowflake/__init__.py
koheesio.integrations.spark.snowflake.GrantPrivilegesOnTable #
Grant Snowflake privileges to a set of roles on a table
koheesio.integrations.spark.snowflake.GrantPrivilegesOnView #
Grant Snowflake privileges to a set of roles on a view
koheesio.integrations.spark.snowflake.Query #
Query data from Snowflake using a SQL query and return the result as a DataFrame
koheesio.integrations.spark.snowflake.RunQuery #
Run a query on Snowflake that does not return a result, e.g. a CREATE TABLE statement
This is a wrapper around 'net.snowflake.spark.snowflake.Utils.runQuery' on the JVM
Example
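A minimal sketch using placeholder connection values; the statement is passed via the query field (alias: sql):
RunQuery(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    query="CREATE TABLE IF NOT EXISTS MY_TABLE (id INT)",
).execute()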
query
class-attribute
instance-attribute
#
query: str = Field(
default=..., description="The query to run", alias="sql"
)
execute #
execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
validate_query #
Replace escape characters, strip whitespace, ensure it is not empty
Source code in src/koheesio/integrations/spark/snowflake.py
validate_spark_and_deprecate #
validate_spark_and_deprecate() -> RunQuery
If we do not have a Spark session with a JVM, we cannot use Spark to run the query
Source code in src/koheesio/integrations/spark/snowflake.py
koheesio.integrations.spark.snowflake.SnowflakeBaseModel #
BaseModel for setting up Snowflake Driver options.
Notes
- Snowflake is supported natively in Databricks 4.2 and newer: https://docs.snowflake.com/en/user-guide/spark-connector-databricks
- Refer to Snowflake docs for the installation instructions for non-Databricks environments: https://docs.snowflake.com/en/user-guide/spark-connector-install
- Refer to Snowflake docs for connection options: https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com. Alias for sfURL. | required |
user | str | Login name for the Snowflake user. Alias for sfUser. | required |
password | SecretStr | Password for the Snowflake user. Alias for sfPassword. | required |
role | str | The default security role to use for the session after connecting. Alias for sfRole. | required |
warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for sfWarehouse. | required |
authenticator | Optional[str] | Authenticator for the Snowflake user. Example: "okta.com". | None |
database | Optional[str] | The database to use for the session after connecting. Alias for sfDatabase. | None |
sfSchema | Optional[str] | The schema to use for the session after connecting. Alias for schema. | None |
options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | {"sfCompress": "on", "continue_on_error": "off"} |
authenticator
class-attribute
instance-attribute
#
authenticator: Optional[str] = Field(
default=None,
description="Authenticator for the Snowflake user",
examples=["okta.com"],
)
database
class-attribute
instance-attribute
#
database: Optional[str] = Field(
default=None,
alias="sfDatabase",
description="The database to use for the session after connecting",
)
options
class-attribute
instance-attribute
#
options: Optional[Dict[str, Any]] = Field(
default={
"sfCompress": "on",
"continue_on_error": "off",
},
description="Extra options to pass to the Snowflake connector",
)
password
class-attribute
instance-attribute
#
password: SecretStr = Field(
default=...,
alias="sfPassword",
description="Password for the Snowflake user",
)
role
class-attribute
instance-attribute
#
role: str = Field(
default=...,
alias="sfRole",
description="The default security role to use for the session after connecting",
)
sfSchema
class-attribute
instance-attribute
#
sfSchema: Optional[str] = Field(
default=...,
alias="schema",
description="The schema to use for the session after connecting",
)
url
class-attribute
instance-attribute
#
url: str = Field(
default=...,
alias="sfURL",
description="Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com",
examples=["example.snowflakecomputing.com"],
)
user
class-attribute
instance-attribute
#
user: str = Field(
default=...,
alias="sfUser",
description="Login name for the Snowflake user",
)
warehouse
class-attribute
instance-attribute
#
warehouse: str = Field(
default=...,
alias="sfWarehouse",
description="The default virtual warehouse to use for the session after connecting",
)
get_options #
Get the sfOptions as a dictionary.
Note
- Any parameters that are None are excluded from the output dictionary. sfSchema and password are handled separately.
- The values from both 'options' and 'params' (kwargs / extra params) are included as-is.
- Koheesio-specific fields are excluded by default (i.e. name, description, format).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
by_alias | bool | Whether to use the alias names or not, e.g. sfURL instead of url. | True |
include | Optional[Set[str]] | Set of keys to include in the output dictionary. When None is provided, all fields will be returned. Note: be sure to include all the keys you need. | None |
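A short sketch of how the resulting dictionary might be used; the step instance and the spark session are assumed to exist, and the option names in the comment follow the aliases listed above:
# step is any configured SnowflakeBaseModel / SnowflakeStep subclass (hypothetical instance)
options = step.get_options(by_alias=True)
# e.g. {"sfURL": "...", "sfUser": "...", "sfWarehouse": "...", "sfCompress": "on", ...}
df = (
    spark.read.format("snowflake")
    .options(**options)
    .option("dbtable", "MY_TABLE")
    .load()
)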
Source code in src/koheesio/integrations/snowflake/__init__.py
koheesio.integrations.spark.snowflake.SnowflakeReader #
Wrapper around JdbcReader for Snowflake.
Example
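A minimal sketch with placeholder connection values, assuming the JdbcReader-style dbtable parameter and the Output.df attribute:
df = SnowflakeReader(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    dbtable="MY_TABLE",
).execute().df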
Notes
- Snowflake is supported natively in Databricks 4.2 and newer: https://docs.snowflake.com/en/user-guide/spark-connector-databricks
- Refer to Snowflake docs for the installation instructions for non-Databricks environments: https://docs.snowflake.com/en/user-guide/spark-connector-install
- Refer to Snowflake docs for connection options: https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector
koheesio.integrations.spark.snowflake.SnowflakeRunQueryPython #
Run a query on Snowflake using the Python connector
Example
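A minimal sketch with placeholder connection values; account is optional as documented below, and the statement is passed via the query field (alias: sql):
SnowflakeRunQueryPython(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    account="my_account",
    query="CREATE TABLE IF NOT EXISTS MY_TABLE (id INT)",
).execute()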
account
class-attribute
instance-attribute
#
account: Optional[str] = Field(
default=None,
description="Snowflake Account Name",
alias="account",
)
query
class-attribute
instance-attribute
#
query: str = Field(
default=...,
description="The query to run",
alias="sql",
serialization_alias="query",
)
Output #
execute #
Execute the query
Source code in src/koheesio/integrations/snowflake/__init__.py
get_options #
Source code in src/koheesio/integrations/snowflake/__init__.py
validate_query #
Replace escape characters, strip whitespace, ensure it is not empty
Source code in src/koheesio/integrations/snowflake/__init__.py
koheesio.integrations.spark.snowflake.SnowflakeSparkStep #
Expands the SnowflakeBaseModel so that it can be used as a SparkStep
koheesio.integrations.spark.snowflake.SnowflakeStep #
Expands the SnowflakeBaseModel so that it can be used as a Step
koheesio.integrations.spark.snowflake.SnowflakeTableStep #
Expands the SnowflakeStep, adding a table parameter
koheesio.integrations.spark.snowflake.SnowflakeTransformation #
Adds Snowflake parameters to the Transformation class
koheesio.integrations.spark.snowflake.SnowflakeWriter #
Class for writing to Snowflake
See Also
format
class-attribute
instance-attribute
#
format: str = Field(
"snowflake",
description="The format to use when writing to Snowflake",
)
insert_type
class-attribute
instance-attribute
#
insert_type: Optional[BatchOutputMode] = Field(
APPEND,
alias="mode",
description="The insertion type, append or overwrite",
)
table
class-attribute
instance-attribute
#
table: str = Field(
default=..., description="Target table name"
)
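A minimal writing sketch with placeholder connection values; passing the DataFrame via the df field is an assumption based on the common Koheesio Writer interface:
SnowflakeWriter(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    table="MY_TABLE",
    insert_type="append",  # alias: mode; append or overwrite
    df=df,
).execute()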
koheesio.integrations.spark.snowflake.SyncTableAndDataFrameSchema #
Sync the schemas of a Snowflake table and a DataFrame. This will add NULL columns for the columns that are not present in both, and perform type casts where needed.
The Snowflake table will take priority in case of type conflicts.
df
class-attribute
instance-attribute
#
dry_run
class-attribute
instance-attribute
#
dry_run: bool = Field(
default=False,
description="Only show schema differences, do not apply changes",
)
table
class-attribute
instance-attribute
#
table: str = Field(
default=..., description="The table name"
)
Output #
execute #
execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
koheesio.integrations.spark.snowflake.SynchronizeDeltaToSnowflakeTask #
Synchronize a Delta table to a Snowflake table
- Overwrite - only in batch mode
- Append - supports batch and streaming mode
- Merge - only in streaming mode
Example
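A merge-mode sketch with placeholder values; the DeltaTableStep constructor arguments are illustrative:
SynchronizeDeltaToSnowflakeTask(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    account="my_account",
    source_table=DeltaTableStep(table="my_catalog.my_schema.my_delta_table"),
    target_table="MY_DB.MY_SCHEMA.MY_TARGET_TABLE",
    synchronisation_mode="merge",  # merge is only supported in streaming mode
    streaming=True,
    key_columns=["id"],
    checkpoint_location="/tmp/checkpoints/my_sync",
).execute()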
account
class-attribute
instance-attribute
#
account: Optional[str] = Field(
default=None,
description="The Snowflake account to connect to. If not provided, the `truncate_table` and `drop_table` methods will fail.",
)
checkpoint_location
class-attribute
instance-attribute
#
checkpoint_location: Optional[str] = Field(
default=None, description="Checkpoint location to use"
)
enable_deletion
class-attribute
instance-attribute
#
enable_deletion: bool = Field(
default=False,
description="In case of merge synchronisation_mode add deletion statement in merge query.",
)
key_columns
class-attribute
instance-attribute
#
key_columns: Optional[List[str]] = Field(
    default_factory=list,
    description="Key columns on which the MERGE statement will be applied.",
)
non_key_columns
property
#
Columns of source table that aren't part of the (composite) primary key
persist_staging
class-attribute
instance-attribute
#
persist_staging: bool = Field(
default=False,
description="In case of debugging, set `persist_staging` to True to retain the staging table for inspection after synchronization.",
)
reader
property
#
reader: Union[DeltaTableReader, DeltaTableStreamReader]
DeltaTable reader
Returns:
DeltaTableReader that will yield the source delta table
schema_tracking_location
class-attribute
instance-attribute
#
schema_tracking_location: Optional[str] = Field(
default=None,
description="Schema tracking location to use. Info: https://docs.delta.io/latest/delta-streaming.html#-schema-tracking",
)
source_table
class-attribute
instance-attribute
#
source_table: DeltaTableStep = Field(
default=...,
description="Source delta table to synchronize",
)
staging_table
property
#
staging_table: str
Intermediate table on Snowflake where staging results are stored
staging_table_name
class-attribute
instance-attribute
#
staging_table_name: Optional[str] = Field(
default=None,
alias="staging_table",
description="Optional snowflake staging name",
validate_default=False,
)
streaming
class-attribute
instance-attribute
#
streaming: bool = Field(
default=False,
description="Should synchronisation happen in streaming or in batch mode. Streaming is supported in 'APPEND' and 'MERGE' mode. Batch is supported in 'OVERWRITE' and 'APPEND' mode.",
)
synchronisation_mode
class-attribute
instance-attribute
#
synchronisation_mode: BatchOutputMode = Field(
default=MERGE,
description="Determines if synchronisation will 'overwrite' any existing table, 'append' new rows or 'merge' with existing rows.",
)
target_table
class-attribute
instance-attribute
#
target_table: str = Field(
default=...,
description="Target table in snowflake to synchronize to",
)
writer
property
#
writer: Union[ForEachBatchStreamWriter, SnowflakeWriter]
Writer to persist to Snowflake
Depending on configured options, this returns a SnowflakeWriter or a ForEachBatchStreamWriter:
- OVERWRITE/APPEND mode yields SnowflakeWriter
- MERGE mode yields ForEachBatchStreamWriter
Returns:
Type | Description |
---|---|
Union[ForEachBatchStreamWriter, SnowflakeWriter] | The writer used to persist to Snowflake |
writer_
class-attribute
instance-attribute
#
writer_: Optional[
Union[ForEachBatchStreamWriter, SnowflakeWriter]
] = None
drop_table #
drop_table(snowflake_table: str) -> None
Drop a given Snowflake table
Source code in src/koheesio/integrations/spark/snowflake.py
execute #
execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
extract #
Extract source table
Source code in src/koheesio/integrations/spark/snowflake.py
load #
Load the source table into Snowflake
Source code in src/koheesio/integrations/spark/snowflake.py
truncate_table #
truncate_table(snowflake_table: str) -> None
Truncate a given Snowflake table
Source code in src/koheesio/integrations/spark/snowflake.py
koheesio.integrations.spark.snowflake.TableExists #
Check if the table exists in Snowflake by using INFORMATION_SCHEMA.
Example
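A minimal sketch with placeholder connection values; the name of the boolean output attribute (exists) is an assumption:
exists = TableExists(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="MY_ROLE",
    table="MY_TABLE",
).execute().exists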
Output #
execute #
execute() -> Output
Source code in src/koheesio/integrations/spark/snowflake.py
koheesio.integrations.spark.snowflake.TagSnowflakeQuery #
Provides a Snowflake query tag pre-action that can be used to easily find queries through the Snowflake history search and to further group them for debugging and cost-tracking purposes.
Takes query tag attributes as kwargs, along with an additional Snowflake options dict that can optionally contain another set of pre-actions to be applied to a query; in that case the existing pre-actions aren't dropped, and the query tag pre-action is added to them.
The passed Snowflake options dictionary is not modified in place; instead, a new dictionary containing the updated pre-actions is returned.
Notes
See this article for explanation: https://select.dev/posts/snowflake-query-tags
Arbitrary tags can be applied, such as team, dataset names, business capability, etc.
Example
Using options parameter#
query_tag = (
AddQueryTag(
options={"preactions": "ALTER SESSION"},
task_name="cleanse_task",
pipeline_name="ingestion-pipeline",
etl_date="2022-01-01",
pipeline_execution_time="2022-01-01T00:00:00",
task_execution_time="2022-01-01T01:00:00",
environment="dev",
trace_id="acd4f3f96045",
span_id="546d2d66f6cb",
)
.execute()
.options
)
Using preactions parameter#
Instead of using the options parameter, you can also use the preactions parameter to provide existing preactions. The result will be the same as in the previous example.
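A sketch mirroring the previous example (using the class name from this section's heading), with the existing pre-action supplied through the preactions field instead of the options dict:
query_tag = (
    TagSnowflakeQuery(
        preactions="ALTER SESSION",
        task_name="cleanse_task",
        pipeline_name="ingestion-pipeline",
        etl_date="2022-01-01",
        pipeline_execution_time="2022-01-01T00:00:00",
        task_execution_time="2022-01-01T01:00:00",
        environment="dev",
        trace_id="acd4f3f96045",
        span_id="546d2d66f6cb",
    )
    .execute()
    .options
)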
Using get_options method#
The shorthand method get_options can be used to get the options dictionary.
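A short sketch; treating get_options() as returning the same updated options dictionary as .execute().options is an assumption:
options = TagSnowflakeQuery(
    options={"preactions": "ALTER SESSION"},
    task_name="cleanse_task",
    pipeline_name="ingestion-pipeline",
    environment="dev",
).get_options()
# options now contains the original preactions plus the query tag pre-action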
options
class-attribute
instance-attribute
#
options: Dict = Field(
default_factory=dict,
description="Additional Snowflake options, optionally containing additional preactions",
)
preactions
class-attribute
instance-attribute
#
preactions: Optional[str] = Field(
default="",
description="Existing preactions from Snowflake options",
)
Output #
execute #
execute() -> Output
Add query tag preaction to Snowflake options
Source code in src/koheesio/integrations/spark/snowflake.py
koheesio.integrations.spark.snowflake.map_spark_type #
map_spark_type(spark_type: DataType) -> str
Translates a Spark DataFrame schema type to a Snowflake type
Basic Types | Snowflake Type |
---|---|
StringType | STRING |
NullType | STRING |
BooleanType | BOOLEAN |
Numeric Types | Snowflake Type |
---|---|
LongType | BIGINT |
IntegerType | INT |
ShortType | SMALLINT |
DoubleType | DOUBLE |
FloatType | FLOAT |
NumericType | FLOAT |
ByteType | BINARY |
Date / Time Types | Snowflake Type |
---|---|
DateType | DATE |
TimestampType | TIMESTAMP |
Advanced Types | Snowflake Type |
---|---|
DecimalType | DECIMAL |
MapType | VARIANT |
ArrayType | VARIANT |
StructType | VARIANT |
References
- Spark SQL DataTypes: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
- Snowflake DataTypes: https://docs.snowflake.com/en/sql-reference/data-types.html
Parameters:
Name | Type | Description | Default |
---|---|---|---|
spark_type | DataType | DataType taken out of the StructField | required |
Returns:
Type | Description |
---|---|
str | The Snowflake data type |
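A short sketch of the mapping, following the tables above:
from koheesio.integrations.spark.snowflake import map_spark_type
from pyspark.sql.types import ArrayType, IntegerType, StringType

map_spark_type(IntegerType())            # -> "INT"
map_spark_type(ArrayType(StringType()))  # -> "VARIANT"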
Source code in src/koheesio/integrations/spark/snowflake.py
koheesio.integrations.spark.snowflake.safe_import_snowflake_connector #
safe_import_snowflake_connector() -> Optional[ModuleType]
Validate that the Snowflake connector is installed
Returns:
Type | Description |
---|---|
Optional[ModuleType] | The Snowflake connector module if it is installed, otherwise None |
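A short usage sketch:
from koheesio.integrations.spark.snowflake import safe_import_snowflake_connector

snowflake = safe_import_snowflake_connector()
if snowflake is None:
    # the Python connector is not installed; Python-based steps such as
    # SnowflakeRunQueryPython cannot be used
    ...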