Initialization Examples
Configurations¶
To establish the global configuration for DQ Spark Expectations, create a variable and populate all of the required fields with the appropriate values, as shown below.
from spark_expectations.config.user_config import Constants as user_config
se_global_spark_Conf = {
user_config.se_notifications_enable_email: False, # (1)!
user_config.se_notifications_email_smtp_host: "mailhost.com", # (2)!
user_config.se_notifications_email_smtp_port: 25, # (3)!
user_config.se_notifications_email_from: "<sender_email_id>", # (4)!
user_config.se_notifications_email_to_other_mail_id: "<receiver_email_id's>", # (5)!
user_config.se_notifications_email_subject: "spark expectations - data quality - notifications", # (6)!
user_config.se_notifications_enable_slack: True, # (7)!
user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>", # (8)!
user_config.se_notifications_on_start: True, # (9)!
user_config.se_notifications_on_completion: True, # (10)!
user_config.se_notifications_on_fail: True, # (11)!
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, # (12)!
user_config.se_notifications_on_error_drop_threshold: 15, # (13)!
}
1. The user_config.se_notifications_enable_email parameter controls whether notifications are sent via email; it is set to False by default
2. The user_config.se_notifications_email_smtp_host parameter specifies the SMTP host of the email domain; it is set to "mailhost.com" by default
3. The user_config.se_notifications_email_smtp_port parameter accepts a port number; it is set to 25 by default
4. The user_config.se_notifications_email_from parameter specifies the sender email ID that triggers the email notification
5. The user_config.se_notifications_email_to_other_mail_id parameter accepts a list of recipient email IDs
6. The user_config.se_notifications_email_subject parameter captures the subject line of the email
7. The user_config.se_notifications_enable_slack parameter controls whether notifications are sent via Slack; it is set to False by default
8. The user_config.se_notifications_slack_webhook_url parameter accepts the webhook URL of a Slack channel for sending notifications
9. When the user_config.se_notifications_on_start parameter is set to True, a notification is sent at the start of the spark-expectations run; this variable is set to False by default
10. When the user_config.se_notifications_on_completion parameter is set to True, a notification is sent on completion of the spark-expectations framework; this variable is set to False by default
11. When the user_config.se_notifications_on_fail parameter is set to True, a notification is sent on failure of the spark-expectations data quality framework; this variable is set to True by default
12. When the user_config.se_notifications_on_error_drop_exceeds_threshold_breach parameter is set to True, a notification is sent when the error drop percentage exceeds the configured threshold value
13. The user_config.se_notifications_on_error_drop_threshold parameter captures the error drop threshold value
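For example, if you only need Slack notifications on failures, a minimal configuration could look like the sketch below (se_minimal_spark_conf is a hypothetical name; keys that are omitted are expected to fall back to the defaults described above):

from spark_expectations.config.user_config import Constants as user_config

# Minimal sketch: Slack-only notifications, sent only when a run fails.
# Hypothetical variable name; omitted keys are assumed to use their documented defaults.
se_minimal_spark_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_enable_slack: True,
    user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>",
    user_config.se_notifications_on_start: False,
    user_config.se_notifications_on_completion: False,
    user_config.se_notifications_on_fail: True,
}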
Spark Expectations Initialization¶
For all of the examples below, the following import and SparkExpectations class instantiation are mandatory.
When the store for sensitive details is a Databricks secret scope, construct the config dictionary used for Kafka authentication once, rather than rebuilding it every time your project is initialized. Create a dictionary with the following keys and their appropriate values; it can be placed in the __init__.py file of your project or declared as a global variable.
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
user_config.se_enable_streaming: True, # (1)!
user_config.secret_type: "databricks", # (2)!
user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com", # (3)!
user_config.dbx_secret_scope: "sole_common_prod", # (4)!
user_config.dbx_kafka_server_url: "se_streaming_server_url_secret_key", # (5)!
user_config.dbx_secret_token_url: "se_streaming_auth_secret_token_url_key", # (6)!
user_config.dbx_secret_app_name: "se_streaming_auth_secret_appid_key", # (7)!
user_config.dbx_secret_token: "se_streaming_auth_secret_token_key", # (8)!
user_config.dbx_topic_name: "se_streaming_topic_name", # (9)!
}
1. The user_config.se_enable_streaming parameter controls enabling or disabling of the Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka
2. The user_config.secret_type parameter defines the type of secret store; it takes one of two values (databricks or cerberus) and defaults to databricks
3. The user_config.dbx_workspace_url parameter passes the Databricks workspace URL in the format https://<workspace_name>.cloud.databricks.com
4. The user_config.dbx_secret_scope parameter captures the name of the secret scope
5. The user_config.dbx_kafka_server_url parameter captures the secret key for the Kafka URL
6. The user_config.dbx_secret_token_url parameter captures the secret key for the Kafka authentication app URL
7. The user_config.dbx_secret_app_name parameter captures the secret key for the Kafka authentication app name
8. The user_config.dbx_secret_token parameter captures the secret key for the Kafka authentication app secret token
9. The user_config.dbx_topic_name parameter captures the secret key for the Kafka topic name
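Because the dictionary is declared once (for example in your project's __init__.py), other modules can simply import it wherever SparkExpectations is instantiated, as shown further below. A minimal sketch, assuming a hypothetical package name my_project:

# my_project/jobs/orders_job.py  (hypothetical module)
from spark_expectations.core.expectations import SparkExpectations

from my_project import stats_streaming_config_dict  # declared once in my_project/__init__.py

se = SparkExpectations(
    product_id="your-products-id",
    stats_streaming_options=stats_streaming_config_dict,
)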
Similarly, when the sensitive store is Cerberus:
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
user_config.se_enable_streaming: True, # (1)!
user_config.secret_type: "cerberus", # (2)!
user_config.cbs_url: "https://<url>.cerberus.com", # (3)!
user_config.cbs_sdb_path: "cerberus_sdb_path", # (4)!
user_config.cbs_kafka_server_url: "se_streaming_server_url_secret_sdb_path", # (5)!
user_config.cbs_secret_token_url: "se_streaming_auth_secret_token_url_sdb_path", # (6)!
user_config.cbs_secret_app_name: "se_streaming_auth_secret_appid_sdb_path", # (7)!
user_config.cbs_secret_token: "se_streaming_auth_secret_token_sdb_path", # (8)!
user_config.cbs_topic_name: "se_streaming_topic_name_sdb_path", # (9)!
}
1. The user_config.se_enable_streaming parameter controls enabling or disabling of the Spark Expectations (SE) streaming functionality. When enabled, SE streaming stores the statistics of every batch run into Kafka
2. The user_config.secret_type parameter defines the type of secret store; it takes one of two values (databricks or cerberus) and defaults to databricks
3. The user_config.cbs_url parameter passes the Cerberus URL
4. The user_config.cbs_sdb_path parameter captures the Cerberus secure data store (SDB) path
5. The user_config.cbs_kafka_server_url parameter captures the path where the Kafka URL is stored in the Cerberus SDB
6. The user_config.cbs_secret_token_url parameter captures the path where the Kafka authentication app URL is stored in the Cerberus SDB
7. The user_config.cbs_secret_app_name parameter captures the path where the Kafka authentication app name is stored in the Cerberus SDB
8. The user_config.cbs_secret_token parameter captures the path where the Kafka authentication app secret token is stored in the Cerberus SDB
9. The user_config.cbs_topic_name parameter captures the path where the Kafka topic name is stored in the Cerberus SDB
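If your project runs against more than one secret store, one option is to keep both dictionaries under distinct names and pick one at runtime. A sketch, assuming the two dictionaries above were renamed databricks_stats_streaming_config_dict and cerberus_stats_streaming_config_dict (hypothetical names) and that a hypothetical SE_SECRET_STORE environment variable drives the choice:

import os

# Sketch: choose the streaming config based on the environment.
def get_stats_streaming_config() -> dict:
    if os.environ.get("SE_SECRET_STORE", "databricks") == "cerberus":
        return cerberus_stats_streaming_config_dict   # the Cerberus dict shown above
    return databricks_stats_streaming_config_dict     # the Databricks dict shown above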
from spark_expectations.core.expectations import SparkExpectations
# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(product_id="your-products-id", stats_streaming_options=stats_streaming_config_dict) # (1)!
1. Instantiate the SparkExpectations class, which has all the required functions for running data quality rules
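If you do not want the run statistics streamed to Kafka, the same dictionary can be used to switch the feature off. A minimal sketch, assuming the secret-related keys may be omitted when streaming is disabled:

from spark_expectations.config.user_config import Constants as user_config
from spark_expectations.core.expectations import SparkExpectations

# Sketch: disable SE streaming so batch statistics are not written to Kafka.
# Assumption: the secret-related keys can be left out when streaming is disabled.
se: SparkExpectations = SparkExpectations(
    product_id="your-products-id",
    stats_streaming_options={user_config.se_enable_streaming: False},
)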
Example 1¶
from spark_expectations.config.user_config import Constants as user_config # (7)!
@se.with_expectations( # (6)!
se.reader.get_rules_from_table( # (5)!
product_rules_table="pilot_nonpub.dq.dq_rules", # (1)!
table_name="pilot_nonpub.dq_employee.employee", # (2)!
dq_stats_table_name="pilot_nonpub.dq.dq_stats" # (3)!
),
write_to_table=True, # (4)!
write_to_temp_table=True, # (8)!
row_dq=True, # (9)!
agg_dq={ # (10)!
user_config.se_agg_dq: True, # (11)!
user_config.se_source_agg_dq: True, # (12)!
user_config.se_final_agg_dq: True, # (13)!
},
query_dq={ # (14)!
user_config.se_query_dq: True, # (15)!
user_config.se_source_query_dq: True, # (16)!
user_config.se_final_query_dq: True, # (17)!
user_config.se_target_table_view: "order", # (18)!
},
spark_conf=se_global_spark_Conf, # (19)!
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order") # (20)!
_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product") # (20)!
_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)
_df_customer.createOrReplaceTempView("customer") # (20)!
return _df_order # (21)!
1. Provide the full table name of the table which contains the rules
2. Provide the table name using which the _error table will be created, which contains all the failed records. Note: if you also want to write the data using write_df, then the table_name provided to both functions should be the same
3. Provide the full table name where the stats will be written into
4. Use this argument to write the final data into the table. By default, it is False. This is optional if you just want to run the data quality checks. A good example would be a staging table or temporary view.
5. This function reads the rules from the table and returns them as a dict, which is an input to the with_expectations function
6. This is the decorator that helps us run the data quality rules. After running the rules, the results will be written into the _stats table and the _error table
7. Import the necessary configurable variables from the user_config package for the specific functionality to configure in spark-expectations
8. Use this argument to write the input dataframe into a temp table, so that it breaks the Spark plan and might speed up the job in cases of complex dataframe lineage
9. The row_dq argument is optional and enables row-based data quality checks. By default, this argument is set to True; if desired, these checks can be skipped by setting the argument to False.
10. The agg_dq argument is a dictionary that gathers the different settings and options used to configure agg_dq
11. The se_agg_dq argument activates the aggregate data quality check; its default setting is True.
12. The se_source_agg_dq argument is optional and enables aggregate-based data quality checks on the source data. By default, this argument is set to True, and this option depends on the agg_dq value. If desired, these checks can be skipped by setting the se_source_agg_dq argument to False.
13. The optional se_final_agg_dq argument allows performing aggregate-based data quality checks on the final data, with the default setting being True; it depends on row_dq and agg_dq. Skip these checks by setting the argument to False
14. The query_dq argument is a dictionary that gathers the different settings and options used to configure query_dq
15. The se_query_dq argument activates the query data quality check; its default setting is True.
16. The se_source_query_dq argument is optional and enables query-based data quality checks on the source data. By default, this argument is set to True, and this option depends on the query_dq value. If desired, these checks can be skipped by setting the se_source_query_dq argument to False.
17. The optional se_final_query_dq argument allows performing query-based data quality checks on the final data, with the default setting being True; it depends on row_dq and query_dq. Skip these checks by setting the argument to False
18. The se_target_table_view parameter can be provided with the name of a view that represents the target validated dataset, for implementation of query_dq on the clean dataset produced by row_dq
19. The spark_conf parameter gathers all the configurations that are associated with notifications
20. View registration can be utilized when implementing query_dq expectations
21. Returning a dataframe is mandatory for spark_expectations to work; if we do not return a dataframe, an exception will be raised
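Once decorated, the function is invoked like any other function; calling it is expected to trigger the configured expectations on the dataframe the function returns. A usage sketch, using the names defined above:

# Calling the decorated function triggers the data quality run: rules are read
# from the rules table, checks are applied, stats and error records are written,
# and the order dataframe is returned.
df_validated = build_new()
df_validated.show(10)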
Example 2¶
@se.with_expectations( # (1)!
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=True # (2)!
)
def build_new() -> DataFrame:
_df: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/employee.csv"))
)
return _df
1. Conduct only row-based data quality checks while skipping the aggregate data quality checks
2. The aggregate data quality checks are disabled, since agg_dq is not provided
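An equivalent, more explicit form would disable the aggregate and query checks via their top-level flags; a sketch using the keys documented in Example 1 (behaviour is assumed to match Example 2, where the dictionaries are simply omitted):

@se.with_expectations(
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.customer_order",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    row_dq=True,
    agg_dq={user_config.se_agg_dq: False},     # aggregate checks explicitly off
    query_dq={user_config.se_query_dq: False}, # query checks explicitly off
)
def build_new() -> DataFrame:
    ...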
Example 3¶
@se.with_expectations( # (1)!
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=False, # (2)!
agg_dq={
user_config.se_agg_dq: True,
user_config.se_source_agg_dq: True,
user_config.se_final_agg_dq: False,
}
)
def build_new() -> DataFrame:
_df: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/employee.csv"))
)
return _df
1. Perform only aggregate-based data quality checks, while avoiding both the row-based data quality checks and the aggregate data quality checks on the validated dataset, since row validation has not taken place
2. Disables the row data quality checks
Example 4¶
@se.with_expectations( # (1)!
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=True,
query_dq={ # (2)!
user_config.se_query_dq: True,
user_config.se_source_query_dq: True,
user_config.se_final_query_dq: True,
user_config.se_target_table_view: "order",
},
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")
_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product")
_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)
_df_customer.createOrReplaceTempView("customer")
return _df_order
1. Conduct row-based and query-based data quality checks only on the source and target dataset, while skipping the aggregate data quality checks on the validated dataset
2. Enables the query data quality checks
Example 5¶
@se.with_expectations( # (1)!
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=True,
agg_dq={
user_config.se_agg_dq: True,
user_config.se_source_agg_dq: True,
user_config.se_final_agg_dq: False, # (2)!
},
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
1. Conduct row-based and aggregate-based data quality checks only on the source dataset, while skipping the aggregate data quality checks on the validated dataset
2. Disables the final aggregate data quality checks
Example 6¶
import os
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
spark_conf=se_global_spark_Conf, # (2)!
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
1. There are four types of notifications: notification_on_start, notification_on_completion, notification_on_fail and notification_on_error_threshold_breach. Enable notifications for all four stages by setting their values to True
2. To supply the configuration that holds the notification settings, use the declared global variable, se_global_spark_Conf
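If a particular job needs slightly different notification behaviour, one option is to start from the global variable and override individual keys; a small sketch (se_job_spark_conf is a hypothetical name):

# Sketch: reuse the global notification settings but, for this job only,
# skip the "on start" notification. Plain dict merge; the result is passed
# to spark_conf exactly like se_global_spark_Conf.
se_job_spark_conf = {
    **se_global_spark_Conf,
    user_config.se_notifications_on_start: False,
}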
Example 7¶
@se.with_expectations( # (1)!
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=False,
agg_dq={
user_config.se_agg_dq: False,
user_config.se_source_agg_dq: False,
user_config.se_final_agg_dq: True,
},
query_dq={
user_config.se_query_dq: False,
user_config.se_source_query_dq: True,
user_config.se_final_query_dq: True,
user_config.se_target_table_view: "order",
},
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
1. The combination below of row_dq, agg_dq, source_agg_dq, final_agg_dq, query_dq, source_query_dq and final_query_dq skips most of the data quality checks, because source_agg_dq and final_agg_dq depend on agg_dq (and final_agg_dq additionally on row_dq), while source_query_dq and final_query_dq depend on query_dq
Example 8¶
@se.with_expectations( # (1)!
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats",
actions_if_failed=["drop", "ignore"] # (1)!
)
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
1. By default, action_if_failed contains ["fail", "drop", "ignore"], but if we want to run only the rules that have a particular action, we can pass them as a list, as shown in the example
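For instance, to run only the rules whose action is fail, the same argument could be narrowed further (a sketch based on the filter shown above):

@se.with_expectations(
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.customer_order",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats",
        actions_if_failed=["fail"]  # only rules whose action_if_failed is "fail" are applied
    )
)
def build_new() -> DataFrame:
    ...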
Example 9¶
@se.with_expectations( # (1)!
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats",
actions_if_failed=["drop", "ignore"] # (1)!
),
row_dq=True, # (2)!
agg_dq={
user_config.se_agg_dq: True,
user_config.se_source_agg_dq: True,
user_config.se_final_agg_dq: True,
},
query_dq={
user_config.se_query_dq: True,
user_config.se_source_query_dq: True,
user_config.se_final_query_dq: True,
user_config.se_target_table_view: "order",
}
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")
_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product")
_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)
_df_customer.createOrReplaceTempView("customer")
return _df_order
1. The default options for the action_if_failed field are ["fail", "drop", "ignore"], but you can specify which of these actions to run by providing a list of the desired actions, as shown in the example, to select which set of data quality rules to apply
2. Data quality rules will only be applied if they have "drop" or "ignore" specified in the action_if_failed field
Example 10¶
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
spark_conf={"spark.files.maxPartitionBytes": "134217728"}, # (1)!
options={"mode": "overwrite", "partitionBy": "order_month",
"overwriteSchema": "true"}, # (2)!
options_error_table={"partition_by": "id"} # (3)!
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
1. Provide the optional spark_conf if needed; this is used while writing the data into the final and error tables, along with the notification-related configurations
2. Provide the optional options if needed; this is used while writing the data into the final table
3. Provide the optional options_error_table if needed; this is used while writing the data into the error table
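For an incremental pipeline, the same two hooks could carry append-style writer options instead; a sketch, assuming the dictionaries are forwarded to the underlying Spark writers:

@se.with_expectations(
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.customer_order",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    write_to_table=True,
    options={"mode": "append", "partitionBy": "order_month"},  # final table write
    options_error_table={"mode": "append"},                    # error table write
)
def build_new() -> DataFrame:
    ...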