Initialization Examples

Configurations

To set the global configuration for DQ Spark Expectations, create a variable that holds all of the required fields, as shown below.

from spark_expectations.config.user_config import Constants as user_config

se_global_spark_Conf = {
    user_config.se_notifications_enable_email: False,  
    user_config.se_notifications_email_smtp_host: "mailhost.com",  
    user_config.se_notifications_email_smtp_port: 25,  
    user_config.se_notifications_email_from: "<sender_email_id>",  
    user_config.se_notifications_email_to_other_mail_id: "<receiver_email_ids>",
    user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",  
    user_config.se_notifications_enable_slack: True,  
    user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>",  
    user_config.se_notifications_on_start: True,  
    user_config.se_notifications_on_completion: True,  
    user_config.se_notifications_on_fail: True,  
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,  
    user_config.se_notifications_on_error_drop_threshold: 15,  
}

Spark Expectations Initialization

The following import and SparkExpectations class instantiation are mandatory for all of the examples below.

When the store for sensitive details is a Databricks secret scope, construct the configuration dictionary used to authenticate with Kafka as shown below. To avoid rebuilding it every time your project is initialized, place the dictionary in your project's __init__.py file or declare it as a global variable.

from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config

stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
    user_config.se_enable_streaming: True,
    user_config.secret_type: "databricks",
    user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com",
    user_config.dbx_secret_scope: "sole_common_prod",
    user_config.dbx_kafka_server_url: "se_streaming_server_url_secret_key",
    user_config.dbx_secret_token_url: "se_streaming_auth_secret_token_url_key",
    user_config.dbx_secret_app_name: "se_streaming_auth_secret_appid_key",
    user_config.dbx_secret_token: "se_streaming_auth_secret_token_key",
    user_config.dbx_topic_name: "se_streaming_topic_name",
}

Similarly, when the sensitive-details store is Cerberus:

from typing import Dict, Union
from spark_expectations.config.user_config import Constants as user_config

stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
    user_config.se_enable_streaming: True,
    user_config.secret_type: "cerberus",
    user_config.cbs_url: "https://<url>.cerberus.com",
    user_config.cbs_sdb_path: "cerberus_sdb_path",
    user_config.cbs_kafka_server_url: "se_streaming_server_url_secret_sdb_path",
    user_config.cbs_secret_token_url: "se_streaming_auth_secret_token_url_sdb_path",
    user_config.cbs_secret_app_name: "se_streaming_auth_secret_appid_sdb_path",
    user_config.cbs_secret_token: "se_streaming_auth_secret_token_sdb_path",
    user_config.cbs_topic_name: "se_streaming_topic_name_sdb_path",
}

from spark_expectations.core.expectations import SparkExpectations

# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(
    product_id="your-product-id",
    stats_streaming_options=stats_streaming_config_dict,
)
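
If you do not need to stream the data-quality stats to Kafka (for example, during local development), a minimal sketch of the same options dictionary is shown below; it assumes that only the se_enable_streaming flag is required when streaming is disabled.

stats_streaming_config_dict: Dict[str, Union[bool, str]] = {
    # assumption: with streaming disabled, the secret-store keys shown above are not needed
    user_config.se_enable_streaming: False,
}

se: SparkExpectations = SparkExpectations(
    product_id="your-product-id",
    stats_streaming_options=stats_streaming_config_dict,
)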

Example 1
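
This example enables row, aggregate and query data-quality checks on both the source and final datasets, writes the final and temporary tables, registers the order, product and customer temp views so that query rules can reference them, and passes the global notification settings through spark_conf.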

import os

from pyspark.sql import DataFrame, SparkSession
from spark_expectations.config.user_config import Constants as user_config

# the examples assume an active SparkSession named "spark"
spark = SparkSession.builder.getOrCreate()


@se.with_expectations(  
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.dq_employee.employee",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    write_to_table=True,  
    write_to_temp_table=True,  
    row_dq=True,  
    agg_dq={  
        user_config.se_agg_dq: True,  
        user_config.se_source_agg_dq: True,  
        user_config.se_final_agg_dq: True,  
    },
    query_dq={  
        user_config.se_query_dq: True,  
        user_config.se_source_query_dq: True,  
        user_config.se_final_query_dq: True,  
        user_config.se_target_table_view: "order",  
    },
    spark_conf=se_global_spark_Conf,
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")  

    _df_product: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
    )
    _df_product.createOrReplaceTempView("product")  

    _df_customer: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
    )

    _df_customer.createOrReplaceTempView("customer")  

    return _df_order  

Example 2
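
This example applies only the row-level data-quality rules to the returned DataFrame.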

@se.with_expectations(
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.customer_order",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    row_dq=True
)
def build_new() -> DataFrame:
    _df: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/employee.csv"))
    )
    return _df

Example 3
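
Row-level checks are disabled here; only the aggregate rules run, and only against the source dataset (se_final_agg_dq is False).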

@se.with_expectations(  
    se.reader.get_rules_from_table(  
        product_rules_table="pilot_nonpub.dq.dq_rules", 
        target_table_name="pilot_nonpub.customer_order", 
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    row_dq=False,
    agg_dq={
        user_config.se_agg_dq: True,
        user_config.se_source_agg_dq: True,
        user_config.se_final_agg_dq: False,
    }
)
def build_new() -> DataFrame:
    _df: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/employee.csv"))
    )
    return _df

Example 4
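
Row-level rules are combined here with query-based rules on both the source and final datasets, using the order view registered inside the function as the target table view.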

@se.with_expectations(  
    se.reader.get_rules_from_table(  
        product_rules_table="pilot_nonpub.dq.dq_rules", 
        target_table_name="pilot_nonpub.customer_order", 
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    row_dq=True,
    query_dq={
        user_config.se_query_dq: True,
        user_config.se_source_query_dq: True,
        user_config.se_final_query_dq: True,
        user_config.se_target_table_view: "order",
    },
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order") 
    _df_product: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/product.csv")) 
    )
    _df_product.createOrReplaceTempView("product")

    _df_customer: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
    )

    _df_customer.createOrReplaceTempView("customer")

    return _df_order

Example 5
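
Row-level rules run together with aggregate rules, but the aggregate rules are evaluated only against the source dataset (se_final_agg_dq is False).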

@se.with_expectations(  
    se.reader.get_rules_from_table(  
        product_rules_table="pilot_nonpub.dq.dq_rules", 
        target_table_name="pilot_nonpub.customer_order", 
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    row_dq=True,
    agg_dq={
        user_config.se_agg_dq: True,
        user_config.se_source_agg_dq: True,
        user_config.se_final_agg_dq: False,
    },
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    return _df_order

Example 6
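
This example keeps the default expectation settings and only passes the global notification configuration through spark_conf.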

import os

@se.with_expectations(
    se.reader.get_rules_from_table(  
        product_rules_table="pilot_nonpub.dq.dq_rules", 
        target_table_name="pilot_nonpub.customer_order", 
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    spark_conf=se_global_spark_Conf,
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    return _df_order

Example 7
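
Row-level checks are disabled and the top-level se_agg_dq and se_query_dq flags are set to False, while selected source/final sub-flags remain enabled.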

@se.with_expectations( 
    se.reader.get_rules_from_table(  
        product_rules_table="pilot_nonpub.dq.dq_rules", 
        target_table_name="pilot_nonpub.customer_order", 
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    row_dq=False,
    agg_dq={
        user_config.se_agg_dq: False,
        user_config.se_source_agg_dq: False,
        user_config.se_final_agg_dq: True,
    },
    query_dq={
        user_config.se_query_dq: False,
        user_config.se_source_query_dq: True,
        user_config.se_final_query_dq: True,
        user_config.se_target_table_view: "order",
    },
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    return _df_order

Example 8
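
Here get_rules_from_table is called with actions_if_failed=["drop", "ignore"], so only rules configured with those actions are applied.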

@se.with_expectations( 
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.customer_order",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats",
        actions_if_failed=["drop", "ignore"]  
    )
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    return _df_order

Example 9
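
This example combines the actions_if_failed filter with row, aggregate and query rules enabled for both the source and final datasets.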

@se.with_expectations( 
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.customer_order",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats",
        actions_if_failed=["drop", "ignore"]  
    ),
    row_dq=True,
    agg_dq={
        user_config.se_agg_dq: True,
        user_config.se_source_agg_dq: True,
        user_config.se_final_agg_dq: True,
    },
    query_dq={
        user_config.se_query_dq: True,
        user_config.se_source_query_dq: True,
        user_config.se_final_query_dq: True,
        user_config.se_target_table_view: "order",
    }
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order") 

    _df_product: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/product.csv")) 
    )
    _df_product.createOrReplaceTempView("product")

    _df_customer: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
    )

    _df_customer.createOrReplaceTempView("customer") 

    return _df_order

Example 10
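
The final example passes Spark session configuration through spark_conf, writer options for the final table through options, and separate options for the error table through options_error_table.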

@se.with_expectations(
    se.reader.get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        target_table_name="pilot_nonpub.customer_order",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    spark_conf={"spark.files.maxPartitionBytes": "134217728"},
    options={"mode": "overwrite", "partitionBy": "order_month",
             "overwriteSchema": "true"},
    options_error_table={"partition_by": "id"}
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    return _df_order