Attributes¶
Classes¶
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperator(secret_scope, query_string, parameters={}, *args, **kwargs)
¶
This is used to run any SQL query in the Snowflake environment
Example Usage in your brickflow task:
SnowflakeOperator(
    secret_scope="databricks_secrets_psc",
    query_string="<queries separated by semicolon>",
)
As Databricks secrets is a key-value store, the code expects the secret scope to contain the below exact keys:
username : user id created for connecting to Snowflake, for ex: sample_user
password : password for the above user, for ex: P@$$word
account : Snowflake account information, not the entire URL, for ex: sample_enterprise
warehouse : warehouse/cluster the user has access to, for ex: sample_warehouse
database : default database to connect to, for ex: sample_database
role : role to which the user has write access, for ex: sample_write_role
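Since a missing key only surfaces at connection time, it can help to check the scope contents up front. A minimal sketch (not the plugin's actual code) of validating that a scope provides every key listed above; the plain dict stands in for `dbutils.secrets`:

```python
# Sketch: verify a secret scope holds every key SnowflakeOperator expects.
# Key names follow the documentation above; the dict is a stand-in for
# dbutils.secrets, which is only available inside Databricks.
REQUIRED_KEYS = {"username", "password", "account", "warehouse", "database", "role"}

def missing_secret_keys(secrets: dict) -> set:
    """Return the required keys that are absent from the scope."""
    return REQUIRED_KEYS - secrets.keys()

scope = {
    "username": "sample_user",
    "password": "P@$$word",
    "account": "sample_enterprise",
    "warehouse": "sample_warehouse",
    "database": "sample_database",
    "role": "sample_write_role",
}
```
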
In the above snippet, secret_scope refers to the Databricks secrets service used to store the Snowflake credentials. Support for other stores will be added as a future enhancement.
The above code snippet expects the data as follows:
databricks_secrets_psc : contains username, password, account, warehouse, database and role keys with Snowflake values
query_string : required parameter with queries separated by semicolons (;)
parameters : optional dictionary with key-value pairs to substitute in the query
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
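A hedged sketch of how a semicolon-separated `query_string` and a `parameters` dict might be prepared for execution. The plugin's actual substitution syntax may differ; the `$key` placeholders and the `prepare_queries` helper here are assumptions for illustration only:

```python
# Hypothetical illustration (not the plugin's real implementation):
# split a query string on semicolons and substitute parameters into
# each statement using $key placeholders.
from string import Template

def prepare_queries(query_string: str, parameters: dict) -> list:
    """Split on semicolons and substitute parameters into each statement."""
    statements = [q.strip() for q in query_string.split(";") if q.strip()]
    return [Template(stmt).safe_substitute(parameters) for stmt in statements]

queries = prepare_queries(
    "DELETE FROM sales WHERE dt = '$run_date'; "
    "INSERT INTO sales SELECT * FROM staging WHERE dt = '$run_date'",
    {"run_date": "2023-10-22"},
)
```

`safe_substitute` is used so that an unmatched placeholder is left in place rather than raising, which keeps partially parameterized queries inspectable.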
Attributes¶
account = ctx.dbutils.secrets.get(self.secret_scope, 'account')
instance-attribute
¶
authenticator = ctx.dbutils.secrets.get(self.secret_scope, 'authenticator')
instance-attribute
¶
cur = None
instance-attribute
¶
database = ctx.dbutils.secrets.get(self.secret_scope, 'database')
instance-attribute
¶
log = log
instance-attribute
¶
parameters = parameters
instance-attribute
¶
password = ctx.dbutils.secrets.get(self.secret_scope, 'password')
instance-attribute
¶
query = query_string
instance-attribute
¶
role = ctx.dbutils.secrets.get(self.secret_scope, 'role')
instance-attribute
¶
secret_scope = secret_scope
instance-attribute
¶
username = ctx.dbutils.secrets.get(self.secret_scope, 'username')
instance-attribute
¶
warehouse = ctx.dbutils.secrets.get(self.secret_scope, 'warehouse')
instance-attribute
¶
Functions¶
execute()
¶
logic that triggers the flow of events
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_cursor()
¶
logic to create a cursor for a successful snowflake connection to execute queries
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_snowflake_connection()
¶
logic to connect to snowflake instance with provided details and return a connection object
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
snowflake_query_exec(cur, database, query_string)
¶
Executes the Snowflake query (or queries), replacing variables with the appropriate values
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperatorException
¶
Bases: Exception
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperatorTimeOutException
¶
Bases: TimeoutError
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperator(secret_scope, parameters={}, *args, **kwargs)
¶
Bases: SnowflakeOperator
This is used to copy data from a Unity Catalog table to a Snowflake table
Example Usage in your brickflow task:
UcToSnowflakeOperator(
    secret_scope="databricks_secrets_psc",
    parameters=uc_parameters,
)
In the above snippet, secret_scope refers to the Databricks secrets service used to store the Snowflake credentials. Support for other stores will be added as a future enhancement.
As Databricks secrets is a key-value store, the code expects the secret scope to contain the below exact keys:
username : user id created for connecting to Snowflake, for ex: sample_user
password : password for the above user, for ex: P@$$word
account : Snowflake account information, not the entire URL, for ex: sample_enterprise
warehouse : warehouse/cluster the user has access to, for ex: sample_warehouse
database : default database to connect to, for ex: sample_database
role : role to which the user has write access, for ex: sample_write_role
authenticator : optional additional authenticator needed for the connection, for ex: okta_connection_url
The above code snippet expects the data as follows:
databricks_secrets_psc : contains username, password, account, warehouse, database and role keys with Snowflake values
parameters = {'load_type': 'incremental', 'dbx_catalog': 'sample_catalog', 'dbx_database': 'sample_schema', 'dbx_table': 'sf_operator_1', 'sf_schema': 'stage', 'sf_table': 'SF_OPERATOR_1', 'sf_grantee_roles': 'downstream_read_role', 'incremental_filter': "dt='2023-10-22'", 'sf_cluster_keys': ''}
The parameters dictionary has the following keys:
load_type (required): incremental/full
dbx_catalog (required): name of the catalog in Unity
dbx_database (required): schema name within the catalog
dbx_table (required): name of the object in the schema
sf_database (optional): database name in Snowflake
sf_schema (required): Snowflake schema in the database provided as part of the scope
sf_table (required): name of the table in Snowflake to append to or overwrite
incremental_filter (optional): mandatory for the incremental load type; used to delete existing data in the Snowflake table
dbx_data_filter (optional): filter for the Databricks table, if different from the Snowflake filter
sf_cluster_keys (optional): list of keys to cluster the data in Snowflake
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
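The required/optional rules above can be checked before constructing the operator. A minimal sketch, assuming the rules exactly as documented; the plugin's own validate_input_params may behave differently:

```python
# Sketch (not the plugin's actual validate_input_params): check the
# mandatory UcToSnowflakeOperator parameters and the incremental rule
# described in the documentation above.
REQUIRED = ("load_type", "dbx_catalog", "dbx_database", "dbx_table", "sf_schema", "sf_table")

def validate_uc_to_sf_params(params: dict) -> list:
    """Return a list of human-readable validation errors (empty if valid)."""
    errors = ["missing required key: %s" % k for k in REQUIRED if k not in params]
    if params.get("load_type") == "incremental" and not params.get("incremental_filter"):
        errors.append("incremental_filter is required for incremental load_type")
    return errors

params = {
    "load_type": "incremental",
    "dbx_catalog": "sample_catalog",
    "dbx_database": "sample_schema",
    "dbx_table": "sf_operator_1",
    "sf_schema": "stage",
    "sf_table": "SF_OPERATOR_1",
    "incremental_filter": "dt='2023-10-22'",
}
```
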
Attributes¶
dbx_data_filter = self.parameters.get('dbx_data_filter') or None
instance-attribute
¶
write_mode = None
instance-attribute
¶
self.authenticator = None
try:
    import base64

    from brickflow import ctx

    self.username = ctx.dbutils.secrets.get(self.secret_scope, "username")
    self.password = ctx.dbutils.secrets.get(self.secret_scope, "password")
    self.account = ctx.dbutils.secrets.get(self.secret_scope, "account")
    self.warehouse = ctx.dbutils.secrets.get(self.secret_scope, "warehouse")
    self.database = ctx.dbutils.secrets.get(self.secret_scope, "database")
    self.role = ctx.dbutils.secrets.get(self.secret_scope, "role")
except Exception:
    raise ValueError(
        "Failed to fetch details from secret scope for username, password, account, warehouse, database, role!"
    )
Functions¶
apply_grants()
¶
Function to apply grants after successful execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
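The sf_grantee_roles parameter above is a comma-separated list of roles. A hypothetical sketch of how grant statements could be derived from it; the statement shape and the `grant_statements` helper are assumptions, not the plugin's actual apply_grants logic:

```python
# Hypothetical sketch: build Snowflake GRANT statements from a
# comma-separated sf_grantee_roles value. The real apply_grants
# implementation lives in the plugin source and may differ.
def grant_statements(table: str, grantee_roles: str) -> list:
    """One GRANT SELECT statement per non-empty role in the list."""
    roles = [r.strip() for r in grantee_roles.split(",") if r.strip()]
    return ["GRANT SELECT ON TABLE %s TO ROLE %s" % (table, r) for r in roles]
```
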
execute()
¶
Main method for execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
extract_source()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_postgrants()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_poststeps()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_presteps()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
load_snowflake(source_df, target_table)
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
submit_job_compute()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
submit_job_snowflake(query_input)
¶
Function to establish snowflake connection and submit commands for execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
validate_input_params()
¶
Function to validate the input parameters
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperatorException
¶
Bases: Exception
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperatorTimeOutException
¶
Bases: TimeoutError