Classes¶
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperator(secret_scope, parameters = {}, *args, **kwargs)
¶
This is used to run any sql query in snowflake environment
Example Usage in your brickflow task SnowflakeOperator( secret_scope=databricks_secrets_psc parameters= sf_load_parameters )
As databricks secrets is a key value store, code expects the secret scope to contain the below exact keys username : user id created for connecting to snowflake for ex: sample_user password : password information for about user for ex: P@$$word account : snowflake account information, not entire url for ex: sample_enterprise warehouse: warehouse/cluster information that user has access for ex: sample_warehouse database : default database that we want to connect for ex: sample_database role : role to which the user has write access for ex: sample_write_role
In above snippet secret_scope refers to databricks secrets secure service to store the snowflake credentials. Support for other stores will be added as a future enhancement
above code snippet expects the data as follows databricks_secrets_psc contains username, password, account, warehouse, database and role keys with snowflake values sf_load_parameters = {'query': comma_separeted_list_of_queries}
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
Attributes¶
account = ctx.dbutils.secrets.get(self.secret_scope, 'account')
instance-attribute
¶
authenticator = None
instance-attribute
¶
cur = None
instance-attribute
¶
database = ctx.dbutils.secrets.get(self.secret_scope, 'database')
instance-attribute
¶
log = logging
instance-attribute
¶
parameters = parameters
instance-attribute
¶
password = ctx.dbutils.secrets.get(self.secret_scope, 'password')
instance-attribute
¶
query = parameters.get('query') or None
instance-attribute
¶
role = ctx.dbutils.secrets.get(self.secret_scope, 'role')
instance-attribute
¶
secret_scope = secret_scope
instance-attribute
¶
username = ctx.dbutils.secrets.get(self.secret_scope, 'username')
instance-attribute
¶
warehouse = ctx.dbutils.secrets.get(self.secret_scope, 'warehouse')
instance-attribute
¶
Functions¶
execute()
¶
logic that triggers the flow of events
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_cursor()
¶
logic to create a cursor for a successful snowflake connection to execute queries
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_snowflake_connection()
¶
logic to connect to snowflake instance with provided details and return a connection object
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
snowflake_query_exec(cur, database, query_string)
¶
Executes the snowflake query(ies) by replacing varibales with appropriate values
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperatorException
¶
Bases: Exception
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperatorTimeOutException
¶
Bases: TimeoutError
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperator(secret_scope, parameters = {}, *args, **kwargs)
¶
Bases: SnowflakeOperator
This is used to copy data from unity catalpg table to a snowflake table
Example Usage in your brickflow task UcToSnowflakeOperator( secret_scope=databricks_secrets_psc parameters= uc_parameters )
In above snippet secret_scope refers to databricks secrets secure service to store the snowflake credentials. Support for other stores will be added as a future enhancement
As databricks secrets is a key value store, code expects the secret scope to contain the below exact keys username : user id created for connecting to snowflake for ex: sample_user password : password information for about user for ex: P@$$word account : snowflake account information, not entire url for ex: sample_enterprise warehouse: warehouse/cluster information that user has access for ex: sample_warehouse database : default database that we want to connect for ex: sample_database role : role to which the user has write access for ex: sample_write_role
above code snippet expects the data as follows databricks_secrets_psc contains username, password, account, warehouse, database and role keys with snowflake values uc_parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema', 'dbx_table':'sf_operator_1', 'sf_schema':'stage','sf_table':'SF_OPERATOR_1', 'sf_grantee_roles':'downstream_read_role', 'incremental_filter':"dt='2023-10-22'", 'sf_cluster_keys':''}
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
Attributes¶
account = ctx.dbutils.secrets.get(self.secret_scope, 'account')
instance-attribute
¶
authenticator = None
instance-attribute
¶
conn = None
instance-attribute
¶
cur = None
instance-attribute
¶
database = ctx.dbutils.secrets.get(self.secret_scope, 'database')
instance-attribute
¶
log = logging
instance-attribute
¶
parameters = parameters
instance-attribute
¶
password = ctx.dbutils.secrets.get(self.secret_scope, 'password')
instance-attribute
¶
role = ctx.dbutils.secrets.get(self.secret_scope, 'role')
instance-attribute
¶
secret_scope = secret_scope
instance-attribute
¶
sf_cluster_keys = None
instance-attribute
¶
sf_post_grants_sql = None
instance-attribute
¶
sf_post_steps_sql = None
instance-attribute
¶
sf_pre_steps_sql = None
instance-attribute
¶
username = ctx.dbutils.secrets.get(self.secret_scope, 'username')
instance-attribute
¶
warehouse = ctx.dbutils.secrets.get(self.secret_scope, 'warehouse')
instance-attribute
¶
write_mode = None
instance-attribute
¶
Functions¶
apply_grants()
¶
Function to apply grants after successful execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
execute()
¶
Main method for execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
extract_source()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_postgrants()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_poststeps()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_presteps()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
load_snowflake(source_df, target_table)
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
submit_job_compute()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
submit_job_snowflake(query_input)
¶
Function to establish snowflake connection and submit commands for execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
validate_input_params()
¶
Function to validate the input parameters
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperatorException
¶
Bases: Exception
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperatorTimeOutException
¶
Bases: TimeoutError