Quick Start¶
To run spark-expectations successfully, you first need to create the Rules table.
Required Tables¶
Spark Expectations requires a Rules table to be created so that it can run seamlessly and integrate with a Spark job.
Rules Table¶
The SQL statement below uses a three-level namespace, which works with Databricks Unity Catalog. If you are using Hive, please update the namespace accordingly and provide the necessary table metadata.
We need to create a rules table that contains all the data quality rules. Please use the template below to create the rules table for your project.
create table if not exists `catalog`.`schema`.`{product}_rules` (
product_id STRING, -- (1)!
table_name STRING, -- (2)!
rule_type STRING, -- (3)!
rule STRING, -- (4)!
column_name STRING, -- (5)!
expectation STRING, -- (6)!
action_if_failed STRING, -- (7)!
tag STRING, -- (8)!
description STRING, -- (9)!
enable_for_source_dq_validation BOOLEAN, -- (10)!
enable_for_target_dq_validation BOOLEAN, -- (11)!
is_active BOOLEAN, -- (12)!
enable_error_drop_alert BOOLEAN, -- (13)!
error_drop_threshold INT, -- (14)!
query_dq_delimiter STRING, -- (15)!
enable_querydq_custom_output BOOLEAN -- (16)!
);
1. `product_id`: A unique name at the level of DQ rules execution.
2. `table_name`: The table for which the rule is being defined.
3. `rule_type`: One of three rule types: `row_dq`, `agg_dq`, or `query_dq`.
4. `rule`: Short description of the rule.
5. `column_name`: The column for which the rule is defined. This applies only to `row_dq`; for `agg_dq` and `query_dq`, leave it blank/empty.
6. `expectation`: The DQ rule condition.
7. `action_if_failed`: One of three actions: `ignore`, `drop`, or `fail`. `ignore`: the rule is run and the output is logged, but no action is taken whether the rule succeeds or fails (applies to all three rule types). `drop`: rows that fail the rule are dropped from the dataset (applies only to `row_dq`). `fail`: the job fails if the rule fails (applies to all three rule types).
8. `tag`: A tag for the DQ rule, for example: completeness, validity, uniqueness.
9. `description`: Long description of the rule.
10. `enable_for_source_dq_validation`: Flag to run this rule during source DQ validation (`agg_dq` and `query_dq` rules run against the source DataFrame).
11. `enable_for_target_dq_validation`: Flag to run this rule during target DQ validation (`agg_dq` and `query_dq` rules run against the DataFrame produced by `row_dq`).
12. `is_active`: true or false, indicating whether the rule is active.
13. `enable_error_drop_alert`: true or false; determines whether an alert notification is sent when rows are dropped from the data set.
14. `error_drop_threshold`: Threshold for the alert notification triggered when rows are dropped from the data set.
15. `query_dq_delimiter`: Delimiter used to segregate custom queries, e.g. `$` or `@`. Defaults to `@`; you can override it with any other delimiter as needed. The same delimiter must be used in the custom query.
16. `enable_querydq_custom_output`: true or false; set to true if the custom query output is required in a separate table.
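For illustration, a single `agg_dq` rule expressed as a Python dictionary, in the same shape as the `rules_data` example used later in this guide, might look like the following sketch. The table name and the expectation are placeholder assumptions, not values from your project:
example_agg_rule = {
    "product_id": "test_product",
    "table_name": "catalog.schema.target_table",  # hypothetical target table
    "rule_type": "agg_dq",
    "rule": "dataset_not_empty",
    "column_name": "",  # left blank for agg_dq and query_dq rules
    "expectation": "count(*) > 0",  # aggregate condition evaluated over the dataset
    "action_if_failed": "fail",
    "tag": "completeness",
    "description": "The dataset must contain at least one row",
    "enable_for_source_dq_validation": True,
    "enable_for_target_dq_validation": True,
    "is_active": True,
    "enable_error_drop_alert": False,
    "error_drop_threshold": 0,
}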
The Spark Expectations process consists of three phases:
1. When enable_for_source_dq_validation is true, execute agg_dq and query_dq on the source DataFrame.
2. If the first step is successful, proceed to run row_dq.
3. When enable_for_target_dq_validation is true, execute agg_dq and query_dq on the DataFrame resulting from row_dq.
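As a purely illustrative sketch in plain Python (not the library's internals), this shows how the per-rule flags map onto the three phases:
# Hypothetical rule flags, trimmed to the fields relevant to phase selection
rules = [
    {"rule_type": "agg_dq", "enable_for_source_dq_validation": True, "enable_for_target_dq_validation": False},
    {"rule_type": "row_dq", "enable_for_source_dq_validation": True, "enable_for_target_dq_validation": True},
    {"rule_type": "query_dq", "enable_for_source_dq_validation": False, "enable_for_target_dq_validation": True},
]
# Phase 1: agg_dq/query_dq rules flagged for source validation run against the source DataFrame
phase_1 = [r for r in rules if r["rule_type"] in ("agg_dq", "query_dq") and r["enable_for_source_dq_validation"]]
# Phase 2: row_dq rules run next, producing the cleaned DataFrame
phase_2 = [r for r in rules if r["rule_type"] == "row_dq"]
# Phase 3: agg_dq/query_dq rules flagged for target validation run against the post-row_dq DataFrame
phase_3 = [r for r in rules if r["rule_type"] in ("agg_dq", "query_dq") and r["enable_for_target_dq_validation"]]
print(len(phase_1), len(phase_2), len(phase_3))  # 1 1 1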
Rule Type¶
The rules table has a column called "rule_type". It is important that this column accepts only one of these three values: row_dq, agg_dq, or query_dq. If other values are provided, the library may produce unforeseen errors.
Run the command below to add a check constraint to the rules table created above:
ALTER TABLE `catalog`.`schema`.`{product}_rules`
ADD CONSTRAINT rule_type_action CHECK (rule_type in ('row_dq', 'agg_dq', 'query_dq'));
For further information about creating individual rules, please refer to the Rules Guide.
Initiating Spark Expectation¶
Sample input data¶
import pandas as pd

# `spark` is assumed to be an active SparkSession (for example, in a Databricks notebook)
data = [
{"id": 1, "age": 25, "email": "alice@example.com"},
{"id": 2, "age": 17, "email": "bob@example.com"},
{"id": 3, "age": None, "email": "charlie@example.com"},
{"id": 4, "age": 40, "email": "bob@example.com"},
{"id": 5, "age": None, "email": "ron@example.com"},
{"id": 6, "age": 41, "email": None},
]
input_df = spark.createDataFrame(pd.DataFrame(data))
input_df.show(truncate=False)
Insert expectations into Rules table¶
# Name of the rules table created previously; catalog, schema, product, and
# target_table_name are assumed to be defined for your environment
rules_table = f"{catalog}.{schema}.{product}_rules"
product_identifier = "test_product"
rules_data = [
{
"product_id": product_identifier,
"table_name": f"{catalog}.{schema}.{target_table_name}",
"rule_type": "row_dq",
"rule": "age_not_null",
"column_name": "age",
"expectation": "age IS NOT NULL",
"action_if_failed": "drop",
"tag": "completeness",
"description": "Age must not be null",
"enable_for_source_dq_validation": True,
"enable_for_target_dq_validation": True,
"is_active": True,
"enable_error_drop_alert": False,
"error_drop_threshold": 0,
}
]
import pandas as pd
rules_df = spark.createDataFrame(pd.DataFrame(rules_data))
rules_df.show(truncate=True)
rules_df.write.mode("overwrite").saveAsTable(rules_table)
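Optionally, read the rules table back to confirm the rows were written; this is plain Spark and not specific to spark-expectations:
spark.table(rules_table).show(truncate=False)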
Streaming and User config¶
The following example lets spark-expectations use its default configuration.
The only setting we pass explicitly is the option to disable streaming of the stats output.
from spark_expectations.config.user_config import Constants as user_config
stats_streaming_config_dict = {
user_config.se_enable_streaming: False
}
user_conf = {}  # decorator-level overrides; left empty here to use the defaults
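If you do want to override defaults, the keys come from the same Constants class imported above. A minimal sketch, assuming the notification constants below exist under those names in your installed version (treat them as assumptions and check user_config before relying on them):
# Illustration only, not used later in this quick start
example_user_conf = {
    user_config.se_notifications_enable_email: False,  # assumed constant name: disable email alerts
    user_config.se_notifications_enable_slack: False,  # assumed constant name: disable Slack alerts
}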
Run SparkExpectations job¶
Please refer to the Spark Expectations notebooks for fully functioning examples.
Note
The Spark Expectations repository provides a docker compose YAML file for running those notebooks.
from pyspark.sql import DataFrame
from spark_expectations.core.expectations import (
SparkExpectations, WrappedDataFrameWriter
)
from spark_expectations.core import load_configurations
# Initialize Default Config: in a future release this will not be required
load_configurations(spark)
writer = WrappedDataFrameWriter().mode("append").format("delta")
se = SparkExpectations(
product_id=f"{product_identifier}", # (1)!
rules_df=rules_df, # (2)!
stats_table=f"{catalog}.{schema}.{stats_table_name}", # (3)!
stats_table_writer=writer, # (4)!
target_and_error_table_writer=writer, # (5)!
stats_streaming_options=stats_streaming_config_dict # (6)!
)
"""
This decorator helps to wrap a function which returns dataframe and apply dataframe rules on it
Args:
target_table: Name of the table where the final dataframe need to be written
write_to_table: Mark it as "True" if the dataframe need to be written as table
write_to_temp_table: Mark it as "True" if the input dataframe need to be written to the temp table to break
the spark plan
user_conf: Provide options to override the defaults, while writing into the stats streaming table
target_table_view: This view is created after the _row_dq process to run the target agg_dq and query_dq.
If value is not provided, defaulted to {target_table}_view
target_and_error_table_writer: Provide the writer to write the target and error table,
this will take precedence over the class level writer
Returns:
Any: Returns a function which applied the expectations on dataset
"""
@se.with_expectations(
target_table=f"{catalog}.{schema}.{target_table_name}",
write_to_table=True,
write_to_temp_table=True,
user_conf=user_conf,
)
def get_dataset():
_df_source: DataFrame = input_df
_df_source.createOrReplaceTempView("in_memory_data_source")
return _df_source
# This will run the DQ checks and raise if any "fail" rules are violated
get_dataset()
- The unique product identifier for DQ execution.
- DataFrame containing the rules to apply.
- Name of the stats table for logging results. If it doesn't exist, it will be generated.
- Writer object for the stats table.
- Writer object for target and error tables.
- Dictionary of streaming options for stats.
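After the run, the DQ results land in the stats table, and rows that fail row-level rules are typically written to an error table. Inspecting them is plain Spark; the `_error` suffix on the error table name is the library's usual convention, but treat it as an assumption and confirm it in your environment:
# Aggregated DQ statistics written by the run
spark.table(f"{catalog}.{schema}.{stats_table_name}").show(truncate=False)

# Rows that failed row_dq rules, conventionally written to `<target_table>_error` (assumed convention)
spark.table(f"{catalog}.{schema}.{target_table_name}_error").show(truncate=False)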