Snowflake
Snowflake steps and tasks for Koheesio
Every class in this module is a subclass of Step or Task and is used to perform operations on Snowflake.
Notes
Every Step in this module is based on SnowflakeBaseModel. The following parameters are available for every Step.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
url | str | Hostname for the Snowflake account, e.g. `<account>.snowflakecomputing.com`. Alias for `sfURL`. | required |
user | str | Login name for the Snowflake user. Alias for `sfUser`. | required |
password | SecretStr | Password for the Snowflake user. Alias for `sfPassword`. | required |
database | str | The database to use for the session after connecting. Alias for `sfDatabase`. | required |
sfSchema | str | The schema to use for the session after connecting. Alias for `schema`. | required |
role | str | The default security role to use for the session after connecting. Alias for `sfRole`. | required |
warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for `sfWarehouse`. | required |
authenticator | Optional[str] | Authenticator for the Snowflake user. Example: "okta.com". | None |
options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | `{"sfCompress": "on", "continue_on_error": "off"}` |
format | str | The default `snowflake` format can be used natively in Databricks; use `net.snowflake.spark.snowflake` in other environments and make sure to install the required JARs. | "snowflake" |
koheesio.spark.snowflake.AddColumn #
Add an empty column to a Snowflake table with given name and DataType
Example
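A minimal sketch, using the connection fields documented above; the `column` and `type` field names are assumptions based on the class description:

```python
from pyspark.sql.types import DateType

AddColumn(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    column="MY_COL",   # assumed field name for the new column
    type=DateType(),   # assumed field name for the Spark DataType
).execute()
```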
koheesio.spark.snowflake.CreateOrReplaceTableFromDataFrame #
Create (or Replace) a Snowflake table which has the same schema as a Spark DataFrame
Can be used as any Transformation. The DataFrame is however left unchanged, and only used for determining the schema of the Snowflake Table that is to be created (or replaced).
Example
CreateOrReplaceTableFromDataFrame(
database="MY_DB",
schema="MY_SCHEMA",
warehouse="MY_WH",
user="gid.account@nike.com",
password="super-secret-password",
role="APPLICATION.SNOWFLAKE.ADMIN",
table="MY_TABLE",
df=df,
).execute()
Or, as a Transformation:
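A sketch of the Transformation-style usage, assuming the standard Koheesio Transformation `transform(df)` interface (connection fields as above):

```python
# df is an existing Spark DataFrame
CreateOrReplaceTableFromDataFrame(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
).transform(df)
```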
table
class-attribute
instance-attribute
#
table: str = Field(
default=...,
alias="table_name",
description="The name of the (new) table",
)
Output #
Output class for CreateOrReplaceTableFromDataFrame
execute #
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.DbTableQuery #
Read a table from Snowflake using the `dbtable` option instead of `query`.
Example
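A minimal sketch; the `table` field name is an assumption (it is forwarded to the connector as the `dbtable` option), and the resulting DataFrame is assumed to be available on the Step output as `df`:

```python
df = DbTableQuery(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="db.schema.table",  # assumed field name, passed as the `dbtable` option
).execute().df
```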
koheesio.spark.snowflake.GetTableSchema #
Get the schema from a Snowflake table as a Spark Schema
Notes
- This Step will execute a `SELECT * FROM <table> LIMIT 1` query to get the schema of the table.
- The schema will be stored in the `table_schema` attribute of the output. `table_schema` is used as the attribute name to avoid conflicts with the `schema` attribute of Pydantic's BaseModel.
Example
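A minimal sketch using the documented `table` input and `table_schema` output (connection fields as in the examples above):

```python
schema = GetTableSchema(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
).execute().table_schema
```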
table
class-attribute
instance-attribute
#
table: str = Field(
default=..., description="The Snowflake table name"
)
Output #
Output class for GetTableSchema
table_schema
class-attribute
instance-attribute
#
table_schema: StructType = Field(
default=...,
serialization_alias="schema",
description="The Spark Schema",
)
koheesio.spark.snowflake.GrantPrivilegesOnFullyQualifiedObject #
Grant Snowflake privileges to a set of roles on a fully qualified object, i.e. `database.schema.object_name`.
This class is a subclass of GrantPrivilegesOnObject and sets the object name to be fully qualified for you: you set `database`, `schema` and `object` separately, and the object name is composed as `database.schema.object_name`.
Example
GrantPrivilegesOnFullyQualifiedObject(
database="MY_DB",
schema="MY_SCHEMA",
warehouse="MY_WH",
...
object="MY_TABLE",
type="TABLE",
...
)
In this example, the object name will be set to the fully qualified `MY_DB.MY_SCHEMA.MY_TABLE`. If you were to use GrantPrivilegesOnObject instead, you would have to fully qualify the object name yourself.
set_object_name #
Set the object name to be fully qualified, i.e. database.schema.object_name
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.GrantPrivilegesOnObject #
A wrapper on Snowflake GRANT privileges
With this Step, you can grant Snowflake privileges to a set of roles on a table, a view, or an object
Parameters:

Name | Type | Description | Default |
---|---|---|---|
warehouse | str | The name of the warehouse. Alias for `sfWarehouse`. | required |
user | str | The username. Alias for `sfUser`. | required |
password | SecretStr | The password. Alias for `sfPassword`. | required |
role | str | The role name | required |
object | str | The name of the object to grant privileges on | required |
type | str | The type of object to grant privileges on, e.g. TABLE, VIEW | required |
privileges | Union[conlist(str, min_length=1), str] | The Privilege/Permission or list of Privileges/Permissions to grant on the given object. | required |
roles | Union[conlist(str, min_length=1), str] | The Role or list of Roles to grant the privileges to | required |
Example
GrantPrivilegesOnTable(
    object="MY_TABLE",
    type="TABLE",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    permissions=["SELECT", "INSERT"],
).execute()
In this example, the APPLICATION.SNOWFLAKE.ADMIN role will be granted SELECT and INSERT privileges on the MY_TABLE table using the MY_WH warehouse.
object
class-attribute
instance-attribute
#
object: str = Field(
default=...,
description="The name of the object to grant privileges on",
)
privileges
class-attribute
instance-attribute
#
privileges: Union[conlist(str, min_length=1), str] = Field(
default=...,
alias="permissions",
description="The Privilege/Permission or list of Privileges/Permissions to grant on the given object. See https://docs.snowflake.com/en/sql-reference/sql/grant-privilege.html",
)
roles
class-attribute
instance-attribute
#
roles: Union[conlist(str, min_length=1), str] = Field(
default=...,
alias="role",
validation_alias="roles",
description="The Role or list of Roles to grant the privileges to",
)
type
class-attribute
instance-attribute
#
type: str = Field(
default=...,
description="The type of object to grant privileges on, e.g. TABLE, VIEW",
)
Output #
execute #
get_query #
get_query(role: str)
set_roles_privileges #
Coerce roles and privileges to be lists if they are not already.
Source code in src/koheesio/spark/snowflake.py
validate_object_and_object_type #
Validate that the object and type are set.
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.GrantPrivilegesOnTable #
Grant Snowflake privileges to a set of roles on a table
koheesio.spark.snowflake.GrantPrivilegesOnView #
Grant Snowflake privileges to a set of roles on a view
koheesio.spark.snowflake.Query #
koheesio.spark.snowflake.RunQuery #
Run a query on Snowflake that does not return a result, e.g. a CREATE TABLE statement.
This is a wrapper around 'net.snowflake.spark.snowflake.Utils.runQuery' on the JVM
Example
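A minimal sketch using the documented `query` field (the `sql` alias works as well); connection fields as in the examples above:

```python
RunQuery(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="CREATE TABLE IF NOT EXISTS MY_TABLE (id INT, name STRING)",
).execute()
```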
query
class-attribute
instance-attribute
#
query: str = Field(
default=..., description="The query to run", alias="sql"
)
execute #
Source code in src/koheesio/spark/snowflake.py
get_options #
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.SnowflakeBaseModel #
BaseModel for setting up Snowflake Driver options.
Notes
- Snowflake is supported natively in Databricks 4.2 and newer: https://docs.snowflake.com/en/user-guide/spark-connector-databricks
- Refer to Snowflake docs for the installation instructions for non-Databricks environments: https://docs.snowflake.com/en/user-guide/spark-connector-install
- Refer to Snowflake docs for connection options: https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector
Parameters:

Name | Type | Description | Default |
---|---|---|---|
url | str | Hostname for the Snowflake account, e.g. `<account>.snowflakecomputing.com`. Alias for `sfURL`. | required |
user | str | Login name for the Snowflake user. Alias for `sfUser`. | required |
password | SecretStr | Password for the Snowflake user. Alias for `sfPassword`. | required |
database | str | The database to use for the session after connecting. Alias for `sfDatabase`. | required |
sfSchema | str | The schema to use for the session after connecting. Alias for `schema`. | required |
role | str | The default security role to use for the session after connecting. Alias for `sfRole`. | required |
warehouse | str | The default virtual warehouse to use for the session after connecting. Alias for `sfWarehouse`. | required |
authenticator | Optional[str] | Authenticator for the Snowflake user. Example: "okta.com". | None |
options | Optional[Dict[str, Any]] | Extra options to pass to the Snowflake connector. | `{"sfCompress": "on", "continue_on_error": "off"}` |
format | str | The default `snowflake` format can be used natively in Databricks; use `net.snowflake.spark.snowflake` in other environments and make sure to install the required JARs. | "snowflake" |
authenticator
class-attribute
instance-attribute
#
authenticator: Optional[str] = Field(
default=None,
description="Authenticator for the Snowflake user",
examples=["okta.com"],
)
database
class-attribute
instance-attribute
#
database: str = Field(
default=...,
alias="sfDatabase",
description="The database to use for the session after connecting",
)
format
class-attribute
instance-attribute
#
format: str = Field(
default="snowflake",
description="The default `snowflake` format can be used natively in Databricks, use `net.snowflake.spark.snowflake` in other environments and make sure to install required JARs.",
)
options
class-attribute
instance-attribute
#
options: Optional[Dict[str, Any]] = Field(
default={
"sfCompress": "on",
"continue_on_error": "off",
},
description="Extra options to pass to the Snowflake connector",
)
password
class-attribute
instance-attribute
#
password: SecretStr = Field(
default=...,
alias="sfPassword",
description="Password for the Snowflake user",
)
role
class-attribute
instance-attribute
#
role: str = Field(
default=...,
alias="sfRole",
description="The default security role to use for the session after connecting",
)
sfSchema
class-attribute
instance-attribute
#
sfSchema: str = Field(
default=...,
alias="schema",
description="The schema to use for the session after connecting",
)
url
class-attribute
instance-attribute
#
url: str = Field(
default=...,
alias="sfURL",
description="Hostname for the Snowflake account, e.g. <account>.snowflakecomputing.com",
examples=["example.snowflakecomputing.com"],
)
user
class-attribute
instance-attribute
#
user: str = Field(
default=...,
alias="sfUser",
description="Login name for the Snowflake user",
)
warehouse
class-attribute
instance-attribute
#
warehouse: str = Field(
default=...,
alias="sfWarehouse",
description="The default virtual warehouse to use for the session after connecting",
)
get_options #
Get the sfOptions as a dictionary.
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.SnowflakeReader #
Wrapper around JdbcReader for Snowflake.
Example
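A minimal sketch; the `dbtable` parameter is an assumption based on the underlying JdbcReader interface, and the resulting DataFrame is assumed to be available on the Step output as `df`:

```python
df = SnowflakeReader(
    url="example.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="MY_DB",
    schema="MY_SCHEMA",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    warehouse="MY_WH",
    dbtable="MY_DB.MY_SCHEMA.MY_TABLE",  # assumed, from the JdbcReader interface
).execute().df
```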
Notes
- Snowflake is supported natively in Databricks 4.2 and newer: https://docs.snowflake.com/en/user-guide/spark-connector-databricks
- Refer to Snowflake docs for the installation instructions for non-Databricks environments: https://docs.snowflake.com/en/user-guide/spark-connector-install
- Refer to Snowflake docs for connection options: https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector
koheesio.spark.snowflake.SnowflakeStep #
Expands the SnowflakeBaseModel so that it can be used as a Step
koheesio.spark.snowflake.SnowflakeTableStep #
koheesio.spark.snowflake.SnowflakeTransformation #
Adds Snowflake parameters to the Transformation class
koheesio.spark.snowflake.SnowflakeWriter #
Class for writing to Snowflake
See Also
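Example

A minimal usage sketch; the `df` field is an assumption based on the Koheesio Writer interface, and `insert_type` (alias `mode`) defaults to APPEND as documented below:

```python
# df is an existing Spark DataFrame
SnowflakeWriter(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    df=df,  # assumed: Koheesio Writers receive the DataFrame via a `df` field
).execute()
```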
insert_type
class-attribute
instance-attribute
#
insert_type: Optional[BatchOutputMode] = Field(
APPEND,
alias="mode",
description="The insertion type, append or overwrite",
)
table
class-attribute
instance-attribute
#
table: str = Field(
default=..., description="Target table name"
)
execute #
Write to Snowflake
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.SyncTableAndDataFrameSchema #
Sync the schemas of a Snowflake table and a DataFrame. This will add NULL columns for the columns that are not in both, and perform type casts where needed.
The Snowflake table will take priority in case of type conflicts.
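Example

A minimal sketch, e.g. a dry run that only reports schema differences; it uses the documented `table`, `df` and `dry_run` fields, with connection fields as in the examples above:

```python
# df is an existing Spark DataFrame
SyncTableAndDataFrameSchema(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",
    df=df,
    dry_run=True,  # only show schema differences, do not apply changes
).execute()
```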
df
class-attribute
instance-attribute
#
dry_run
class-attribute
instance-attribute
#
dry_run: Optional[bool] = Field(
default=False,
description="Only show schema differences, do not apply changes",
)
table
class-attribute
instance-attribute
#
table: str = Field(
default=..., description="The table name"
)
Output #
execute #
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.SynchronizeDeltaToSnowflakeTask #
Synchronize a Delta table to a Snowflake table
- Overwrite - only in batch mode
- Append - supports batch and streaming mode
- Merge - only in streaming mode
Example
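A minimal sketch for a streaming MERGE synchronisation; the DeltaTableStep construction is an assumption, the other fields are documented below:

```python
SynchronizeDeltaToSnowflakeTask(
    url="example.snowflakecomputing.com",
    user="gid.account@nike.com",
    password="super-secret-password",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    source_table=DeltaTableStep(table="delta_db.delta_table"),  # assumed DeltaTableStep usage
    target_table="MY_DB.MY_SCHEMA.MY_TABLE",
    key_columns=["id"],  # merge keys; synchronisation_mode defaults to MERGE
    streaming=True,      # MERGE is only supported in streaming mode
    checkpoint_location="/tmp/checkpoints/delta_to_snowflake",
).execute()
```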
checkpoint_location
class-attribute
instance-attribute
#
checkpoint_location: Optional[str] = Field(
default=None, description="Checkpoint location to use"
)
enable_deletion
class-attribute
instance-attribute
#
enable_deletion: Optional[bool] = Field(
default=False,
description="In case of merge synchronisation_mode add deletion statement in merge query.",
)
key_columns
class-attribute
instance-attribute
#
key_columns: Optional[List[str]] = Field(
    default_factory=list,
    description="Key columns on which the MERGE statement will be applied.",
)
non_key_columns
property
#
Columns of source table that aren't part of the (composite) primary key
persist_staging
class-attribute
instance-attribute
#
persist_staging: Optional[bool] = Field(
default=False,
description="In case of debugging, set `persist_staging` to True to retain the staging table for inspection after synchronization.",
)
reader
property
#
DeltaTable reader
Returns:
DeltaTableReader that will yield the source delta table
schema_tracking_location
class-attribute
instance-attribute
#
schema_tracking_location: Optional[str] = Field(
default=None,
description="Schema tracking location to use. Info: https://docs.delta.io/latest/delta-streaming.html#-schema-tracking",
)
source_table
class-attribute
instance-attribute
#
source_table: DeltaTableStep = Field(
default=...,
description="Source delta table to synchronize",
)
staging_table
property
#
Intermediate table on Snowflake where staging results are stored
staging_table_name
class-attribute
instance-attribute
#
staging_table_name: Optional[str] = Field(
default=None,
alias="staging_table",
description="Optional snowflake staging name",
validate_default=False,
)
streaming
class-attribute
instance-attribute
#
streaming: Optional[bool] = Field(
default=False,
description="Should synchronisation happen in streaming or in batch mode. Streaming is supported in 'APPEND' and 'MERGE' mode. Batch is supported in 'OVERWRITE' and 'APPEND' mode.",
)
synchronisation_mode
class-attribute
instance-attribute
#
synchronisation_mode: BatchOutputMode = Field(
default=MERGE,
description="Determines if synchronisation will 'overwrite' any existing table, 'append' new rows or 'merge' with existing rows.",
)
target_table
class-attribute
instance-attribute
#
target_table: str = Field(
default=...,
description="Target table in snowflake to synchronize to",
)
writer
property
#
writer: Union[ForEachBatchStreamWriter, SnowflakeWriter]
Writer to persist to Snowflake.
Depending on the configured options, this returns a SnowflakeWriter or a ForEachBatchStreamWriter:
- OVERWRITE/APPEND mode yields a SnowflakeWriter
- MERGE mode yields a ForEachBatchStreamWriter
Returns:

Type | Description |
---|---|
Union[ForEachBatchStreamWriter, SnowflakeWriter] | The writer used to persist to Snowflake |
writer_
class-attribute
instance-attribute
#
writer_: Optional[
Union[ForEachBatchStreamWriter, SnowflakeWriter]
] = None
drop_table #
Drop a given Snowflake table
Source code in src/koheesio/spark/snowflake.py
execute #
Source code in src/koheesio/spark/snowflake.py
extract #
Extract source table
Source code in src/koheesio/spark/snowflake.py
load #
Load the source table into Snowflake
Source code in src/koheesio/spark/snowflake.py
run #
truncate_table #
Truncate a given Snowflake table
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.TableExists #
Check if the table exists in Snowflake by using INFORMATION_SCHEMA.
Example
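A minimal sketch; the `table` input and the `exists` output field names are assumptions based on the class description:

```python
exists = TableExists(
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@nike.com",
    password="super-secret-password",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="MY_TABLE",  # assumed field name
).execute().exists    # assumed output attribute
```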
Output #
execute #
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.TagSnowflakeQuery #
Provides a Snowflake query tag pre-action that can be used to easily find queries through the Snowflake history search, and to further group them for debugging and cost-tracking purposes.
Takes query tag attributes as kwargs, plus an additional Snowflake options dict that can optionally contain another set of pre-actions to be applied to a query; in that case the existing pre-actions aren't dropped, and the query tag pre-action is added to them.
The passed Snowflake options dictionary is not modified in place; instead, a new dictionary containing the updated pre-actions is returned.
Notes
See this article for explanation: https://select.dev/posts/snowflake-query-tags
Arbitrary tags can be applied, such as team, dataset names, business capability, etc.
Example
query_tag = TagSnowflakeQuery(
    options={"preactions": ...},
    task_name="cleanse_task",
    pipeline_name="ingestion-pipeline",
    etl_date="2022-01-01",
    pipeline_execution_time="2022-01-01T00:00:00",
    task_execution_time="2022-01-01T01:00:00",
    environment="dev",
    trace_id="e0fdec43-a045-46e5-9705-acd4f3f96045",
    span_id="cb89abea-1c12-471f-8b12-546d2d66f6cb",
).execute().options
options
class-attribute
instance-attribute
#
options: Dict = Field(
default_factory=dict,
description="Additional Snowflake options, optionally containing additional preactions",
)
Output #
execute #
Add query tag preaction to Snowflake options
Source code in src/koheesio/spark/snowflake.py
koheesio.spark.snowflake.map_spark_type #
Translates a Spark DataFrame schema type to the corresponding Snowflake type
Basic Types | Snowflake Type |
---|---|
StringType | STRING |
NullType | STRING |
BooleanType | BOOLEAN |
Numeric Types | Snowflake Type |
---|---|
LongType | BIGINT |
IntegerType | INT |
ShortType | SMALLINT |
DoubleType | DOUBLE |
FloatType | FLOAT |
NumericType | FLOAT |
ByteType | BINARY |
Date / Time Types | Snowflake Type |
---|---|
DateType | DATE |
TimestampType | TIMESTAMP |
Advanced Types | Snowflake Type |
---|---|
DecimalType | DECIMAL |
MapType | VARIANT |
ArrayType | VARIANT |
StructType | VARIANT |
References
- Spark SQL DataTypes: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
- Snowflake DataTypes: https://docs.snowflake.com/en/sql-reference/data-types.html
Parameters:

Name | Type | Description | Default |
---|---|---|---|
spark_type | DataType | DataType taken out of the StructField | required |

Returns:

Type | Description |
---|---|
str | The Snowflake data type |
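A short usage sketch; assembling a DDL string from a DataFrame schema is illustrative, not part of the library:

```python
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType(
    [
        StructField("id", LongType()),
        StructField("name", StringType()),
    ]
)

# map each Spark field to its Snowflake type (per the tables above)
columns = [f"{field.name} {map_spark_type(field.dataType)}" for field in schema.fields]
ddl = f"CREATE TABLE MY_DB.MY_SCHEMA.MY_TABLE ({', '.join(columns)})"
```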
Source code in src/koheesio/spark/snowflake.py