Initialization Examples
Configurations
To set the global configuration for Spark Expectations data quality (DQ), define a variable and populate all of the required fields with the appropriate values, as shown below.
from spark_expectations.config.user_config import Constants as user_config
se_global_spark_Conf = {
user_config.se_notifications_enable_email: False,
user_config.se_notifications_email_smtp_host: "mailhost.com",
user_config.se_notifications_email_smtp_port: 25,
user_config.se_notifications_email_from: "<sender_email_id>",
user_config.se_notifications_email_to_other_mail_id: "<receiver_email_ids>",
user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",
user_config.se_notifications_enable_slack: True,
user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>",
user_config.se_notifications_on_start: True,
user_config.se_notifications_on_completion: True,
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
}
Spark Expectations Initialization
For all of the examples below, the following import and SparkExpectations class instantiation are mandatory.
When the store for sensitive details is a Databricks secret scope, construct the config dictionary used for Kafka authentication once so that it is not rebuilt every time your project is initialized. Create a dictionary with the following keys and their appropriate values; it can be placed in your project's __init__.py file or declared as a global variable.
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
user_config.se_enable_streaming: True,
user_config.secret_type: "databricks",
user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com",
user_config.dbx_secret_scope: "sole_common_prod",
user_config.dbx_kafka_server_url: "se_streaming_server_url_secret_key",
user_config.dbx_secret_token_url: "se_streaming_auth_secret_token_url_key",
user_config.dbx_secret_app_name: "se_streaming_auth_secret_appid_key",
user_config.dbx_secret_token: "se_streaming_auth_secret_token_key",
user_config.dbx_topic_name: "se_streaming_topic_name",
}
Similarly, when the sensitive store is Cerberus:
from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
user_config.se_enable_streaming: True,
user_config.secret_type: "cerberus",
user_config.cbs_url: "https://<url>.cerberus.com",
user_config.cbs_sdb_path: "cerberus_sdb_path",
user_config.cbs_kafka_server_url: "se_streaming_server_url_secret_sdb_path",
user_config.cbs_secret_token_url: "se_streaming_auth_secret_token_url_sdb_path",
user_config.cbs_secret_app_name: "se_streaming_auth_secret_appid_sdb_path",
user_config.cbs_secret_token: "se_streaming_auth_secret_token_sdb_path",
user_config.cbs_topic_name: "se_streaming_topic_name_sdb_path",
}
from spark_expectations.core.expectations import SparkExpectations
# product_id should match the "product_id" value in the rules table
se: SparkExpectations = SparkExpectations(product_id="your-product-id", stats_streaming_options=stats_streaming_config_dict)
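The examples below also reference a spark session, the os module, and the pyspark DataFrame type without defining them. A minimal sketch of that assumed setup follows; the application name is only a placeholder and the builder options should be adjusted to your environment.
import os
from pyspark.sql import DataFrame, SparkSession
# Assumed setup for the examples: a simple SparkSession named "spark".
spark: SparkSession = SparkSession.builder.appName("spark-expectations-examples").getOrCreate()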
Example 1
from spark_expectations.config.user_config import Constants as user_config
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.dq_employee.employee",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
write_to_table=True,
write_to_temp_table=True,
row_dq=True,
agg_dq={
user_config.se_agg_dq: True,
user_config.se_source_agg_dq: True,
user_config.se_final_agg_dq: True,
},
query_dq={
user_config.se_query_dq: True,
user_config.se_source_query_dq: True,
user_config.se_final_query_dq: True,
user_config.se_target_table_view: "order",
},
spark_conf=se_global_spark_Conf,
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")
_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product")
_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)
_df_customer.createOrReplaceTempView("customer")
return _df_order
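Calling the decorated function runs the configured checks; with write_to_table=True above, rows that pass are expected to be written to the target table and the run statistics to pilot_nonpub.dq.dq_stats. A minimal usage sketch, assuming the decorator passes through the DataFrame returned by build_new():
# Trigger the Example 1 pipeline; row, aggregate and query DQ rules run as configured.
result_df = build_new()
result_df.show(10, truncate=False)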
Example 2
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=True
)
def build_new() -> DataFrame:
_df: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/employee.csv"))
)
return _df
Example 3
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=False,
agg_dq={
user_config.se_agg_dq: True,
user_config.se_source_agg_dq: True,
user_config.se_final_agg_dq: False,
}
)
def build_new() -> DataFrame:
_df: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/employee.csv"))
)
return _df
Example 4
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=True,
query_dq={
user_config.se_query_dq: True,
user_config.se_source_query_dq: True,
user_config.se_final_query_dq: True,
user_config.se_target_table_view: "order",
},
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")
_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product")
_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)
_df_customer.createOrReplaceTempView("customer")
return _df_order
Example 5
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=True,
agg_dq={
user_config.se_agg_dq: True,
user_config.se_source_agg_dq: True,
user_config.se_final_agg_dq: False,
},
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
Example 6
import os
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
spark_conf=se_global_spark_Conf,
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
- There are four types of notifications: notification_on_start, notification_on_completion, notification_on_fail, and notification_on_error_threshold_breach. Enable notifications for all four stages by setting the corresponding values to True, as in the sketch below.
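A minimal sketch of those four flags using the configuration keys shown in the Configurations section; notification_flags is only an illustrative name, and the drop threshold value is a placeholder.
from spark_expectations.config.user_config import Constants as user_config
# Enable notifications for all four stages; the drop threshold below is illustrative.
notification_flags = {
    user_config.se_notifications_on_start: True,
    user_config.se_notifications_on_completion: True,
    user_config.se_notifications_on_fail: True,
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
    user_config.se_notifications_on_error_drop_threshold: 15,
}
# Fold the flags into the global configuration that is passed via spark_conf.
se_global_spark_Conf.update(notification_flags)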
Example 7
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
row_dq=False,
agg_dq={
user_config.se_agg_dq: False,
user_config.se_source_agg_dq: False,
user_config.se_final_agg_dq: True,
},
query_dq={
user_config.se_query_dq: False,
user_config.se_source_query_dq: True,
user_config.se_final_query_dq: True,
user_config.se_target_table_view: "order",
},
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
Example 8
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats",
actions_if_failed=["drop", "ignore"]
)
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order
Example 9
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats",
actions_if_failed=["drop", "ignore"]
),
row_dq=True,
agg_dq={
user_config.se_agg_dq: True,
user_config.se_source_agg_dq: True,
user_config.se_final_agg_dq: True,
},
query_dq={
user_config.se_query_dq: True,
user_config.se_source_query_dq: True,
user_config.se_final_query_dq: True,
user_config.se_target_table_view: "order",
}
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")
_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product")
_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)
_df_customer.createOrReplaceTempView("customer")
return _df_order
Example 10
@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
target_table_name="pilot_nonpub.customer_order",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
spark_conf={"spark.files.maxPartitionBytes": "134217728"},
options={"mode": "overwrite", "partitionBy": "order_month",
"overwriteSchema": "true"},
options_error_table={"partition_by": "id"}
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
return _df_order