Classes¶
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperator(secret_scope, query_string=None, sql_file=None, parameters={}, *args, **kwargs)
¶
This is used to run any SQL query in a Snowflake environment
Example usage in your brickflow task:

```python
SnowflakeOperator(
    secret_scope="databricks_secrets_psc",
    query_string="queries separated by semicolon",
)
```
As Databricks secrets is a key-value store, the code expects the secret scope to contain exactly the following keys (a sketch for populating them follows the list):

- username : user id created for connecting to Snowflake, e.g. sample_user
- password : password for the above user, e.g. P@$$word
- account : Snowflake account information, not the entire URL, e.g. sample_enterprise
- warehouse : warehouse/cluster the user has access to, e.g. sample_warehouse
- database : default database to connect to, e.g. sample_database
- role : role with which the user has write access, e.g. sample_write_role
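As a minimal sketch (assuming the Databricks SDK for Python is available; the scope name and all values below are placeholders, not values from this page), the scope could be populated like this:

```python
# Hedged sketch: populate the expected keys in a Databricks secret scope
# using the Databricks SDK for Python. Scope name and values are placeholders.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # picks up auth from the environment / .databrickscfg

scope = "databricks_secrets_psc"  # hypothetical scope name
w.secrets.create_scope(scope=scope)

for key, value in {
    "username": "sample_user",
    "password": "P@$$word",
    "account": "sample_enterprise",  # account identifier, not the full URL
    "warehouse": "sample_warehouse",
    "database": "sample_database",
    "role": "sample_write_role",
}.items():
    w.secrets.put_secret(scope=scope, key=key, string_value=value)
```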
Here, secret_scope refers to the Databricks secrets service used to securely store the Snowflake credentials. Support for other stores will be added as a future enhancement.
The snippet above expects the data as follows:

- databricks_secrets_psc : contains username, password, account, warehouse, database and role keys with Snowflake values
- query_string : optional parameter with queries separated by semicolons (;)
- sql_file : optional parameter with a file path (relative to the brickflow project root) to a .sql file
- parameters : optional dictionary with key-value pairs to substitute in the query
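For a more concrete picture, here is a hedged sketch of a task in a brickflow workflow; the scope name, queries, and the "{run_date}" placeholder convention are illustrative assumptions, not confirmed by this page:

```python
# Hedged sketch: running SnowflakeOperator inside a brickflow task.
# Scope name, queries, and the "{run_date}" placeholder are illustrative.
from brickflow_plugins.databricks.uc_to_snowflake_operator import SnowflakeOperator

def run_snowflake_queries(*args):  # decorate with @wf.task in your workflow
    sf_query_run = SnowflakeOperator(
        secret_scope="databricks_secrets_psc",
        query_string=(
            "DELETE FROM stage.orders WHERE dt = '{run_date}'; "
            "INSERT INTO stage.orders SELECT * FROM raw.orders WHERE dt = '{run_date}'"
        ),
        # alternatively: sql_file="sql/load_orders.sql" (relative to project root)
        parameters={"run_date": "2023-10-22"},
    )
    sf_query_run.execute()
```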
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
Attributes¶
account = ctx.dbutils.secrets.get(self.secret_scope, 'account')
instance-attribute
¶
authenticator = ctx.dbutils.secrets.get(self.secret_scope, 'authenticator')
instance-attribute
¶
brickflow_root = get_bf_project_root()
instance-attribute
¶
cur = None
instance-attribute
¶
database = ctx.dbutils.secrets.get(self.secret_scope, 'database')
instance-attribute
¶
log = log
instance-attribute
¶
parameters = parameters
instance-attribute
¶
password = ctx.dbutils.secrets.get(self.secret_scope, 'password')
instance-attribute
¶
query = query_string
instance-attribute
¶
role = ctx.dbutils.secrets.get(self.secret_scope, 'role')
instance-attribute
¶
secret_scope = secret_scope
instance-attribute
¶
sql_file = sql_file
instance-attribute
¶
username = ctx.dbutils.secrets.get(self.secret_scope, 'username')
instance-attribute
¶
warehouse = ctx.dbutils.secrets.get(self.secret_scope, 'warehouse')
instance-attribute
¶
Functions¶
execute()
¶
Logic that triggers the flow of events for query execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_cursor()
¶
Logic to create a cursor from a successful Snowflake connection, used to execute queries
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_snowflake_connection()
¶
Logic to connect to the Snowflake instance with the provided details and return a connection object
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
read_sql_file()
¶
Logic to read the SQL file and return the query string
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
snowflake_query_exec(cur, database, query_string)
¶
Executes the Snowflake query (or queries) after replacing variables with the appropriate values
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
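For illustration, a hedged sketch of how a parameterized, semicolon-separated query string might be prepared before execution; the "{key}" placeholder convention below is an assumption, not confirmed by this page:

```python
# Hedged sketch of parameter substitution; the "{key}" placeholder
# convention is an assumption for illustration only.
query_string = (
    "DELETE FROM stage.orders WHERE dt = '{run_date}'; "
    "INSERT INTO stage.orders SELECT * FROM raw.orders WHERE dt = '{run_date}'"
)
parameters = {"run_date": "2023-10-22"}

rendered = query_string
for key, value in parameters.items():
    rendered = rendered.replace("{" + key + "}", str(value))

# Split on semicolons and run each non-empty statement; in the operator,
# cur.execute(stmt) would run against the cursor returned by get_cursor().
statements = [s.strip() for s in rendered.split(";") if s.strip()]
for stmt in statements:
    print(stmt)  # stand-in for cur.execute(stmt)
```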
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperatorException
¶
Bases: Exception
brickflow_plugins.databricks.uc_to_snowflake_operator.SnowflakeOperatorTimeOutException
¶
Bases: TimeoutError
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperator(secret_scope, parameters={}, write_mode=None, *args, **kwargs)
¶
Bases: SnowflakeOperator
This is used to copy data from a Unity Catalog table to a Snowflake table
Example usage in your brickflow task:

```python
UcToSnowflakeOperator(
    secret_scope="databricks_secrets_psc",
    parameters=uc_parameters,
)
```
Here, secret_scope refers to the Databricks secrets service used to securely store the Snowflake credentials. Support for other stores will be added as a future enhancement.
As Databricks secrets is a key-value store, the code expects the secret scope to contain exactly the following keys:

- username : user id created for connecting to Snowflake, e.g. sample_user
- password : password for the above user, e.g. P@$$word
- account : Snowflake account information, not the entire URL, e.g. sample_enterprise
- warehouse : warehouse/cluster the user has access to, e.g. sample_warehouse
- database : default database to connect to, e.g. sample_database
- role : role with which the user has write access, e.g. sample_write_role
- authenticator : optional additional authenticator needed for the connection, e.g. okta_connection_url
The snippet above expects the data as follows:

- databricks_secrets_psc : contains username, password, account, warehouse, database and role keys with Snowflake values
- parameters : for example,

```python
parameters = {
    "load_type": "incremental",
    "dbx_catalog": "sample_catalog",
    "dbx_database": "sample_schema",
    "dbx_table": "sf_operator_1",
    "sf_schema": "stage",
    "sf_table": "SF_OPERATOR_1",
    "sf_grantee_roles": "downstream_read_role",
    "incremental_filter": "dt='2023-10-22'",
    "sf_cluster_keys": "",
    "dbx_sql": "select * from sample_catalog.sample_schema.sf_operator_1 where dt='2023-10-22'",
}
```
In the parameters dictionary, the keys are as follows:

- load_type (required): incremental/full
- dbx_catalog (required): name of the catalog in Unity
- dbx_database (required): schema name within the catalog
- dbx_table (required): name of the object in the schema
- sf_database (optional): database name in Snowflake
- sf_schema (required): Snowflake schema in the database provided as part of the scope
- sf_table (required): name of the table in Snowflake to which we want to append or overwrite
- incremental_filter (optional overall, but required for the incremental load type): used to delete existing data in the Snowflake table
- dbx_data_filter (optional): filter for the Databricks table if different from the Snowflake filter
- sf_cluster_keys (optional): list of keys to cluster the data in Snowflake
- dbx_sql (optional): SQL query to extract data from Unity Catalog
Either dbx_sql or the combination of dbx_catalog, dbx_database and dbx_table must be provided. If custom SQL is supplied in dbx_sql for an incremental load, make sure to set write_mode or adjust incremental_filter in the operator to align with the custom SQL; otherwise there could be duplicates in the Snowflake table.
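Putting it together, a hedged usage sketch inside a brickflow task, reusing the parameter values shown above (the scope name and the write_mode value are illustrative assumptions):

```python
# Hedged sketch: copying a Unity Catalog table to Snowflake in a brickflow task.
# Scope name, parameter values, and the write_mode value are illustrative.
from brickflow_plugins.databricks.uc_to_snowflake_operator import UcToSnowflakeOperator

def run_uc_to_snowflake(*args):  # decorate with @wf.task in your workflow
    uc_to_sf_copy = UcToSnowflakeOperator(
        secret_scope="databricks_secrets_psc",
        write_mode="overwrite",  # assumption: append/overwrite semantics
        parameters={
            "load_type": "incremental",
            "dbx_catalog": "sample_catalog",
            "dbx_database": "sample_schema",
            "dbx_table": "sf_operator_1",
            "sf_schema": "stage",
            "sf_table": "SF_OPERATOR_1",
            "sf_grantee_roles": "downstream_read_role",
            "incremental_filter": "dt='2023-10-22'",
            "sf_cluster_keys": "",
        },
    )
    uc_to_sf_copy.execute()
```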
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
Attributes¶
dbx_data_filter = self.parameters.get('dbx_data_filter') or None
instance-attribute
¶
dbx_sql = self.parameters.get('dbx_sql') or None
instance-attribute
¶
parameters = parameters
instance-attribute
¶
write_mode = write_mode
instance-attribute
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py

```python
self.authenticator = None
try:
    import base64

    from brickflow import ctx

    self.username = ctx.dbutils.secrets.get(self.secret_scope, "username")
    self.password = ctx.dbutils.secrets.get(self.secret_scope, "password")
    self.account = ctx.dbutils.secrets.get(self.secret_scope, "account")
    self.warehouse = ctx.dbutils.secrets.get(self.secret_scope, "warehouse")
    self.database = ctx.dbutils.secrets.get(self.secret_scope, "database")
    self.role = ctx.dbutils.secrets.get(self.secret_scope, "role")
except:
    raise ValueError(
        "Failed to fetch details from secret scope for username, password, account, warehouse, database, role !"
    )
```
Functions¶
apply_grants()
¶
Function to apply grants after successful execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
execute()
¶
Main method for execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
extract_source()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_postgrants()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_poststeps()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
get_sf_presteps()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
load_snowflake(source_df, target_table)
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
submit_job_compute()
¶
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
submit_job_snowflake(query_input)
¶
Function to establish snowflake connection and submit commands for execution
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
validate_input_params()
¶
Function to validate the input parameters
Source code in brickflow_plugins/databricks/uc_to_snowflake_operator.py
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperatorException
¶
Bases: Exception
brickflow_plugins.databricks.uc_to_snowflake_operator.UcToSnowflakeOperatorTimeOutException
¶
Bases: TimeoutError