Skip to content

Classes

spark_expectations.utils.reader.SparkExpectationsReader dataclass

This class implements/supports reading data from source system

Functions

get_rules_from_df(rules_df: DataFrame, target_table: str, is_dlt: bool = False, tag: Optional[str] = None, params: Optional[dict] = None) -> Tuple[Dict, Dict, Dict]

This function fetches the data quality rules from the rules DataFrame and returns them as dictionaries

Parameters:

Name Type Description Default
rules_df DataFrame

DataFrame which has your data quality rules

required
target_table str

Provide the full table name for which the data quality rules are being run

required
is_dlt bool

True if this is for fetching the rules for a dlt job

False
tag Optional[str]

If is_dlt is True, provide the KPI for which you are running the data quality rule

None
params Optional[dict]

dictionary values for dynamically updating dq rules

None

Returns:

Name Type Description
tuple Tuple[Dict, Dict, Dict]

returns a tuple of three dictionaries: the dq queries dict (populated only for query dq rules), the expectations dict with key as 'rule_type' and 'rules_table_row' as value, and the rules_execution_settings dict with key as 'dq_stage_setting' and 'boolean' as value

Source code in spark_expectations/utils/reader.py
def get_rules_from_df(
    self,
    rules_df: DataFrame,
    target_table: str,
    is_dlt: bool = False,
    tag: Optional[str] = None,
    params: Optional[dict] = None,
) -> Tuple[Dict, Dict, Dict]:
    """
    Fetch the data quality rules for ``target_table`` from ``rules_df`` and return them as dictionaries.

    The context is updated as a side effect: the table names and environment are set and the
    per-type rule counters are reset and then incremented for every (non-DLT) rule that is read.

    Args:
        rules_df: DataFrame which has your data quality rules
        target_table: Provide the full table name for which the data quality rules are being run
        is_dlt: True if this is for fetching the rules for a dlt job
        tag: If is_dlt is True, provide the KPI for which you are running the data quality rule
        params: dictionary values for dynamically updating dq rules; each ``{key}`` placeholder
            is replaced with the corresponding value

    Returns:
        tuple: a tuple of three dictionaries, in this order:
            - dq queries dict, updated by ``_process_rules_df`` for query dq rules only
            - expectations dict; for DLT it maps rule name to the formatted expectation, otherwise
              it maps ``'<rule_type>_rules'`` to a list of rule rows plus ``'target_table_name'``
            - rules execution settings dict as returned by ``_get_rules_execution_settings``
              (empty for DLT)

    Raises:
        SparkExpectationsMiscException: if anything fails while reading the rules
    """
    try:
        self._context.set_final_table_name(target_table)
        self._context.set_error_table_name(f"{target_table}_error")
        self._context.set_table_name(target_table)
        self._context.set_env(os.environ.get("SPARKEXPECTATIONS_ENV"))

        self._context.reset_num_agg_dq_rules()
        self._context.reset_num_dq_rules()
        self._context.reset_num_row_dq_rules()
        self._context.reset_num_query_dq_rules()

        # Normalise once so every later ``format(**params)`` call is safe.
        params = params or {}

        # Resolve ``{key}`` placeholders in the table_name column before filtering on it.
        # NOTE(review): keys/values are interpolated into a SQL expression unescaped;
        # a value containing a single quote would break the expression.
        for key, value in params.items():
            rules_df = rules_df.withColumn(
                "table_name",
                expr(f"REPLACE(table_name, '{{{key}}}', '{value}')"),
            )

        _rules_df = rules_df.filter(
            (rules_df.product_id == self._context.product_id)
            & (rules_df.table_name == target_table)
            & rules_df.is_active
        )

        self._context.print_dataframe_with_debugger(_rules_df)

        _expectations: dict = {}
        _dq_queries_dict: dict = {}
        _rules_execution_settings: dict = {}

        if is_dlt:
            # For DLT only a flat rule -> expectation mapping is needed,
            # optionally restricted to the requested tag.
            dlt_rules_df = (
                _rules_df.filter(_rules_df.tag == tag) if tag else _rules_df
            )
            _expectations = {
                row["rule"]: row["expectation"].format(**params)
                for row in dlt_rules_df.collect()
            }
        else:
            for row in _rules_df.collect():
                rule_type = row["rule_type"]
                column_map = {
                    "product_id": row["product_id"].format(**params),
                    "table_name": row["table_name"].format(**params),
                    "rule_type": rule_type,
                    "rule": row["rule"].format(**params),
                    "column_name": row["column_name"],
                    "expectation": row["expectation"],
                    "action_if_failed": row["action_if_failed"],
                    "enable_for_source_dq_validation": row[
                        "enable_for_source_dq_validation"
                    ],
                    "enable_for_target_dq_validation": row[
                        "enable_for_target_dq_validation"
                    ],
                    "tag": row["tag"],
                    "description": row["description"],
                    "enable_error_drop_alert": row["enable_error_drop_alert"],
                    "error_drop_threshold": row["error_drop_threshold"],
                }

                if rule_type == self._context.get_query_dq_rule_type_name:
                    _dq_queries_dict, column_map = self._process_rules_df(
                        _dq_queries_dict, column_map, row.asDict(), params
                    )

                # Group the rule rows by rule type.
                _expectations.setdefault(f"{rule_type}_rules", []).append(column_map)

                # count the rules enabled for the current run
                if rule_type == self._context.get_row_dq_rule_type_name:
                    self._context.set_num_row_dq_rules()
                elif rule_type == self._context.get_agg_dq_rule_type_name:
                    self._context.set_num_agg_dq_rules(
                        row["enable_for_source_dq_validation"],
                        row["enable_for_target_dq_validation"],
                    )
                elif rule_type == self._context.get_query_dq_rule_type_name:
                    self._context.set_num_query_dq_rules(
                        row["enable_for_source_dq_validation"],
                        row["enable_for_target_dq_validation"],
                    )

                # Deliberately set inside the loop: the key is only present
                # when at least one rule row exists for the table.
                _expectations["target_table_name"] = target_table

            _rules_execution_settings = self._get_rules_execution_settings(
                _rules_df
            )

        return _dq_queries_dict, _expectations, _rules_execution_settings
    except Exception as e:
        # Chain the cause so the original traceback is kept.
        raise SparkExpectationsMiscException(
            f"error occurred while retrieving rules list from the table {e}"
        ) from e

set_notification_param(notification: Optional[Dict[str, Union[int, str, bool]]] = None) -> None

This function reads the notification configuration and applies it to the context. Returns: None

Source code in spark_expectations/utils/reader.py
def set_notification_param(
    self, notification: Optional[Dict[str, Union[int, str, bool]]] = None
) -> None:
    """
    Read the notification configuration and apply it to the context.

    The supplied ``notification`` dict is merged over a set of defaults (all channels
    disabled). For every channel whose enable flag is exactly ``True`` the required
    settings are validated and pushed to the context through its setters.

    Args:
        notification: optional overrides for the default notification settings,
            keyed by the ``user_config.se_notifications_*`` names

    Returns: None

    Raises:
        SparkExpectationsMiscException: if an enabled channel is missing a required
            setting, or if any other error occurs while reading the configuration
    """
    try:
        _default_spark_conf: Dict[str, Union[str, int, bool]] = {
            user_config.se_notifications_enable_email: False,
            user_config.se_notifications_email_smtp_host: "",
            user_config.se_notifications_email_smtp_port: 25,
            user_config.se_notifications_email_from: "",
            user_config.se_notifications_email_to_other_mail_id: "",
            user_config.se_notifications_email_subject: "spark-expectations-testing",
            user_config.se_notifications_enable_slack: False,
            user_config.se_notifications_slack_webhook_url: "",
            user_config.se_notifications_enable_teams: False,
            user_config.se_notifications_teams_webhook_url: "",
            user_config.se_notifications_enable_zoom: False,
            user_config.se_notifications_zoom_webhook_url: "",
            user_config.se_notifications_zoom_token: "",
        }

        # Caller-supplied values win over the defaults.
        _notification_dict: Dict[str, Union[str, int, bool]] = (
            {**_default_spark_conf, **notification}
            if notification
            else _default_spark_conf
        )

        # --- email ---
        if (
            _notification_dict.get(user_config.se_notifications_enable_email)
            is True
        ):
            if (
                _notification_dict[user_config.se_notifications_email_smtp_host]
                and _notification_dict[user_config.se_notifications_email_from]
                and _notification_dict[
                    user_config.se_notifications_email_to_other_mail_id
                ]
                and _notification_dict[user_config.se_notifications_email_subject]
            ):
                self._context.set_enable_mail(True)
                self._context.set_to_mail(
                    str(
                        _notification_dict[
                            user_config.se_notifications_email_to_other_mail_id
                        ]
                    )
                )
                self._context.set_mail_subject(
                    str(
                        _notification_dict[
                            user_config.se_notifications_email_subject
                        ]
                    )
                )
                self._context.set_mail_smtp_server(
                    str(
                        _notification_dict[
                            user_config.se_notifications_email_smtp_host
                        ]
                    )
                )
                self._context.set_mail_smtp_port(
                    int(
                        _notification_dict[
                            user_config.se_notifications_email_smtp_port
                        ]
                    )
                )

                self._context.set_mail_from(
                    str(_notification_dict[user_config.se_notifications_email_from])
                )
            else:
                raise SparkExpectationsMiscException(
                    "All params/variables required for email notification is not configured or supplied"
                )

        # --- slack ---
        if _notification_dict[user_config.se_notifications_enable_slack] is True:
            if _notification_dict[user_config.se_notifications_slack_webhook_url]:
                self._context.set_enable_slack(True)
                self._context.set_slack_webhook_url(
                    str(
                        _notification_dict[
                            user_config.se_notifications_slack_webhook_url
                        ]
                    )
                )
            else:
                raise SparkExpectationsMiscException(
                    "All params/variables required for slack notification is not configured or supplied"
                )

        # --- teams ---
        if _notification_dict[user_config.se_notifications_enable_teams] is True:
            if _notification_dict[user_config.se_notifications_teams_webhook_url]:
                self._context.set_enable_teams(True)
                self._context.set_teams_webhook_url(
                    str(
                        _notification_dict[
                            user_config.se_notifications_teams_webhook_url
                        ]
                    )
                )
            else:
                raise SparkExpectationsMiscException(
                    "All params/variables required for teams notification is not configured or supplied"
                )

        # --- zoom ---
        # Evaluated independently of teams: zoom must be configurable on its own.
        if _notification_dict[user_config.se_notifications_enable_zoom] is True:
            if _notification_dict[user_config.se_notifications_zoom_webhook_url]:
                self._context.set_enable_zoom(True)
                self._context.set_zoom_webhook_url(
                    str(
                        _notification_dict[
                            user_config.se_notifications_zoom_webhook_url
                        ]
                    )
                )
                self._context.set_zoom_token(
                    str(_notification_dict[user_config.se_notifications_zoom_token])
                )
            else:
                raise SparkExpectationsMiscException(
                    "All params/variables required for zoom notification is not configured or supplied"
                )

    except Exception as e:
        # Validation errors raised above are re-wrapped here as well, so callers
        # always see this message prefix; chain the cause to keep the traceback.
        raise SparkExpectationsMiscException(
            f"error occurred while reading notification configurations {e}"
        ) from e