# Understand Args
### Configurations
To establish the global configuration for Spark Expectations data quality (DQ), create a variable and populate all of the required fields with the appropriate values:
```python
from spark_expectations.config.user_config import Constants as user_config

se_user_conf = {
    user_config.se_notifications_enable_email: False,  # (1)!
    user_config.se_notifications_enable_smtp_server_auth: False,  # (2)!
    user_config.se_notifications_enable_custom_email_body: False,  # (3)!
    user_config.se_notifications_email_smtp_host: "mailhost.com",  # (4)!
    user_config.se_notifications_email_smtp_port: 25,  # (5)!
    user_config.se_notifications_smtp_password: "your_password",  # (6)!
    # user_config.se_notifications_smtp_creds_dict: {
    #     user_config.secret_type: "cerberus",
    #     user_config.cbs_url: "https://cerberus.example.com",
    #     user_config.cbs_sdb_path: "your_sdb_path",
    #     user_config.cbs_smtp_password: "your_smtp_password",
    # },  # (7)!
    user_config.se_notifications_email_from: "<sender_email_id>",  # (8)!
    user_config.se_notifications_email_to_other_mail_id: "<receiver_email_id's>",  # (9)!
    user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",  # (10)!
    user_config.se_notifications_email_custom_body: "custom stats: 'product_id': {}",  # (11)!
    user_config.se_notifications_enable_slack: True,  # (12)!
    user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>",  # (13)!
    user_config.se_notifications_on_start: True,  # (14)!
    user_config.se_notifications_on_completion: True,  # (15)!
    user_config.se_notifications_on_fail: True,  # (16)!
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,  # (17)!
    user_config.se_notifications_on_rules_action_if_failed_set_ignore: True,  # (18)!
    user_config.se_notifications_on_error_drop_threshold: 15,  # (19)!
    user_config.se_enable_error_table: True,  # (20)!
    user_config.enable_query_dq_detailed_result: True,  # (21)!
    user_config.enable_agg_dq_detailed_result: True,  # (22)!
    user_config.querydq_output_custom_table_name: "<catalog.schema.table-name>",  # (23)!
    user_config.se_dq_rules_params: {
        "env": "local",
        "table": "product",
    },  # (24)!
    user_config.se_notifications_enable_templated_basic_email_body: True,  # (25)!
    user_config.se_notifications_default_basic_email_template: "",  # (26)!
}
```
1. The `user_config.se_notifications_enable_email` parameter, which controls whether notifications are sent via email, is set to `False` by default
2. The optional `user_config.se_notifications_enable_smtp_server_auth` parameter, which controls whether SMTP server authentication is enabled, is set to `False` by default
3. The optional `user_config.se_notifications_enable_custom_email_body` parameter, which controls whether a custom email body is used, is set to `False` by default
4. The `user_config.se_notifications_email_smtp_host` parameter specifies the email SMTP domain host and is set to `"mailhost.com"` by default
5. The `user_config.se_notifications_email_smtp_port` parameter accepts a port number and is set to `25` by default
6. The `user_config.se_notifications_smtp_password` parameter specifies the password for the SMTP server (if the SMTP server requires authentication, either this parameter or `user_config.se_notifications_smtp_creds_dict` should be set)
7. The `user_config.se_notifications_smtp_creds_dict` parameter specifies the credentials for the SMTP server (if the SMTP server requires authentication, either this parameter or `user_config.se_notifications_smtp_password` should be set)
8. The `user_config.se_notifications_email_from` parameter specifies the email ID that will trigger the email notification
9. The `user_config.se_notifications_email_to_other_mail_id` parameter accepts a list of recipient email IDs
10. The `user_config.se_notifications_email_subject` parameter captures the subject line of the email
11. The optional `user_config.se_notifications_email_custom_body` parameter captures the custom email body, which must follow the expected placeholder syntax
12. The `user_config.se_notifications_enable_slack` parameter, which controls whether notifications are sent via Slack, is set to `False` by default
13. The `user_config.se_notifications_slack_webhook_url` parameter accepts the webhook URL of a Slack channel for sending notifications
14. Setting the `user_config.se_notifications_on_start` parameter to `True` enables a notification when Spark Expectations starts; it is set to `False` by default
15. Setting the `user_config.se_notifications_on_completion` parameter to `True` enables a notification when the Spark Expectations framework completes; it is set to `False` by default
16. Setting the `user_config.se_notifications_on_fail` parameter to `True` enables a notification when the Spark Expectations data quality framework fails; it is set to `True` by default
17. Setting the `user_config.se_notifications_on_error_drop_exceeds_threshold_breach` parameter to `True` enables a notification when the error drop rate exceeds the configured threshold
18. Setting the `user_config.se_notifications_on_rules_action_if_failed_set_ignore` parameter to `True` enables a notification when rules whose action if failed is set to ignore have failed
19. The `user_config.se_notifications_on_error_drop_threshold` parameter captures the error drop threshold value
20. The `user_config.se_enable_error_table` parameter, which controls whether error data is loaded into the error table, is set to `True` by default
21. Setting the `user_config.enable_query_dq_detailed_result` parameter to `True` captures the detailed query_dq statistics in the detailed-stats table; it is set to `False` by default
22. Setting the `user_config.enable_agg_dq_detailed_result` parameter to `True` captures the detailed agg_dq statistics in the detailed-stats table; it is set to `False` by default
23. The `user_config.querydq_output_custom_table_name` parameter specifies the name of the custom query_dq output table, which captures the output of the alias queries passed in query_dq expectations. The default is `_custom_output`
24. The `user_config.se_dq_rules_params` parameter supplies key/value pairs that are used to dynamically update the dq rules (see the sketch after this list)
25. The optional `user_config.se_notifications_enable_templated_basic_email_body` parameter enables using a Jinja template for basic email notifications (notifying on job start, completion, failure, etc.)
26. The optional `user_config.se_notifications_default_basic_email_template` parameter specifies the Jinja template used for basic email notifications. If the provided template is blank or this option is missing (while basic email templates are enabled), a default template will be used.
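For example, the key/value pairs supplied via `user_config.se_dq_rules_params` can be referenced as placeholders inside the rules themselves. The snippet below is an illustrative sketch only: the rule row and its columns are hypothetical, and it assumes that placeholders such as `{env}` and `{table}` are substituted with the supplied values before the rule is evaluated.

```python
# Hypothetical rule row; with the se_dq_rules_params shown above
# ({"env": "local", "table": "product"}), the placeholders would resolve to
# table_name="dq_spark_local.product" at run time.
example_rule = {
    "product_id": "your_product",
    "table_name": "dq_spark_{env}.{table}",
    "rule_type": "row_dq",
    "rule": "product_id_is_not_null",
    "column_name": "product_id",
    "expectation": "product_id is not null",
    "action_if_failed": "drop",
}
```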
If the SMTP server requires authentication, the password can be passed directly in the user config or stored securely in a service such as Cerberus or a Databricks secret scope.

If you prefer to use Cerberus for secure password storage, the `user_config.se_notifications_smtp_creds_dict` parameter can be used to specify the credentials for the SMTP server as follows:
```python
from spark_expectations.config.user_config import Constants as user_config

smtp_creds_dict = {
    user_config.secret_type: "cerberus",  # (1)!
    user_config.cbs_url: "https://cerberus.example.com",  # (2)!
    user_config.cbs_sdb_path: "your_sdb_path",  # (3)!
    user_config.cbs_smtp_password: "your_smtp_password",  # (4)!
}
```
1. The `user_config.secret_type` parameter defines the type of secret store and takes one of two values (`databricks`, `cerberus`)
2. The `user_config.cbs_url` parameter is used to pass the Cerberus URL
3. The `user_config.cbs_sdb_path` parameter captures the Cerberus secure data store path
4. The `user_config.cbs_smtp_password` parameter captures the key under which the SMTP password is stored in the Cerberus sdb
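The credentials dictionary is then attached to the main user configuration; a minimal sketch, assuming `se_user_conf` is the configuration dictionary defined above:

```python
# Attach the SMTP credentials dictionary to the main user configuration
se_user_conf[user_config.se_notifications_smtp_creds_dict] = smtp_creds_dict
```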
Similarly, if you prefer to use Databricks secrets for secure password storage, the `user_config.se_notifications_smtp_creds_dict` parameter can be used to specify the credentials for the SMTP server as follows:
```python
from spark_expectations.config.user_config import Constants as user_config

smtp_creds_dict = {
    user_config.secret_type: "databricks",  # (1)!
    user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com",  # (2)!
    user_config.dbx_secret_scope: "your_secret_scope",  # (3)!
    user_config.dbx_smtp_password: "your_password",  # (4)!
}
```
1. The `user_config.secret_type` parameter defines the type of secret store and takes one of two values (`databricks`, `cerberus`)
2. The `user_config.dbx_workspace_url` parameter is used to pass the Databricks workspace URL in the format `https://<workspace_name>.cloud.databricks.com`
3. The `user_config.dbx_secret_scope` parameter captures the name of the secret scope
4. The `user_config.dbx_smtp_password` parameter captures the secret key under which the SMTP password is stored in the Databricks secret scope
### Spark Expectations Initialization
For all the examples below, the following import and `SparkExpectations` class instantiation are mandatory.

When the store for sensitive details is a Databricks secret scope, construct the configuration dictionary for Kafka authentication once, so that it is not rebuilt every time your project is initialized: create a dictionary with the following keys and their appropriate values. This dictionary can be placed in the `__init__.py` file of your project or declared as a global variable.
```python
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
    user_config.se_enable_streaming: True,  # (1)!
    user_config.secret_type: "databricks",  # (2)!
    user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com",  # (3)!
    user_config.dbx_secret_scope: "sole_common_prod",  # (4)!
    user_config.dbx_kafka_server_url: "se_streaming_server_url_secret_key",  # (5)!
    user_config.dbx_secret_token_url: "se_streaming_auth_secret_token_url_key",  # (6)!
    user_config.dbx_secret_app_name: "se_streaming_auth_secret_appid_key",  # (7)!
    user_config.dbx_secret_token: "se_streaming_auth_secret_token_key",  # (8)!
    user_config.dbx_topic_name: "se_streaming_topic_name",  # (9)!
}
```
1. The `user_config.se_enable_streaming` parameter is used to control the enabling or disabling of Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka.
2. The `user_config.secret_type` parameter defines the type of secret store and takes one of two values (`databricks`, `cerberus`); the default is `databricks`
3. The `user_config.dbx_workspace_url` parameter is used to pass the Databricks workspace URL in the format `https://<workspace_name>.cloud.databricks.com`
4. The `user_config.dbx_secret_scope` parameter captures the name of the secret scope
5. The `user_config.dbx_kafka_server_url` parameter captures the secret key for the Kafka URL
6. The `user_config.dbx_secret_token_url` parameter captures the secret key for the Kafka authentication app URL
7. The `user_config.dbx_secret_app_name` parameter captures the secret key for the Kafka authentication app name
8. The `user_config.dbx_secret_token` parameter captures the secret key for the Kafka authentication app secret token
9. The `user_config.dbx_topic_name` parameter captures the secret key for the Kafka topic name
Similarly, when the store for sensitive details is Cerberus:
```python
from typing import Dict, Union

from spark_expectations.config.user_config import Constants as user_config

stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
    user_config.se_enable_streaming: True,  # (1)!
    user_config.secret_type: "cerberus",  # (2)!
    user_config.cbs_url: "https://<url>.cerberus.com",  # (3)!
    user_config.cbs_sdb_path: "cerberus_sdb_path",  # (4)!
    user_config.cbs_kafka_server_url: "se_streaming_server_url_secret_sdb_path",  # (5)!
    user_config.cbs_secret_token_url: "se_streaming_auth_secret_token_url_sdb_path",  # (6)!
    user_config.cbs_secret_app_name: "se_streaming_auth_secret_appid_sdb_path",  # (7)!
    user_config.cbs_secret_token: "se_streaming_auth_secret_token_sdb_path",  # (8)!
    user_config.cbs_topic_name: "se_streaming_topic_name_sdb_path",  # (9)!
}
```
1. The `user_config.se_enable_streaming` parameter is used to control the enabling or disabling of Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka.
2. The `user_config.secret_type` parameter defines the type of secret store and takes one of two values (`databricks`, `cerberus`); the default is `databricks`
3. The `user_config.cbs_url` parameter is used to pass the Cerberus URL
4. The `user_config.cbs_sdb_path` parameter captures the Cerberus secure data store path
5. The `user_config.cbs_kafka_server_url` parameter captures the path where the Kafka URL is stored in the Cerberus sdb
6. The `user_config.cbs_secret_token_url` parameter captures the path where the Kafka authentication app URL is stored in the Cerberus sdb
7. The `user_config.cbs_secret_app_name` parameter captures the path where the Kafka authentication app name is stored in the Cerberus sdb
8. The `user_config.cbs_secret_token` parameter captures the path where the Kafka authentication app secret token is stored in the Cerberus sdb
9. The `user_config.cbs_topic_name` parameter captures the path where the Kafka topic name is stored in the Cerberus sdb
If you are running locally, or are not using Cerberus or Databricks, and want to specify the streaming topic name and Kafka bootstrap server yourself, enable these custom options by setting `user_config.se_streaming_stats_kafka_custom_config_enable` to `True` and then provide the parameters below. If `user_config.se_streaming_stats_kafka_custom_config_enable` is set to `True` but no options are specified, the defaults from `spark_expectations/config/spark-expectations-default-config.yaml` will be used.

Please note that the specified streaming topic and Kafka bootstrap server must already exist when running Spark Expectations (they will not be created for you).
```python
from typing import Dict, Union

from spark_expectations.config.user_config import Constants as user_config

stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
    user_config.se_enable_streaming: True,  # (1)!
    user_config.se_streaming_stats_kafka_custom_config_enable: True,  # (2)!
    user_config.se_streaming_stats_topic_name: "dq-sparkexpectations-stats",  # (3)!
    user_config.se_streaming_stats_kafka_bootstrap_server: "localhost:9092",  # (4)!
}
```
1. The `user_config.se_enable_streaming` parameter is used to control the enabling or disabling of Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka.
2. The optional `user_config.se_streaming_stats_kafka_custom_config_enable` parameter, when set to `True`, enables using `user_config.se_streaming_stats_topic_name` to set the streaming topic name and `user_config.se_streaming_stats_kafka_bootstrap_server` to set the Kafka bootstrap server. If this parameter is set to `True` but no values are set for the topic name or the bootstrap server, defaults from `spark_expectations/config/spark-expectations-default-config.yaml` will be used.
3. The `user_config.se_streaming_stats_topic_name` parameter sets the streaming topic name when `user_config.se_streaming_stats_kafka_custom_config_enable` is set to `True`.
4. The `user_config.se_streaming_stats_kafka_bootstrap_server` parameter sets the Kafka bootstrap server when `user_config.se_streaming_stats_kafka_custom_config_enable` is set to `True`.
You can disable the streaming functionality by setting the `user_config.se_enable_streaming` parameter to `False`:
```python
from typing import Dict, Union

from spark_expectations.config.user_config import Constants as user_config

stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
    user_config.se_enable_streaming: False,  # (1)!
}
```
1. The `user_config.se_enable_streaming` parameter is used to control the enabling or disabling of Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka.
```python
from spark_expectations.core.expectations import SparkExpectations

# product_id should match the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(
    product_id="your-products-id",
    stats_streaming_options=stats_streaming_config_dict,
)  # (1)!
```
1. Instantiate the `SparkExpectations` class, which has all the required functions for running data quality rules
#### Example 1
```python
import os

from pyspark.sql import DataFrame

from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter

# `spark` is an active SparkSession and `se_user_conf` is the notification
# configuration defined in the Configurations section above.
writer = WrappedDataFrameWriter().mode("append").format("delta")  # (1)!

se = SparkExpectations(  # (10)!
    product_id="your_product",  # (11)!
    rules_df=spark.table("dq_spark_local.dq_rules"),  # (12)!
    stats_table="dq_spark_local.dq_stats",  # (13)!
    stats_table_writer=writer,  # (14)!
    target_and_error_table_writer=writer,  # (15)!
    debugger=False,  # (16)!
    # stats_streaming_options={user_config.se_enable_streaming: False},  # (17)!
)


@se.with_expectations(  # (2)!
    write_to_table=True,  # (3)!
    write_to_temp_table=True,  # (4)!
    user_conf=se_user_conf,  # (5)!
    target_table_view="order",  # (6)!
    target_and_error_table_writer=writer,  # (7)!
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")

    _df_product: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
    )
    _df_product.createOrReplaceTempView("product")

    _df_customer: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
    )
    _df_customer.createOrReplaceTempView("customer")  # (8)!

    return _df_order  # (9)!
```
1. The `WrappedDataFrameWriter` class is used to wrap the `DataFrameWriter` class and add additional functionality to it
2. The `@se.with_expectations` decorator is used to run the data quality rules (see the usage sketch after this list)
3. The `write_to_table` parameter is used to write the final data into the table. By default it is `False`. This is optional if you just want to run the data quality checks; a good example would be a staging table or temporary view.
4. The `write_to_temp_table` parameter is used to write the input dataframe into a temp table, which breaks the Spark plan and may speed up the job when the dataframe lineage is complex
5. The `user_conf` parameter gathers all the configurations associated with notifications. There are four types of notifications: notification_on_start, notification_on_completion, notification_on_fail and notification_on_error_threshold_breach. Enable notifications for all four stages by setting the values to `True`. By default, all four stages are set to `False`.
6. The `target_table_view` parameter provides the name of a view that represents the target validated dataset, for implementation of `query_dq` on the clean dataset produced by `row_dq`
7. The `target_and_error_table_writer` parameter is used to write the final data into the table. By default it is `False`. This is optional if you just want to run the data quality checks; a good example would be a staging table or temporary view.
8. View registration can be utilized when implementing `query_dq` expectations
9. Returning a dataframe is mandatory for `spark_expectations` to work; if a dataframe is not returned, an exception will be raised
10. Instantiate the `SparkExpectations` class, which has all the required functions for running data quality rules
11. The `product_id` parameter specifies the product ID of the data quality rules; it has to be a unique value
12. The `rules_df` parameter specifies the dataframe that contains the data quality rules
13. The `stats_table` parameter specifies the table name where the statistics will be written
14. The `stats_table_writer` parameter takes the configuration that will be used to write the stats table using PySpark
15. The `target_and_error_table_writer` parameter takes the configuration that will be used to write the target and error tables using PySpark
16. The `debugger` parameter is used to enable the debugger mode
17. The `stats_streaming_options` parameter specifies the configurations for streaming statistics into Kafka. To disable streaming into Kafka, uncomment this line.
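A minimal usage sketch, assuming an active `SparkSession` named `spark`, the `se_user_conf` dictionary from the Configurations section, and the referenced rules table and CSV files already exist; calling the decorated function is what triggers the data quality run:

```python
# Invoking the decorated function evaluates the configured expectations against
# the returned "order" DataFrame; with write_to_table=True, the validated data
# is written to the target table defined in the rules table.
build_new()
```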