Skip to content

Classes

spark_expectations.core.expectations.SparkExpectations(product_id: str, rules_df: DataFrame, stats_table: str, target_and_error_table_writer: Union[WrappedDataFrameWriter, WrappedDataFrameStreamWriter], stats_table_writer: WrappedDataFrameWriter, debugger: bool = False, stats_streaming_options: Optional[Dict[str, Union[str, bool]]] = None) dataclass

This class implements/supports running the data quality rules on a dataframe returned by a function

Parameters:

Name Type Description Default
product_id str

Name of the product

required
rules_df DataFrame

DataFrame which contains the rules. The user is responsible for reading the rules table from whichever system it resides in

required
stats_table str

Name of the table where the stats/audit info needs to be written

required
debugger bool

Mark it as "True" if the debugger mode needs to be enabled; defaults to False

False
stats_streaming_options Optional[Dict[str, Union[str, bool]]]

Provide options to override the defaults, while writing into the stats streaming table

None

Attributes

debugger: bool = False class-attribute instance-attribute

product_id: str instance-attribute

rules_df: DataFrame instance-attribute

stats_streaming_options: Optional[Dict[str, Union[str, bool]]] = None class-attribute instance-attribute

stats_table: str instance-attribute

stats_table_writer: WrappedDataFrameWriter instance-attribute

target_and_error_table_writer: Union[WrappedDataFrameWriter, WrappedDataFrameStreamWriter] instance-attribute

Functions

with_expectations(target_table: str, write_to_table: bool = False, write_to_temp_table: bool = False, user_conf: Optional[Dict[str, Union[str, int, bool, Dict[str, str]]]] = None, target_table_view: Optional[str] = None, target_and_error_table_writer: Optional[Union[WrappedDataFrameWriter, WrappedDataFrameStreamWriter]] = None) -> Any

This decorator helps to wrap a function which returns a dataframe and applies the dataframe rules on it

Parameters:

Name Type Description Default
target_table str

Name of the table where the final dataframe needs to be written

required
write_to_table bool

Mark it as "True" if the dataframe needs to be written as a table

False
write_to_temp_table bool

Mark it as "True" if the input dataframe needs to be written to the temp table to break the spark plan

False
user_conf Optional[Dict[str, Union[str, int, bool, Dict[str, str]]]]

Provide options to override the defaults, while writing into the stats streaming table

None
target_table_view Optional[str]

This view is created after the _row_dq process to run the target agg_dq and query_dq. If value is not provided, defaulted to {target_table}_view

None
target_and_error_table_writer Optional[Union[WrappedDataFrameWriter, WrappedDataFrameStreamWriter]]

Provide the writer to write the target and error table, this will take precedence over the class level writer

None

Returns:

Name Type Description
Any Any

Returns a function which applies the expectations on the dataset

Source code in spark_expectations/core/expectations.py
def with_expectations(
    self,
    target_table: str,
    write_to_table: bool = False,
    write_to_temp_table: bool = False,
    user_conf: Optional[Dict[str, Union[str, int, bool, Dict[str, str]]]] = None,
    target_table_view: Optional[str] = None,
    target_and_error_table_writer: Optional[Union["WrappedDataFrameWriter", "WrappedDataFrameStreamWriter"]] = None,
) -> Any:
    """
    This decorator helps to wrap a function which returns a dataframe and applies the dataframe rules on it

    Args:
        target_table: Name of the table where the final dataframe needs to be written
        write_to_table: Mark it as "True" if the dataframe needs to be written as a table
        write_to_temp_table: Mark it as "True" if the input dataframe needs to be written to the temp table to
                            break the spark plan
        user_conf: Provide options to override the defaults, while writing into the stats streaming table
        target_table_view: This view is created after the _row_dq process to run the target agg_dq and query_dq.
            If value is not provided, defaulted to {target_table}_view
        target_and_error_table_writer: Provide the writer to write the target and error table,
            this will take precedence over the class level writer

    Returns:
        Any: Returns a function which applies the expectations on the dataset
    """

    def _except(func: Any) -> Any:
        # NOTE: everything above the nested `wrapper` definition runs ONCE,
        # at decoration time: config parsing, rule loading and context setup.

        self._check_serverless_config(user_conf)
        # Split the user configuration into notification settings and
        # stats-streaming settings, then apply each to the framework.
        _notification_dict, _se_stats_streaming_dict = self._build_config_dicts(user_conf)
        self._set_kafka_configs(_se_stats_streaming_dict)
        self._set_notification_configs(_notification_dict)
        if target_and_error_table_writer:
            # A writer supplied to the decorator takes precedence over the
            # class-level writer configuration.
            self._context.set_target_and_error_table_writer_config(target_and_error_table_writer.build())
        self._set_agg_query_detailed_stats(_notification_dict)

        # Load the DQ rules that apply to this target table from rules_df.
        (
            dq_queries_dict,
            expectations,
            rules_execution_settings,
        ) = self.reader.get_rules_from_df(self.rules_df, target_table, params=self._context.get_dq_rules_params)

        # Flags for which of the five DQ stages are configured to run;
        # absent keys default to False (stage disabled).
        _row_dq: bool = rules_execution_settings.get("row_dq", False)
        _source_agg_dq: bool = rules_execution_settings.get("source_agg_dq", False)
        _target_agg_dq: bool = rules_execution_settings.get("target_agg_dq", False)
        _source_query_dq: bool = rules_execution_settings.get("source_query_dq", False)
        _target_query_dq: bool = rules_execution_settings.get("target_query_dq", False)
        # Default view name strips any db/schema qualifier from target_table
        # (e.g. "db.schema.tbl" -> "tbl_view").
        _target_table_view: str = target_table_view if target_table_view else f"{target_table.split('.')[-1]}_view"

        # Record the resolved rules and settings on the shared context so the
        # downstream process/writer/notification components can read them.
        self._set_notification_context(_notification_dict, _se_stats_streaming_dict)
        self._context.set_dq_expectations(expectations)
        self._context.set_rules_execution_settings_config(rules_execution_settings)
        self._context.set_querydq_secondary_queries(dq_queries_dict)

        # Decorator stack: notifications are sent around stats collection,
        # which wraps the actual DQ execution; functools.wraps preserves the
        # wrapped function's metadata.
        @self._notification.send_notification_decorator
        @self._statistics_decorator.collect_stats_decorator
        @functools.wraps(func)
        def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
            # Runs on every call of the decorated function: produce the
            # dataframe, run the configured DQ stages, optionally write out.
            try:
                _log.info("The function dataframe is getting created")
                _df: DataFrame = func(*args, **kwargs)
                table_name: str = self._context.get_table_name

                # Validate the raw rule rows against the produced dataframe
                # before any DQ stage runs.
                rules = [row.asDict() for row in self.rules_df.collect()]
                self._check_invalid_rules(_df, rules)

                # A streaming dataframe cannot be counted; 0 is used as a
                # placeholder input count in that case.
                _input_count = _df.count() if not _df.isStreaming else 0
                _log.info(f"data frame input record count: {_input_count}")
                _output_count: int = 0
                _error_count: int = 0
                failed_ignored_row_dq_res: List[Dict[str, Any]] = []
                # Falls through unchanged if row-level DQ is disabled.
                _row_dq_df: DataFrame = _df
                _ignore_rules_result: List[Optional[List[Dict[str, Any]]]] = []

                self._init_default_values(_input_count, expectations)
                _log.info(f"Spark Expectations run id for this run: {self._context.get_run_id}")

                if isinstance(_df, DataFrame):  # type: ignore
                    _log.info("The function dataframe is created")
                    self._context.set_table_name(table_name)
                    if write_to_temp_table:
                        # Materialize to a temp table to break the Spark plan.
                        _df = self._use_temp_table(table_name, _df)

                    func_process = self._process.execute_dq_process(
                        _context=self._context,
                        _actions=self.actions,
                        _writer=self._writer,
                        _notification=self._notification,
                        expectations=expectations,
                        _input_count=_input_count,
                    )

                    # Guard: source agg/query DQ are not supported on
                    # streaming dataframes.
                    self._check_streaming_agg_query_dq(_df, _source_agg_dq, _source_query_dq)

                    # Stage order: source agg -> source query -> row ->
                    # target agg -> target query. Batch-only stages are
                    # skipped for streaming input.
                    if _source_agg_dq is True and not _df.isStreaming:
                        self._run_source_agg_dq_batch(_df, func_process)

                    if _source_query_dq is True and not _df.isStreaming:
                        self._run_source_query_dq_batch(_df, func_process)

                    if _row_dq is True:                            
                        _row_dq_df, _error_count, _output_count = self._run_row_dq(_df, func_process, _target_table_view)
                        failed_ignored_row_dq_res = self._call_row_dq_notifications()
                        _log.info("ended processing data quality rules for row level expectations")

                    # Target-level stages only make sense after row DQ has
                    # produced the cleaned dataframe.
                    if _row_dq is True and _target_agg_dq is True and not _df.isStreaming:
                        self._run_target_agg_dq_batch(func_process, _row_dq_df, _error_count, _output_count)

                    if _row_dq is True and _target_query_dq is True and not _df.isStreaming:
                        self._run_target_query_dq_batch(func_process, _row_dq_df, _target_table_view, _error_count, _output_count)

                    # Notify once about every failed rule whose action is
                    # "ignore", if any were collected.
                    flattened_ignore_rules_result = self._check_ignore_rules_result(failed_ignored_row_dq_res, _ignore_rules_result)
                    if flattened_ignore_rules_result:
                        self._notification.notify_on_ignore_rules(flattened_ignore_rules_result)

                    # For a streaming write the writer returns a query handle;
                    # batch writes leave this as None.
                    streaming_query = None
                    if write_to_table:
                        _log.info("Writing into the final table started")
                        streaming_query = self._writer.save_df_as_table(
                            _row_dq_df,
                            f"{table_name}",
                            self._context.get_target_and_error_table_writer_config,
                        )
                        _log.info("Writing into the final table ended")

                else:
                    raise SparkExpectationsDataframeNotReturnedException(
                        "error occurred while processing spark "
                        "expectations due to given dataframe is not type of dataframe"
                    )

                # Streaming callers get the query handle; batch callers get
                # the post-row-DQ dataframe.
                return streaming_query if streaming_query is not None else _row_dq_df

            except Exception as e:
                # NOTE(review): this wraps ALL failures (including the
                # dataframe-type error above) into a single misc exception,
                # losing the original exception type for callers.
                raise SparkExpectationsMiscException(f"error occurred while processing spark expectations {e}")

        return wrapper

    return _except