Setup
Installation¶
The library is available on the Python Package Index (PyPI) and can be installed in your environment using the command below, or by adding "spark-expectations" to your requirements.txt or poetry dependencies.
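For example, assuming the distribution name on PyPI matches the library name, the install command would be:

pip install spark-expectations

or, if you manage dependencies with poetry:

poetry add spark-expectations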
Required Tables¶
There are two tables that need to be created for spark-expectations to run seamlessly and integrate with a Spark job. The SQL statements below use three-level namespaces (catalog.schema.table), which work with Databricks Unity Catalog; if you are using Hive, please update the namespaces accordingly and provide the necessary table metadata.
Rules Table¶
We need to create a rules table that contains all the data quality rules. Please use the template below to create the rules table for your project.
create table if not exists `catalog`.`schema`.`{product}_rules` (
product_id STRING, -- (1)!
table_name STRING, -- (2)!
rule_type STRING, -- (3)!
rule STRING, -- (4)!
column_name STRING, -- (5)!
expectation STRING, -- (6)!
action_if_failed STRING, -- (7)!
tag STRING, -- (8)!
description STRING, -- (9)!
enable_for_source_dq_validation BOOLEAN, -- (10)!
enable_for_target_dq_validation BOOLEAN, -- (11)!
is_active BOOLEAN, -- (12)!
enable_error_drop_alert BOOLEAN, -- (13)!
error_drop_threshold INT, -- (14)!
query_dq_delimiter STRING, -- (15)!
enable_querydq_custom_output BOOLEAN -- (16)!
);
1. product_id: A unique name at the level of the DQ rules execution.
2. table_name: The table for which the rule is being defined.
3. rule_type: One of three rule types: 'row_dq', 'agg_dq' or 'query_dq'.
4. rule: Short description of the rule.
5. column_name: The column for which the rule is defined. This applies only to row_dq; for agg_dq and query_dq, leave it blank/empty.
6. expectation: The DQ rule condition.
7. action_if_failed: One of three actions: 'ignore', 'drop' or 'fail'. ignore: the rule is run and the output is logged; no action is taken regardless of whether the rule succeeded or failed (applies to all three rule types). drop: rows that fail the rule are dropped from the dataset (applies only to row_dq). fail: the job fails if the rule fails (applies to all three rule types).
8. tag: A tag name for the DQ rule, for example: completeness, validity, uniqueness, etc.
9. description: Long description of the rule.
10. enable_for_source_dq_validation: Flag to run the agg_dq/query_dq rule on the source dataframe.
11. enable_for_target_dq_validation: Flag to run the agg_dq/query_dq rule on the target dataframe produced by row_dq.
12. is_active: true or false, indicating whether the rule is active.
13. enable_error_drop_alert: true or false; determines whether an alert notification is sent when rows are dropped from the data set.
14. error_drop_threshold: Threshold for the alert notification that is triggered when rows are dropped from the data set.
15. query_dq_delimiter: Delimiter used to segregate custom queries, e.g. $, @, etc. The default is @; users can override it with any other delimiter as needed. The same delimiter specified here has to be used in the custom query.
16. enable_querydq_custom_output: Whether the custom query output is required in a separate table.
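For illustration only, registering a couple of hypothetical rules could look like the following; the product id, table name, expectations and flag values below are placeholders, not values prescribed by the library:

insert into `catalog`.`schema`.`{product}_rules` values
-- hypothetical row_dq rule: drop rows where customer_id is null
('your_product', 'your_catalog.your_schema.customer_order', 'row_dq', 'customer_id_is_not_null',
 'customer_id', 'customer_id is not null', 'drop', 'completeness',
 'customer_id should never be null', true, true, true, true, 0, '@', false),
-- hypothetical agg_dq rule: fail the job if the dataset is empty (column_name left blank)
('your_product', 'your_catalog.your_schema.customer_order', 'agg_dq', 'row_count_gt_zero',
 '', 'count(*) > 0', 'fail', 'validity',
 'the source should not be empty', true, true, true, false, 0, '@', false);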
The Spark Expectations process consists of three phases:
1. When enable_for_source_dq_validation is true, execute agg_dq and query_dq on the source dataframe.
2. If the first step is successful, proceed to run row_dq.
3. When enable_for_target_dq_validation is true, execute agg_dq and query_dq on the dataframe resulting from row_dq.
Rule Type For Rules¶
The rules table has a column called "rule_type". It is important that this column accepts only one of these three values: 'row_dq', 'agg_dq' or 'query_dq'. If other values are provided, the library may produce unforeseen errors. Please run the command below to add this constraint to the rules table created above.
ALTER TABLE `catalog`.`schema`.`{product}_rules`
ADD CONSTRAINT rule_type_action CHECK (rule_type in ('row_dq', 'agg_dq', 'query_dq'));
Action If Failed For Row, Aggregation and Query Data Quality Rules¶
The rules table has a column called "action_if_failed". It is important that this column accepts only one of the values 'fail', 'drop' or 'ignore' for 'rule_type'='row_dq', and only 'fail' or 'ignore' for 'rule_type'='agg_dq' and 'rule_type'='query_dq'. If other values are provided, the library may produce unforeseen errors. Please run the command below to add this constraint to the rules table created above.
ALTER TABLE `catalog`.`schema`.`{product}_rules` ADD CONSTRAINT action CHECK
((rule_type = 'row_dq' and action_if_failed IN ('ignore', 'drop', 'fail')) or
(rule_type = 'agg_dq' and action_if_failed in ('ignore', 'fail')) or
(rule_type = 'query_dq' and action_if_failed in ('ignore', 'fail')));
DQ Stats Table¶
In order to collect the stats/metrics for each data quality job run, spark-expectations automatically creates the stats table if it does not exist. The SQL statement below can be used to create the table manually if you prefer, but doing so is not recommended.
create table if not exists `catalog`.`schema`.`dq_stats` (
product_id STRING, -- (1)!
table_name STRING, -- (2)!
input_count LONG, -- (3)!
error_count LONG, -- (4)!
output_count LONG, -- (5)!
output_percentage FLOAT, -- (6)!
success_percentage FLOAT, -- (7)!
error_percentage FLOAT, -- (8)!
source_agg_dq_results array<map<string, string>>, -- (9)!
final_agg_dq_results array<map<string, string>>, -- (10)!
source_query_dq_results array<map<string, string>>, -- (11)!
final_query_dq_results array<map<string, string>>, -- (12)!
row_dq_res_summary array<map<string, string>>, -- (13)!
row_dq_error_threshold array<map<string, string>>, -- (14)!
dq_status map<string, string>, -- (15)!
dq_run_time map<string, float>, -- (16)!
dq_rules map<string, map<string,int>>, -- (17)!
meta_dq_run_id STRING, -- (18)!
meta_dq_run_date DATE, -- (19)!
meta_dq_run_datetime TIMESTAMP -- (20)!
);
1. product_id: A unique name at the level of the DQ rules execution.
2. table_name: The table for which the rules are defined.
3. input_count: Total input row count of the given dataframe.
4. error_count: Total error count for all row_dq rules.
5. output_count: Total count of records that passed the row_dq rules or were configured to be ignored when they failed.
6. output_percentage: Percentage of records that passed the row_dq rules or were configured to be ignored when they failed.
7. success_percentage: Percentage of records that passed the row_dq rules.
8. error_percentage: Percentage of records that failed the row_dq rules.
9. source_agg_dq_results: Results of the agg_dq rules on the source dataframe.
10. final_agg_dq_results: Results of the agg_dq rules after the row_dq rules were executed.
11. source_query_dq_results: Results of the query_dq rules on the source dataframe.
12. final_query_dq_results: Results of the query_dq rules after the row_dq rules were executed.
13. row_dq_res_summary: Summary of the row_dq results.
14. row_dq_error_threshold: Thresholds defined in the rules table for the row_dq rules.
15. dq_status: Status of the rule execution.
16. dq_run_time: Time taken by the rules.
17. dq_rules: How many DQ rules were executed in this run.
18. meta_dq_run_id: Unique id generated for this run.
19. meta_dq_run_date: Date on which the rules were executed.
20. meta_dq_run_datetime: Date and time at which the rules were executed.
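As a quick sketch, the stats table can be queried to inspect recent runs; the catalog/schema and the product id below are placeholders for whatever you configured:

select product_id, table_name, input_count, error_count, output_count,
       success_percentage, dq_status, meta_dq_run_datetime
from `catalog`.`schema`.`dq_stats`
where product_id = 'your_product'        -- hypothetical product id
order by meta_dq_run_datetime desc
limit 10;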
DQ Detailed Stats Table¶
This table provides detailed stats for all the expectations, along with the status reported in the stats table, in a relational format. It does not need to be created manually: it is auto-created by appending "_detailed" to the DQ stats table name. It is optional and only gets created if the detailed stats table is enabled in the configuration. Below is the schema.
create table if not exists `catalog`.`schema`.`dq_stats_detailed` (
run_id string, -- (1)!
product_id string, -- (2)!
table_name string, -- (3)!
rule_type string, -- (4)!
rule string, -- (5)!
source_expectations string, -- (6)!
tag string, -- (7)!
description string, -- (8)!
source_dq_status string, -- (9)!
source_dq_actual_outcome string, -- (10)!
source_dq_expected_outcome string, -- (11)!
source_dq_actual_row_count string, -- (12)!
source_dq_error_row_count string, -- (13)!
source_dq_row_count string, -- (14)!
source_dq_start_time string, -- (15)!
source_dq_end_time string, -- (16)!
target_expectations string, -- (17)!
target_dq_status string, -- (18)!
target_dq_actual_outcome string, -- (19)!
target_dq_expected_outcome string, -- (20)!
target_dq_actual_row_count string, -- (21)!
target_dq_error_row_count string, -- (22)!
target_dq_row_count string, -- (23)!
target_dq_start_time string, -- (24)!
target_dq_end_time string, -- (25)!
dq_date date, -- (26)!
dq_time string, -- (27)!
dq_job_metadata_info string -- (28)!
);
1. run_id: Run id for a specific run.
2. product_id: Unique product identifier.
3. table_name: The target table where the final data gets inserted.
4. rule_type: Either row_dq, agg_dq or query_dq.
5. rule: Rule name.
6. source_expectations: Actual rule executed on the source DQ.
7. tag: Tag for the rule, e.g. completeness, uniqueness, validity, accuracy, consistency.
8. description: Description of the rule.
9. source_dq_status: Status of the rule execution in the source DQ.
10. source_dq_actual_outcome: Actual outcome of the source DQ check.
11. source_dq_expected_outcome: Expected outcome of the source DQ check.
12. source_dq_actual_row_count: Number of rows of the source DQ.
13. source_dq_error_row_count: Number of rows that failed in the source DQ.
14. source_dq_row_count: Number of rows of the source DQ.
15. source_dq_start_time: Source DQ start timestamp.
16. source_dq_end_time: Source DQ end timestamp.
17. target_expectations: Actual rule executed on the target DQ.
18. target_dq_status: Status of the rule execution in the target DQ.
19. target_dq_actual_outcome: Actual outcome of the target DQ check.
20. target_dq_expected_outcome: Expected outcome of the target DQ check.
21. target_dq_actual_row_count: Number of rows of the target DQ.
22. target_dq_error_row_count: Number of rows that failed in the target DQ.
23. target_dq_row_count: Number of rows of the target DQ.
24. target_dq_start_time: Target DQ start timestamp.
25. target_dq_end_time: Target DQ end timestamp.
26. dq_date: Date on which the DQ was executed.
27. dq_time: Timestamp at which the DQ was executed.
28. dq_job_metadata_info: DQ job metadata.
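As a sketch, the detailed stats table can be filtered per run to see which expectations did not pass. The catalog/schema and the run id below are placeholders, and the 'pass'/'fail' status values are an assumption about how the status columns are populated; adjust them to the values you actually observe in your environment:

select rule_type, rule, source_dq_status, target_dq_status,
       source_dq_error_row_count, target_dq_error_row_count
from `catalog`.`schema`.`dq_stats_detailed`
where run_id = 'your_run_id'             -- placeholder run id
  and (source_dq_status = 'fail' or target_dq_status = 'fail');  -- assumed status values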