Understand Args
Configurations¶
In order to establish the global configuration parameter for DQ Spark Expectations, you must define and complete the required fields within a variable. This involves creating a variable and ensuring that all the necessary information is provided in the appropriate fields.
from spark_expectations.config.user_config import Constants as user_config
se_user_conf = {
user_config.se_notifications_enable_email: False, # (1)!
user_config.se_notifications_email_smtp_host: "mailhost.com", # (2)!
user_config.se_notifications_email_smtp_port: 25, # (3)!
user_config.se_notifications_email_from: "<sender_email_id>", # (4)!
user_config.se_notifications_email_to_other_mail_id: "<receiver_email_id's>", # (5)!
user_config.se_notifications_email_subject: "spark expectations - data quality - notifications", # (6)!
user_config.se_notifications_enable_slack: True, # (7)!
user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>", # (8)!
user_config.se_notifications_on_start: True, # (9)!
user_config.se_notifications_on_completion: True, # (10)!
user_config.se_notifications_on_fail: True, # (11)!
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, # (12)!
user_config.se_notifications_on_error_drop_threshold: 15, # (13)!
}
- The
user_config.se_notifications_enable_email
parameter, which controls whether notifications are sent via email, is set to false by default - The
user_config.se_notifications_email_smtp_host
parameter is set to "mailhost.com" by default and is used to specify the email SMTP domain host - The
user_config.se_notifications_email_smtp_port
parameter, which accepts a port number, is set to "25" by default - The
user_config.se_notifications_email_from
parameter is used to specify the email ID that will trigger the email notification - The
user_configse_notifications_email_to_other_mail_id
parameter accepts a list of recipient email IDs - The
user_config.se_notifications_email_subject
parameter captures the subject line of the email - The
user_config.se_notifications_enable_slack
parameter, which controls whether notifications are sent via slack, is set to false by default - The
user_config/se_notifications_slack_webhook_url
parameter accepts the webhook URL of a Slack channel for sending notifications - When
user_config.se_notifications_on_start
parameter set toTrue
enables notification on start of the spark-expectations, variable by default set toFalse
- When
user_config.se_notifications_on_completion
parameter set toTrue
enables notification on completion of spark-expectations framework, variable by default set toFalse
- When
user_config.se_notifications_on_fail
parameter set toTrue
enables notification on failure of spark-expectations data qulaity framework, variable by default set toTrue
- When
user_config.se_notifications_on_error_drop_exceeds_threshold_breach
parameter set toTrue
enables notification when error threshold reaches above the configured value - The
user_config.se_notifications_on_error_drop_threshold
parameter captures error drop threshold value
Spark Expectations Initialization¶
For all the below examples the below import and SparkExpectations class instantiation is mandatory
When store for sensitive details is Databricks secret scope,construct config dictionary for authentication of kafka and avoid duplicate construction every time your project is initialized, you can create a dictionary with the following keys and their appropriate values. This dictionary can be placed in the init.py file of your project or declared as a global variable.
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
user_config.se_enable_streaming: True, # (1)!
user_config.secret_type: "databricks", # (2)!
user_config.dbx_workspace_url : "https://workspace.cloud.databricks.com", # (3)!
user_config.dbx_secret_scope: "sole_common_prod", # (4)!
user_config.dbx_kafka_server_url: "se_streaming_server_url_secret_key", # (5)!
user_config.dbx_secret_token_url: "se_streaming_auth_secret_token_url_key", # (6)!
user_config.dbx_secret_app_name: "se_streaming_auth_secret_appid_key", # (7)!
user_config.dbx_secret_token: "se_streaming_auth_secret_token_key", # (8)!
user_config.dbx_topic_name: "se_streaming_topic_name", # (9)!
}
- The
user_config.se_enable_streaming
parameter is used to control the enabling or disabling of Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka. - The
user_config.secret_type
used to define type of secret store and takes two values (databricks
,cererus
) by default will bedatabricks
- The
user_config.dbx_workspace_url
used to pass databricks workspace in the formathttps://<workspace_name>.cloud.databricks.com
- The
user_config.dbx_secret_scope
captures name of the secret scope - The
user_config.dbx_kafka_server_url
captures secret key for the kafka url - The
user_config.dbx_secret_token_url
captures secret key for the kafka authentication app url - The
user_config.dbx_secret_app_name
captures secret key for the kafka authentication app name - The
user_config.dbx_secret_token
captures secret key for the kafka authentication app secret token - The
user_config.dbx_topic_name
captures secret key for the kafka topic name
Similarly when sensitive store is cerberus:
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
user_config.se_enable_streaming: True, # (1)!
user_config.secret_type: "databricks", # (2)!
user_config.cbs_url : "https://<url>.cerberus.com", # (3)!
user_config.cbs_sdb_path: "cerberus_sdb_path", # (4)!
user_config.cbs_kafka_server_url: "se_streaming_server_url_secret_sdb_path", # (5)!
user_config.cbs_secret_token_url: "se_streaming_auth_secret_token_url_sdb_apth", # (6)!
user_config.cbs_secret_app_name: "se_streaming_auth_secret_appid_sdb_path", # (7)!
user_config.cbs_secret_token: "se_streaming_auth_secret_token_sdb_path", # (8)!
user_config.cbs_topic_name: "se_streaming_topic_name_sdb_path", # (9)!
}
- The
user_config.se_enable_streaming
parameter is used to control the enabling or disabling of Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka. - The
user_config.secret_type
used to define type of secret store and takes two values (databricks
,cererus
) by default will bedatabricks
- The
user_config.cbs_url
used to pass cerberus url - The
user_config.cbs_sdb_path
captures cerberus secure data store path - The
user_config.cbs_kafka_server_url
captures path where kafka url stored in the cerberus sdb - The
user_config.cbs_secret_token_url
captures path where kafka authentication app stored in the cerberus sdb - The
user_config.cbs_secret_app_name
captures path where kafka authentication app name stored in the cerberus sdb - The
user_config.cbs_secret_token
captures path where kafka authentication app name secret token stored in the cerberus sdb - The
user_config.cbs_topic_name
captures path where kafka topic name stored in the cerberus sdb
You can disable the streaming functionality by setting the user_config.se_enable_streaming
parameter to False
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
user_config.se_enable_streaming: False, # (1)!
}
- The
user_config.se_enable_streaming
parameter is used to control the enabling or disabling of Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka.
from spark_expectations.core.expectations import SparkExpectations
# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(
product_id="your-products-id",
stats_streaming_options=stats_streaming_config_dict) # (1)!
- Instantiate
SparkExpectations
class which has all the required functions for running data quality rules
Example 1¶
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
writer = WrappedDataFrameWriter().mode("append").format("delta") # (1)!
se = SparkExpectations( # (10)!
product_id="your_product", # (11)!
rules_df=spark.table("dq_spark_local.dq_rules"), # (12)!
stats_table="dq_spark_local.dq_stats", # (13)!
stats_table_writer=writer, # (14)!
target_and_error_table_writer=writer, # (15)!
debugger=False, # (16)!
# stats_streaming_options={user_config.se_enable_streaming: False}, # (17)!
)
@se.with_expectations( # (2)!
write_to_table=True, # (3)!
write_to_temp_table=True, # (4)!
user_conf=se_user_conf, # (5)!
target_table_view="order", # (6)!
target_and_error_table_writer=writer, # (7)!
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")
_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product")
_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)
_df_customer.createOrReplaceTempView("customer") # (8)!
return _df_order # (9)!
- The
WrappedDataFrameWriter
class is used to wrap theDataFrameWriter
class and add additional functionality to it - The
@se.with_expectations
decorator is used to run the data quality rules - The
write_to_table
parameter is used to write the final data into the table. By default, it is False. This is optional, if you just want to run the data quality checks. A good example will be a staging table or temporary view. - The
write_to_temp_table
parameter is used to write the input dataframe into the temp table, so that it breaks the spark plan and might speed up the job in cases of complex dataframe lineage - The
user_conf
parameter is utilized to gather all the configurations that are associated with notifications. There are four types of notifications: notification_on_start, notification_on_completion, notification_on_fail and notification_on_error_threshold_breach. Enable notifications for all four stages by setting the values toTrue
. By default, all four stages are set toFalse
- The
target_table_view
parameter is used to provide the name of a view that represents the target validated dataset for implementation ofquery_dq
on the clean dataset fromrow_dq
- The
target_and_error_table_writer
parameter is used to write the final data into the table. By default, it is False. This is optional, if you just want to run the data quality checks. A good example will be a staging table or temporary view. - View registration can be utilized when implementing
query_dq
expectations. - Returning a dataframe is mandatory for the
spark_expectations
to work, if we do not return a dataframe - then an exceptionm will be raised - Instantiate
SparkExpectations
class which has all the required functions for running data quality rules - The
product_id
parameter is used to specify the product ID of the data quality rules. This has to be a unique value - The
rules_df
parameter is used to specify the dataframe that contains the data quality rules - The
stats_table
parameter is used to specify the table name where the statistics will be written into - The
stats_table_writer
takes in the configuration that need to be used to write the stats table using pyspark - The
target_and_error_table_writer
takes in the configuration that need to be used to write the target and error table using pyspark - The
debugger
parameter is used to enable the debugger mode - The
stats_streaming_options
parameter is used to specify the configurations for streaming statistics into Kafka. To not use kafka, uncomment this.