
Classes

spark_expectations.core.context.SparkExpectationsContext dataclass

This class provides the context for SparkExpectations

Attributes

get_agg_dq_detailed_stats_status: bool property

Returns whether detailed results are enabled for aggregate (agg) DQ rules. Returns: bool: _enable_agg_dq_detailed_result

get_agg_dq_rule_type_name: str property

Returns the aggregation data quality rule type name.

Returns: str: _agg_dq_rule_type_name

get_cerberus_cred_path: str property

Returns the Cerberus credentials path. Returns: str: the Cerberus credentials path

get_cerberus_token: str property

Returns the Cerberus token. Returns: str: the Cerberus token

get_cerberus_url: str property

Returns the Cerberus URL. Returns: str: the Cerberus URL

get_client_id: Optional[str] property

Returns the key or path for the client ID. Returns: Optional[str]: client ID key or path

get_config_file_path: str property

Returns the absolute path of the config file. Returns: str: _config_file_path

get_debugger_mode: bool property

Returns whether debugger mode is enabled. Returns: bool: _debugger_mode

get_detailed_stats_table_writer_config: dict property

Returns the detailed stats table writer config. Returns: dict: detailed_stats_table_writer_config

get_dq_detailed_stats_table_name: str property

Get dq_detailed_stats_table_name, the table to which the detailed stats of the DQ job are written.

Returns: str: dq_detailed_stats_table_name

get_dq_expectations: dict property

Get dq_expectations, which holds the rule information.

Returns: dict: the DQ expectations (rules)

get_dq_rules_params: dict property

Returns the parameters that are mapped into the DQ rules. Returns: dict: _dq_rules_params

get_dq_run_status: str property

Returns the data quality pipeline run status.

Returns: str: the DQ run status set by set_dq_run_status

get_dq_run_time: float property

Returns the elapsed time of the DQ run. Returns: float: elapsed time in seconds

get_dq_stats_table_name: str property

Get dq_stats_table_name, the table to which the final stats of the DQ job are written.

Returns: str: dq_stats_table_name

get_enable_mail: bool property

Returns whether email notification is enabled. Returns: bool: _enable_mail

get_enable_slack: bool property

Returns whether Slack notification is enabled. Returns: bool: _enable_slack

get_enable_teams: bool property

Returns whether Teams notification is enabled. Returns: bool: _enable_teams

get_enable_zoom: bool property

Get whether Zoom notification is enabled.

Returns: bool: whether Zoom notification is enabled

get_env: Optional[str] property

Returns the running environment type. Returns: Optional[str]: _env

get_error_count: int property

Returns the error count. Returns: int: _error_count

get_error_drop_percentage: float property

Returns the error drop percentage. Returns: float: error drop percentage

get_error_drop_threshold: int property

Returns the error drop threshold. Returns: int: _error_drop_threshold

get_error_percentage: float property

Returns the error percentage. Returns: float: error percentage

get_error_table_name: str property

Get error_table_name, the name of the error table.

Returns: str: error_table_name

get_final_agg_dq_result: Optional[List[Dict[str, str]]] property

Returns the final agg DQ result. Returns: Optional[List[Dict[str, str]]]: final_agg_dq_result, a list of dicts with str keys and str values

get_final_agg_dq_run_time: float property

Returns the elapsed time of the final agg DQ run. Returns: float: elapsed time in seconds

get_final_agg_dq_status: str property

Returns the final aggregation data quality status.

Returns: str: _final_agg_dq_status

get_final_query_dq_result: Optional[List[Dict[str, str]]] property

Returns the final query DQ result. Returns: Optional[List[Dict[str, str]]]: final_query_dq_result, a list of dicts with str keys and str values

get_final_query_dq_run_time: float property

Returns the elapsed time of the final query DQ run. Returns: float: elapsed time in seconds

get_final_query_dq_status: str property

Returns the final query data quality status.

Returns: str: _final_query_dq_status

get_final_table_name: str property

Get final_table_name, the name of the final target table.

Returns: str: final_table_name

get_input_count: int property

Returns the input count. Returns: int: _input_count

get_job_metadata: Optional[str] property

Returns the job metadata.

Returns: Optional[str]: _job_metadata

get_mail_from: str property

Returns the sender email address. Returns: str: _mail_from

get_mail_smtp_port: int property

Returns the SMTP port. Returns: int: _mail_smtp_port

get_mail_smtp_server: str property

Returns the SMTP server host. Returns: str: _mail_smtp_server

get_mail_subject: str property

Returns the email subject. Returns: str: _mail_subject

get_notification_on_completion: bool property

Returns whether to send a notification when the job completes. Returns: bool: _notification_on_completion

get_notification_on_fail: bool property

Returns whether to send a notification when the job fails. Returns: bool: _notification_on_fail

get_notification_on_start: bool property

Returns whether to send a notification when the job starts. Returns: bool: _notification_on_start

get_num_agg_dq_rules: dict property

Returns the number of agg DQ rules applied in the batch run. Returns: dict: rule counts under the keys num_agg_dq_rules, num_source_agg_dq_rules, and num_final_agg_dq_rules

get_num_dq_rules: int property

Returns the total number of DQ rules applied in the batch run. Returns: int: number of rules

get_num_query_dq_rules: dict property

Returns the number of query DQ rules applied in the batch run. Returns: dict: rule counts under the keys num_query_dq_rules, num_source_query_dq_rules, and num_final_query_dq_rules

get_num_row_dq_rules: int property

Returns the number of row DQ rules applied in the batch run. Returns: int: number of rules

get_output_count: int property

Returns the output count. Returns: int: _output_count

get_output_percentage: float property

Returns the output percentage. Returns: float: output percentage

get_query_dq_detailed_stats_status: bool property

Returns whether detailed results are enabled for query DQ rules. Returns: bool: _enable_query_dq_detailed_result

get_query_dq_output_custom_table_name: str property

Get query_dq_output_custom_table_name, the custom table to which the output of the query DQ queries is written.

Returns: str: query_dq_output_custom_table_name

get_query_dq_rule_type_name: str property

Returns the query data quality rule type name.

Returns: str: _query_dq_rule_type_name

get_querydq_secondary_queries: dict property

Returns the query DQ secondary queries. Returns: dict: querydq_secondary_queries

get_row_dq_end_time: datetime property

Returns the end time of the row DQ computation. Returns: datetime: row DQ end time

get_row_dq_rule_type_name: str property

Returns the row data quality rule type name.

Returns: str: _row_dq_rule_type_name

get_row_dq_run_time: float property

Returns the elapsed time of the row DQ run. Returns: float: elapsed time in seconds

get_row_dq_start_time: datetime property

Returns the start time of the row DQ computation. Returns: datetime: row DQ start time

get_row_dq_status: str property

Returns the row data quality status.

Returns: str: _row_dq_status

get_rules_exceeds_threshold: Optional[List[dict]] property

Returns the error percentage for each rule. Returns: Optional[List[dict]]: the list stored by set_rules_exceeds_threshold

get_rules_execution_settings_config: dict property

Returns the rules execution settings config. Returns: dict: _rules_execution_settings_config

get_run_date: str property

Get run_date for this instance of the spark-expectations class.

Returns: str: run_date

get_run_date_name: str property

Returns the name of the run_date column. Returns: str: run_date column name

get_run_date_time_name: str property

Returns the name of the run_date_time column. Returns: str: run_date_time column name

get_run_id: str property

Get run_id for this instance of the spark-expectations class.

Returns: str: run_id

get_run_id_name: str property

Returns the name of the run_id column. Returns: str: run_id column name

get_se_enable_error_table: bool property

Returns whether writing to the error table is enabled. Returns: bool: _se_enable_error_table

get_se_streaming_stats_dict: Dict[str, str] property

Returns the streaming stats dict of secret keys. Returns: Dict[str, str]: secret keys dict

get_se_streaming_stats_topic_name: str property

Returns the Kafka topic name. Returns: str: _se_streaming_stats_topic_name

get_secret_type: Optional[str] property

Returns the secret type. Returns: Optional[str]: secret type

get_server_url_key: Optional[str] property

Returns the key or path for the Kafka server URL. Returns: Optional[str]: Kafka server URL key or path

get_slack_webhook_url: str property

Returns the Slack webhook URL. Returns: str: Slack webhook URL

get_source_agg_dq_detailed_stats: Optional[List[Tuple]] property

Returns the detailed results for source agg DQ rules. Returns: Optional[List[Tuple]]: _source_agg_dq_detailed_stats

get_source_agg_dq_result: Optional[List[Dict[str, str]]] property

Returns the source agg DQ result. Returns: Optional[List[Dict[str, str]]]: source_agg_dq_result, a list of dicts with str keys and str values

get_source_agg_dq_run_time: float property

Returns the elapsed time of the source agg DQ run. Returns: float: elapsed time in seconds

get_source_agg_dq_status: str property

Returns the source aggregation data quality status.

Returns: str: _source_agg_dq_status

get_source_query_dq_detailed_stats: Optional[List[Tuple]] property

Returns the detailed results for source query DQ rules. Returns: Optional[List[Tuple]]: _source_query_dq_detailed_stats

get_source_query_dq_output: Optional[List[dict]] property

Returns the output of the source query DQ queries. Returns: Optional[List[dict]]: source_query_dq_output

get_source_query_dq_result: Optional[List[Dict[str, str]]] property

Returns the source query DQ result. Returns: Optional[List[Dict[str, str]]]: source_query_dq_result, a list of dicts with str keys and str values

get_source_query_dq_run_time: float property

Returns the elapsed time of the source query DQ run. Returns: float: elapsed time in seconds

get_source_query_dq_status: str property

Returns the source query data quality status.

Returns: str: _source_query_dq_status

get_stats_table_writer_config: dict property

Returns the stats table writer config. Returns: dict: stats_table_writer_config

get_success_percentage: float property

Returns the success percentage. Returns: float: success percentage

get_summarized_row_dq_res: Optional[List[Dict[str, str]]] property

Returns the summarized row DQ results. Returns: Optional[List[Dict[str, str]]]: summarized_row_dq_res, a list of dicts with str keys and str values describing rule metadata

get_supported_df_query_dq: DataFrame property

Returns the placeholder DataFrame used for query DQ checks. Returns: DataFrame: placeholder DataFrame for query DQ

get_table_name: str property

Returns the table name. Returns: str: _table_name

get_target_agg_dq_detailed_stats: Optional[List[Tuple]] property

Returns the detailed results for target agg DQ rules. Returns: Optional[List[Tuple]]: _target_agg_dq_detailed_stats

get_target_and_error_table_writer_config: dict property

Returns the target and error table writer config. Returns: dict: target_and_error_table_writer_config

get_target_query_dq_detailed_stats: Optional[List[Tuple]] property

Returns the detailed results for target query DQ rules. Returns: Optional[List[Tuple]]: _target_query_dq_detailed_stats

get_target_query_dq_output: Optional[List[dict]] property

Returns the output of the target query DQ queries. Returns: Optional[List[dict]]: target_query_dq_output

get_teams_webhook_url: str property

Returns the Teams webhook URL. Returns: str: Teams webhook URL

get_to_mail: str property

Returns the recipient email addresses. Returns: str: recipient email address(es)

get_token: Optional[str] property

Returns the key or path for the token. Returns: Optional[str]: token key or path

get_token_endpoint_url: Optional[str] property

Returns the key or path for the token endpoint URL. Returns: Optional[str]: endpoint URL key or path

get_topic_name: Optional[str] property

Returns the key or path for the topic name. Returns: Optional[str]: topic name key or path

get_zoom_token: str property

Get the Zoom token.

Returns: str: the Zoom token

get_zoom_webhook_url: str property

Get the Zoom webhook URL.

Returns: str: the Zoom webhook URL

product_id: str instance-attribute

spark: SparkSession instance-attribute

Functions

get_time_diff(start_time: Optional[datetime], end_time: Optional[datetime]) -> float

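Returns the time between start_time and end_time in seconds, rounded to one decimal place. If either argument is None, returns 0.0.

Args: start_time: start timestamp, or None. end_time: end timestamp, or None.

Returns: float: elapsed seconds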

Source code in spark_expectations/core/context.py
def get_time_diff(
    self, start_time: Optional[datetime], end_time: Optional[datetime]
) -> float:
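    """
    This function returns the time between start_time and end_time in seconds
    Args:
        start_time: start timestamp, or None
        end_time: end timestamp, or None

    Returns:
        float: elapsed seconds rounded to one decimal place, or 0.0 if either time is None
    """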
    if start_time and end_time:
        time_diff = end_time - start_time

        return round(float(time_diff.total_seconds()), 1)
    else:
        return 0.0
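To check the rounding and None handling without a SparkSession, the logic of get_time_diff can be copied into a free function. The sketch below does that; the name time_diff_seconds is hypothetical and not part of spark-expectations.

```python
from datetime import datetime, timedelta
from typing import Optional


def time_diff_seconds(start_time: Optional[datetime], end_time: Optional[datetime]) -> float:
    """Hypothetical stand-alone copy of SparkExpectationsContext.get_time_diff."""
    # Both timestamps must be set; otherwise the stage is treated as not timed.
    if start_time and end_time:
        # total_seconds() includes microseconds; the result is rounded to one decimal.
        return round(float((end_time - start_time).total_seconds()), 1)
    return 0.0


start = datetime(2024, 1, 1, 12, 0, 0)
print(time_diff_seconds(start, start + timedelta(seconds=90)))  # 90.0
print(time_diff_seconds(start, None))  # 0.0
```

Returning 0.0 instead of raising means a stage that never started (or never finished) simply reports zero run time.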

print_dataframe_with_debugger(df: DataFrame) -> None

Prints the DataFrame with df.show(truncate=False) when debugger mode is enabled; otherwise does nothing. Returns: None

Source code in spark_expectations/core/context.py
def print_dataframe_with_debugger(self, df: DataFrame) -> None:
    """
    This function prints the DataFrame with df.show(truncate=False) when debugger mode is enabled
    Returns:
        None
    """
    if self.get_debugger_mode:
        df.show(truncate=False)
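A minimal sketch of the debugger gating, assuming a hypothetical DebugContext class and a FakeDataFrame that only records show() calls, so it runs without Spark. Note that get_debugger_mode is a property, which is why the source tests it without parentheses; if it were a plain method, the bound-method object would always be truthy and every DataFrame would be printed.

```python
from typing import List


class FakeDataFrame:
    """Hypothetical stand-in exposing only the show() method used here."""

    def __init__(self) -> None:
        self.show_calls: List[bool] = []

    def show(self, truncate: bool = True) -> None:
        # Record the truncate flag instead of printing rows.
        self.show_calls.append(truncate)


class DebugContext:
    """Hypothetical minimal context mirroring the debugger-mode methods above."""

    def __init__(self) -> None:
        self._debugger_mode = False

    def set_debugger_mode(self, debugger_mode: bool) -> None:
        self._debugger_mode = debugger_mode

    @property
    def get_debugger_mode(self) -> bool:
        return self._debugger_mode

    def print_dataframe_with_debugger(self, df: FakeDataFrame) -> None:
        # Only print when debugger mode is on, and never truncate column values.
        if self.get_debugger_mode:
            df.show(truncate=False)


ctx = DebugContext()
df = FakeDataFrame()
ctx.print_dataframe_with_debugger(df)  # debugger off: show() is not called
ctx.set_debugger_mode(True)
ctx.print_dataframe_with_debugger(df)  # debugger on: show(truncate=False)
print(df.show_calls)  # [False]
```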

reset_num_agg_dq_rules() -> None

Resets the agg DQ rule counters in _num_agg_dq_rules to 0. Returns: None

Source code in spark_expectations/core/context.py
def reset_num_agg_dq_rules(self) -> None:
    """
    This function is used to reset the _num_agg_dq_rules counters
    Returns:
        None

    """
    self._num_agg_dq_rules = {
        "num_agg_dq_rules": 0,
        "num_source_agg_dq_rules": 0,
        "num_final_agg_dq_rules": 0,
    }

reset_num_dq_rules() -> None

Resets the total DQ rule count _num_dq_rules to 0. Returns: None

Source code in spark_expectations/core/context.py
def reset_num_dq_rules(self) -> None:
    """
    This function used to reset the _num_dq_rules
    Returns:
        None

    """
    self._num_dq_rules = 0

reset_num_query_dq_rules() -> None

Resets the query DQ rule counters in _num_query_dq_rules to 0. Returns: None

Source code in spark_expectations/core/context.py
def reset_num_query_dq_rules(self) -> None:
    """
    This function is used to reset the _num_query_dq_rules counters
    Returns:
        None

    """
    self._num_query_dq_rules = {
        "num_query_dq_rules": 0,
        "num_source_query_dq_rules": 0,
        "num_final_query_dq_rules": 0,
    }

reset_num_row_dq_rules() -> None

Resets the row DQ rule count _num_row_dq_rules to 0. Returns: None

Source code in spark_expectations/core/context.py
def reset_num_row_dq_rules(self) -> None:
    """
    This function used to reset the _num_row_dq_rules
    Returns:
        None

    """

    self._num_row_dq_rules = 0  # pragma: no cover

set_agg_dq_detailed_stats_status(agg_dq_detailed_result_status: bool) -> None

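Sets whether detailed results are enabled for agg DQ rules.

Parameters: agg_dq_detailed_result_status (bool, required): whether to enable detailed results for agg DQ rules; the value is coerced with bool().

Returns: None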

Source code in spark_expectations/core/context.py
def set_agg_dq_detailed_stats_status(
    self, agg_dq_detailed_result_status: bool
) -> None:
    """
    Args:
        agg_dq_detailed_result_status: whether to enable detailed results for agg DQ rules
    Returns:
        None
    """
    self._enable_agg_dq_detailed_result = bool(agg_dq_detailed_result_status)
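Because set_agg_dq_detailed_stats_status (like set_query_dq_detailed_stats_status further down) coerces its argument with bool(), any non-empty string, including "false", is stored as True. If the flag is read from a text config, it is safer to parse it first. The sketch below shows the pitfall with a hypothetical stand-in class and a hypothetical parse_flag helper; neither is part of the library.

```python
from typing import Union


class DetailedStatsFlags:
    """Hypothetical minimal stand-in for the agg detailed-stats setter shown above."""

    def __init__(self) -> None:
        self._enable_agg_dq_detailed_result = False

    def set_agg_dq_detailed_stats_status(self, agg_dq_detailed_result_status: object) -> None:
        # Same coercion as the library source: bool() of whatever is passed in.
        self._enable_agg_dq_detailed_result = bool(agg_dq_detailed_result_status)


def parse_flag(value: Union[str, bool, int]) -> bool:
    """Hypothetical helper: interpret common string spellings of a boolean flag."""
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes", "y"}
    return bool(value)


flags = DetailedStatsFlags()
flags.set_agg_dq_detailed_stats_status("false")  # non-empty string: stored as True
naive = flags._enable_agg_dq_detailed_result
flags.set_agg_dq_detailed_stats_status(parse_flag("false"))  # parsed first: stored as False
parsed = flags._enable_agg_dq_detailed_result
print(naive, parsed)  # True False
```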

set_debugger_mode(debugger_mode: bool) -> None

Sets debugger mode. When enabled, print_dataframe_with_debugger prints DataFrames. Returns: None

Source code in spark_expectations/core/context.py
def set_debugger_mode(self, debugger_mode: bool) -> None:
    """
    This function sets debugger mode
    Args:
        debugger_mode: when True, print_dataframe_with_debugger prints DataFrames
    Returns:
        None
    """
    self._debugger_mode = debugger_mode

set_detailed_stats_table_writer_config(config: dict) -> None

Sets the detailed stats table writer config. As implemented, the value is stored in _stats_table_writer_config. Args: config (dict). Returns: None

Source code in spark_expectations/core/context.py
def set_detailed_stats_table_writer_config(self, config: dict) -> None:
    """
    This function sets detailed stats table writer config
    Args:
        config: dict
    Returns: None
    """
    self._stats_table_writer_config = config

set_dq_detailed_stats_table_name(dq_detailed_stats_table_name: str) -> None

Source code in spark_expectations/core/context.py
def set_dq_detailed_stats_table_name(
    self, dq_detailed_stats_table_name: str
) -> None:
    self._dq_detailed_stats_table_name = dq_detailed_stats_table_name

set_dq_end_time() -> None

Sets the end time of the DQ computation to datetime.now(). Returns: None

Source code in spark_expectations/core/context.py
def set_dq_end_time(self) -> None:
    """
    This function sets end time dq computation
    Returns:
        None
    """
    self._dq_end_time = datetime.now()

set_dq_expectations(dq_expectations: dict) -> None

Source code in spark_expectations/core/context.py
def set_dq_expectations(self, dq_expectations: dict) -> None:
    self._dq_expectations = dq_expectations

set_dq_rules_params(_dq_rules_params: dict) -> None

Sets the params that are mapped into the DQ rules.

Args: _dq_rules_params (dict): params for the DQ rules.

Returns: None

Source code in spark_expectations/core/context.py
def set_dq_rules_params(self, _dq_rules_params: dict) -> None:
    """
    This function sets params for dq rules
    Args:
        _dq_rules_params: params for the dq rules

    Returns:
        None
    """
    self._dq_rules_params = _dq_rules_params

set_dq_run_status(dq_run_status: str = 'Failed') -> None

Source code in spark_expectations/core/context.py
def set_dq_run_status(self, dq_run_status: str = "Failed") -> None:
    self._dq_run_status = dq_run_status

set_dq_start_time() -> None

Sets the start time of the DQ computation to datetime.now(). Returns: None

Source code in spark_expectations/core/context.py
def set_dq_start_time(self) -> None:
    """
    This function sets start time dq computation
    Returns:
        None
    """
    self._dq_start_time = datetime.now()

set_dq_stats_table_name(dq_stats_table_name: str) -> None

Source code in spark_expectations/core/context.py
def set_dq_stats_table_name(self, dq_stats_table_name: str) -> None:
    self._dq_stats_table_name = dq_stats_table_name

set_enable_mail(enable_mail: bool) -> None

Source code in spark_expectations/core/context.py
def set_enable_mail(self, enable_mail: bool) -> None:
    self._enable_mail = bool(enable_mail)

set_enable_slack(enable_slack: bool) -> None

Parameters:

Name Type Description Default
enable_slack bool
required

Returns:

Source code in spark_expectations/core/context.py
def set_enable_slack(self, enable_slack: bool) -> None:
    """

    Args:
        enable_slack:

    Returns:

    """
    self._enable_slack = enable_slack

set_enable_teams(enable_teams: bool) -> None

Parameters:

Name Type Description Default
enable_teams bool
required

Returns:

Source code in spark_expectations/core/context.py
def set_enable_teams(self, enable_teams: bool) -> None:
    """

    Args:
        enable_teams:

    Returns:

    """
    self._enable_teams = enable_teams

set_enable_zoom(enable_zoom: bool) -> None

Set whether to enable Zoom notification.

Parameters: enable_zoom (bool, required): whether to enable Zoom notification.

Source code in spark_expectations/core/context.py
def set_enable_zoom(self, enable_zoom: bool) -> None:
    """
    Set whether to enable Zoom notification.

    Args:
        enable_zoom (bool): Whether to enable Zoom notification or not.
    """
    self._enable_zoom = enable_zoom

set_end_time_when_dq_job_fails() -> None

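When the job fails, sets the end time of the stage that was still running: the first stage, checked in the order source agg DQ, source query DQ, row DQ, final agg DQ, final query DQ, that has a start time but no end time. At most one stage is updated. Returns: None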

Source code in spark_expectations/core/context.py
def set_end_time_when_dq_job_fails(self) -> None:
    """
    function used to set end time when job fails in any one of the stages by using start time
    Returns:
    """
    if self._source_agg_dq_start_time and self._source_agg_dq_end_time is None:
        self.set_source_agg_dq_end_time()
    elif (
        self._source_query_dq_start_time and self._source_query_dq_end_time is None
    ):
        self.set_source_query_dq_end_time()
    elif self._row_dq_start_time and self._row_dq_end_time is None:
        self.set_row_dq_end_time()
    elif self._final_agg_dq_start_time and self._final_agg_dq_end_time is None:
        self.set_final_agg_dq_end_time()
    elif self._final_query_dq_start_time and self._final_query_dq_end_time is None:
        self.set_final_query_dq_end_time()
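The if/elif chain above closes at most one stage: the first, in pipeline order, that has a start time but no end time. A sketch of the same rule over a plain dict, assuming hypothetical stage keys and a helper named close_running_stage that is not part of the library:

```python
from datetime import datetime
from typing import Dict, Optional, Tuple

# Stage order of the if/elif chain in set_end_time_when_dq_job_fails (hypothetical keys).
STAGE_ORDER = ("source_agg_dq", "source_query_dq", "row_dq", "final_agg_dq", "final_query_dq")

StageTimes = Dict[str, Tuple[Optional[datetime], Optional[datetime]]]


def close_running_stage(times: StageTimes, now: datetime) -> Optional[str]:
    """Hypothetical helper: end the first stage that started but never finished.

    `times` maps a stage name to a (start_time, end_time) pair; missing stages
    count as never started. Returns the closed stage's name, or None.
    """
    for stage in STAGE_ORDER:
        start, end = times.get(stage, (None, None))
        if start and end is None:
            times[stage] = (start, now)
            return stage  # like the elif chain, at most one stage is closed
    return None


t0 = datetime(2024, 1, 1, 10, 0, 0)
t1 = datetime(2024, 1, 1, 10, 0, 5)
failed_at = datetime(2024, 1, 1, 10, 0, 9)
times: StageTimes = {"source_agg_dq": (t0, t1), "row_dq": (t1, None)}
closed = close_running_stage(times, failed_at)
print(closed)  # row_dq
```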

set_env(env: Optional[str]) -> None

Parameters: env (Optional[str], required): the environment type.

Returns: None

Source code in spark_expectations/core/context.py
def set_env(self, env: Optional[str]) -> None:
    """
    Args:
        env: which accepts env type

    Returns:
        None

    """
    self._env = env

set_error_count(error_count: int = 0) -> None

Source code in spark_expectations/core/context.py
def set_error_count(self, error_count: int = 0) -> None:
    self._error_count = error_count

set_error_drop_threshold(error_drop_threshold: int) -> None

Source code in spark_expectations/core/context.py
def set_error_drop_threshold(self, error_drop_threshold: int) -> None:
    self._error_drop_threshold = error_drop_threshold

set_error_table_name(error_table_name: str) -> None

Source code in spark_expectations/core/context.py
def set_error_table_name(self, error_table_name: str) -> None:
    self._error_table_name = error_table_name

set_final_agg_dq_end_time() -> None

Sets the end time of the final agg DQ computation to datetime.now(). Returns: None

Source code in spark_expectations/core/context.py
def set_final_agg_dq_end_time(self) -> None:
    """
    This function sets end time final agg dq computation
    Returns:
        None
    """
    self._final_agg_dq_end_time = datetime.now()

set_final_agg_dq_result(final_agg_dq_result: Optional[List[Dict[str, str]]] = None) -> None

Source code in spark_expectations/core/context.py
def set_final_agg_dq_result(
    self, final_agg_dq_result: Optional[List[Dict[str, str]]] = None
) -> None:
    self._final_agg_dq_result = final_agg_dq_result

set_final_agg_dq_start_time() -> None

Sets the start time of the final agg DQ computation to datetime.now(). Returns: None

Source code in spark_expectations/core/context.py
def set_final_agg_dq_start_time(self) -> None:
    """
    This function sets start time final agg dq computation
    Returns:
    None

    """
    self._final_agg_dq_start_time = datetime.now()

set_final_agg_dq_status(final_agg_dq_status: str = 'Skipped') -> None

Source code in spark_expectations/core/context.py
def set_final_agg_dq_status(self, final_agg_dq_status: str = "Skipped") -> None:
    self._final_agg_dq_status = final_agg_dq_status

set_final_query_dq_end_time() -> None

Sets the end time of the final query DQ computation to datetime.now(). Returns: None

Source code in spark_expectations/core/context.py
def set_final_query_dq_end_time(self) -> None:
    """
    This function sets end time final query dq computation
    Returns:
        None
    """
    self._final_query_dq_end_time = datetime.now()

set_final_query_dq_result(final_query_dq_result: Optional[List[Dict[str, str]]] = None) -> None

Source code in spark_expectations/core/context.py
def set_final_query_dq_result(
    self, final_query_dq_result: Optional[List[Dict[str, str]]] = None
) -> None:
    self._final_query_dq_result = final_query_dq_result

set_final_query_dq_start_time() -> None

Sets the start time of the final query DQ computation to datetime.now(). Returns: None

Source code in spark_expectations/core/context.py
def set_final_query_dq_start_time(self) -> None:
    """
    This function sets start time final query dq computation
    Returns:
        None
    """
    self._final_query_dq_start_time = datetime.now()

set_final_query_dq_status(final_query_dq_status: str = 'Skipped') -> None

Source code in spark_expectations/core/context.py
def set_final_query_dq_status(self, final_query_dq_status: str = "Skipped") -> None:
    self._final_query_dq_status = final_query_dq_status

set_final_table_name(final_table_name: str) -> None

Source code in spark_expectations/core/context.py
def set_final_table_name(self, final_table_name: str) -> None:
    self._final_table_name = final_table_name

set_input_count(input_count: int = 0) -> None

Source code in spark_expectations/core/context.py
def set_input_count(self, input_count: int = 0) -> None:
    self._input_count = input_count

set_job_metadata(job_metadata: Optional[str] = None) -> None

Sets the job metadata.

Parameters: job_metadata (Optional[str], default None)

Returns: None

Source code in spark_expectations/core/context.py
def set_job_metadata(self, job_metadata: Optional[str] = None) -> None:
    """
    This function is used to set the job_metadata

    Returns:
        None

    """
    self._job_metadata = job_metadata

set_mail_from(mail_from: str) -> None

Source code in spark_expectations/core/context.py
def set_mail_from(self, mail_from: str) -> None:
    self._mail_from = mail_from

set_mail_smtp_port(mail_smtp_port: int) -> None

Source code in spark_expectations/core/context.py
def set_mail_smtp_port(self, mail_smtp_port: int) -> None:
    self._mail_smtp_port = mail_smtp_port

set_mail_smtp_server(mail_smtp_server: str) -> None

Source code in spark_expectations/core/context.py
def set_mail_smtp_server(self, mail_smtp_server: str) -> None:
    self._mail_smtp_server = mail_smtp_server

set_mail_subject(mail_subject: str) -> None

Source code in spark_expectations/core/context.py
def set_mail_subject(self, mail_subject: str) -> None:
    self._mail_subject = mail_subject

set_notification_on_completion(notification_on_completion: bool) -> None

Source code in spark_expectations/core/context.py
def set_notification_on_completion(self, notification_on_completion: bool) -> None:
    self._notification_on_completion = notification_on_completion

set_notification_on_fail(notification_on_fail: bool) -> None

Source code in spark_expectations/core/context.py
def set_notification_on_fail(self, notification_on_fail: bool) -> None:
    self._notification_on_fail = notification_on_fail

set_notification_on_start(notification_on_start: bool) -> None

Source code in spark_expectations/core/context.py
def set_notification_on_start(self, notification_on_start: bool) -> None:
    self._notification_on_start = notification_on_start

set_num_agg_dq_rules(source_agg_enabled: bool = False, final_agg_enabled: bool = False) -> None

Increments the applied agg DQ rule counts for the batch run; each call counts one rule.

Args: source_agg_enabled: True when the rule is set for source agg DQ, by default False. final_agg_enabled: True when the rule is set for final agg DQ, by default False.

Returns: None

Source code in spark_expectations/core/context.py
def set_num_agg_dq_rules(
    self, source_agg_enabled: bool = False, final_agg_enabled: bool = False
) -> None:
    """
    This function sets number of applied agg dq rules for batch run
    source_agg_enabled: Marked True when agg rules set for source, by default False
    final_agg_enabled: Marked True when agg rules set for final, by default False
    Returns:
        None
    """

    self._num_agg_dq_rules["num_agg_dq_rules"] += 1
    self._num_dq_rules += 1

    if source_agg_enabled:
        self._num_agg_dq_rules["num_source_agg_dq_rules"] += 1
    if final_agg_enabled:
        self._num_agg_dq_rules["num_final_agg_dq_rules"] += 1
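The counting rule is easiest to see with a small stand-in. The sketch below mirrors reset_num_agg_dq_rules and set_num_agg_dq_rules in a hypothetical RuleCounter class (not part of the library). Each call adds one to the agg total and to _num_dq_rules, and one more to each stage counter that is enabled, so the source and final counts can add up to more than the agg total when a rule runs in both stages.

```python
from typing import Dict


class RuleCounter:
    """Hypothetical minimal stand-in for the agg-rule counters in SparkExpectationsContext."""

    def __init__(self) -> None:
        self._num_dq_rules = 0
        self._num_agg_dq_rules: Dict[str, int] = {}
        self.reset_num_agg_dq_rules()

    def reset_num_agg_dq_rules(self) -> None:
        # Same keys as reset_num_agg_dq_rules in the source; _num_dq_rules is untouched.
        self._num_agg_dq_rules = {
            "num_agg_dq_rules": 0,
            "num_source_agg_dq_rules": 0,
            "num_final_agg_dq_rules": 0,
        }

    def set_num_agg_dq_rules(self, source_agg_enabled: bool = False, final_agg_enabled: bool = False) -> None:
        # Every agg rule counts once in the agg total and once in the overall total...
        self._num_agg_dq_rules["num_agg_dq_rules"] += 1
        self._num_dq_rules += 1
        # ...and once more for each stage it is enabled for.
        if source_agg_enabled:
            self._num_agg_dq_rules["num_source_agg_dq_rules"] += 1
        if final_agg_enabled:
            self._num_agg_dq_rules["num_final_agg_dq_rules"] += 1


counter = RuleCounter()
counter.set_num_agg_dq_rules(source_agg_enabled=True, final_agg_enabled=True)  # rule A: both stages
counter.set_num_agg_dq_rules(source_agg_enabled=True)  # rule B: source only
print(counter._num_agg_dq_rules)
# {'num_agg_dq_rules': 2, 'num_source_agg_dq_rules': 2, 'num_final_agg_dq_rules': 1}
print(counter._num_dq_rules)  # 2
```

Resetting the agg counters leaves _num_dq_rules unchanged; reset_num_dq_rules clears that total separately.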

set_num_query_dq_rules(source_query_enabled: bool = False, final_query_enabled: bool = False) -> None

Increments the applied query DQ rule counts for the batch run; each call counts one rule.

Args: source_query_enabled: True when the rule is set for source query DQ, by default False. final_query_enabled: True when the rule is set for final query DQ, by default False.

Returns: None

Source code in spark_expectations/core/context.py
def set_num_query_dq_rules(
    self, source_query_enabled: bool = False, final_query_enabled: bool = False
) -> None:
    """
    This function sets number of applied query dq rules for batch run
    source_query_enabled: Marked True when query rules set for source, by default False
    final_query_enabled: Marked True when query rules set for final, by default False
    Returns:
        None
    """

    self._num_query_dq_rules["num_query_dq_rules"] += 1
    self._num_dq_rules += 1

    if source_query_enabled:
        self._num_query_dq_rules["num_source_query_dq_rules"] += 1
    if final_query_enabled:
        self._num_query_dq_rules["num_final_query_dq_rules"] += 1

set_num_row_dq_rules() -> None

Increments the applied row DQ rule count, and the total DQ rule count, for the batch run. Returns: None

Source code in spark_expectations/core/context.py
def set_num_row_dq_rules(self) -> None:
    """
    This function sets number of applied row dq rules for batch run
    Returns:
        None

    """
    self._num_row_dq_rules += 1
    self._num_dq_rules += 1

set_output_count(output_count: int = 0) -> None

Source code in spark_expectations/core/context.py
def set_output_count(self, output_count: int = 0) -> None:
    self._output_count = output_count

set_query_dq_detailed_stats_status(query_dq_detailed_result_status: bool) -> None

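Sets whether detailed results are enabled for query DQ rules.

Parameters: query_dq_detailed_result_status (bool, required): whether to enable detailed results for query DQ rules; the value is coerced with bool().

Returns: None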

Source code in spark_expectations/core/context.py
def set_query_dq_detailed_stats_status(
    self, query_dq_detailed_result_status: bool
) -> None:
    """
    Args:
        query_dq_detailed_result_status: whether to enable detailed results for query DQ rules
    Returns:
        None
    """
    self._enable_query_dq_detailed_result = bool(query_dq_detailed_result_status)

set_query_dq_output_custom_table_name(query_dq_output_custom_table_name: str) -> None

Source code in spark_expectations/core/context.py
def set_query_dq_output_custom_table_name(
    self, query_dq_output_custom_table_name: str
) -> None:
    self._query_dq_output_custom_table_name = query_dq_output_custom_table_name

set_querydq_secondary_queries(querydq_secondary_queries: dict) -> None

Sets the query DQ secondary queries. Args: querydq_secondary_queries (dict). Returns: None

Source code in spark_expectations/core/context.py
def set_querydq_secondary_queries(self, querydq_secondary_queries: dict) -> None:
    """
    This function sets query dq secondary queries
    Args:
        querydq_secondary_queries: dict
    Returns: None
    """
    self._querydq_secondary_queries = querydq_secondary_queries

set_row_dq_end_time() -> None

This function sets the end time of the row dq computation. Returns: None

Source code in spark_expectations/core/context.py
def set_row_dq_end_time(self) -> None:
    """
    This function sets end time row dq computation
    Returns:
        None
    """
    self._row_dq_end_time = datetime.now()

set_row_dq_start_time() -> None

This function sets the start time of the row dq computation. Returns: None

Source code in spark_expectations/core/context.py
def set_row_dq_start_time(self) -> None:
    """
    This function sets start time row dq computation
    Returns:
        None
    """
    self._row_dq_start_time = datetime.now()
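
The start and end setters each store a `datetime.now()` timestamp, so the row dq duration can be derived by subtracting one from the other. `datetime.now()` without a timezone returns naive local time, while `set_run_date` uses UTC. The sketch below assumes the caller has access to both stored timestamps. `row_dq_duration_seconds` is a hypothetical helper and is not part of spark_expectations:

```python
import time
from datetime import datetime


def row_dq_duration_seconds(start: datetime, end: datetime) -> float:
    """Hypothetical helper: elapsed seconds between the stored start and end timestamps."""
    if end < start:
        raise ValueError("end time precedes start time")
    return (end - start).total_seconds()


start = datetime.now()  # what set_row_dq_start_time() stores
time.sleep(0.01)        # stand-in for the row dq computation
end = datetime.now()    # what set_row_dq_end_time() stores

print(f"row dq took {row_dq_duration_seconds(start, end):.3f}s")
```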

set_row_dq_status(row_dq_status: str = 'Skipped') -> None

Source code in spark_expectations/core/context.py
def set_row_dq_status(self, row_dq_status: str = "Skipped") -> None:
    self._row_dq_status = row_dq_status

set_rules_exceeds_threshold(rules: Optional[List[dict]] = None) -> None

This function stores the list of rules whose error percentage exceeds the threshold, for each rule type (kept in _rules_error_per; defaults to None).

Source code in spark_expectations/core/context.py
def set_rules_exceeds_threshold(self, rules: Optional[List[dict]] = None) -> None:
    """
    This function implements error percentage for each rule type
    """
    self._rules_error_per = rules
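
The setter only stores the list it receives; it does not compute the error percentages itself. The sketch below illustrates what such a list might contain by computing an error percentage per rule and keeping the rules above their threshold. The helper name, rule names, and dict keys (`rule`, `error_count`, `error_pct`) are hypothetical and do not reflect the library's actual structure:

```python
from typing import Dict, List


def rules_exceeding_threshold(
    error_counts: Dict[str, int], thresholds_pct: Dict[str, float], input_count: int
) -> List[dict]:
    """Hypothetical helper: rules whose error percentage is above their threshold."""
    exceeded = []
    for rule, errors in error_counts.items():
        # Guard against division by zero for empty inputs
        error_pct = (errors / input_count) * 100 if input_count else 0.0
        # Rules without a configured threshold default to 100% (never exceeded)
        if error_pct > thresholds_pct.get(rule, 100.0):
            exceeded.append({"rule": rule, "error_count": errors, "error_pct": error_pct})
    return exceeded


result = rules_exceeding_threshold(
    {"col1_not_null": 5, "col2_positive": 30},
    {"col1_not_null": 10.0, "col2_positive": 10.0},
    input_count=100,
)
print(result)  # [{'rule': 'col2_positive', 'error_count': 30, 'error_pct': 30.0}]
```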

set_rules_execution_settings_config(config: dict) -> None

This function sets the rules execution settings config. Args: config: dict Returns: None

Source code in spark_expectations/core/context.py
def set_rules_execution_settings_config(self, config: dict) -> None:
    """
    This function sets stats table writer config
    Args:
        config: dict
    Returns: None
    """
    self._rules_execution_settings_config = config

set_run_date() -> str staticmethod

This function generates the current datetime in UTC and returns it as a string. Despite its name, it does not store anything.

Returns:

Name Type Description
str str

Returns the current UTC datetime in the format "%Y-%m-%d %H:%M:%S"

Source code in spark_expectations/core/context.py
@staticmethod
def set_run_date() -> str:
    """
    This function is used to generate the current datatime in UTC

    Returns:
        str: Returns the current utc datatime in the format - "%Y-%m-%d %H:%M:%S"

    """
    current_datetime: datetime = datetime.now(timezone.utc)
    return current_datetime.replace(tzinfo=timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
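
Because `datetime.now(timezone.utc)` already returns an aware UTC datetime, the extra `.replace(tzinfo=timezone.utc)` has no effect on the result. The sketch below reproduces the same formatting as a standalone function and then parses the string back. This shows that the output carries no timezone marker and has one-second resolution. `run_date_utc` and `RUN_DATE_FORMAT` are hypothetical names:

```python
from datetime import datetime, timezone

RUN_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def run_date_utc() -> str:
    """Standalone equivalent of set_run_date: current UTC time as a string."""
    return datetime.now(timezone.utc).strftime(RUN_DATE_FORMAT)


run_date = run_date_utc()
# The string has no offset, so re-attach UTC explicitly when parsing it back
parsed = datetime.strptime(run_date, RUN_DATE_FORMAT).replace(tzinfo=timezone.utc)
print(run_date, parsed.isoformat())
```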

set_se_enable_error_table(_enable_error_table: bool) -> None

Parameters:

Name Type Description Default
_enable_error_table bool

Whether the error table is enabled; stored in _se_enable_error_table.

required

Returns:

None

Source code in spark_expectations/core/context.py
def set_se_enable_error_table(self, _enable_error_table: bool) -> None:
    """

    Args:
        _se_enable_error_table:

    Returns:

    """
    self._se_enable_error_table = _enable_error_table

set_se_streaming_stats_dict(se_streaming_stats_dict: Dict[str, str]) -> None

This function sets the streaming stats dict of secret keys (se_streaming_stats_dict).

Source code in spark_expectations/core/context.py
def set_se_streaming_stats_dict(
    self, se_streaming_stats_dict: Dict[str, str]
) -> None:
    """
    This function helps to set secret keys dict"""
    self._se_streaming_stats_dict = se_streaming_stats_dict

set_se_streaming_stats_topic_name(se_streaming_stats_topic_name: str) -> None

Source code in spark_expectations/core/context.py
def set_se_streaming_stats_topic_name(
    self, se_streaming_stats_topic_name: str
) -> None:
    self._se_streaming_stats_topic_name = se_streaming_stats_topic_name

set_slack_webhook_url(slack_webhook_url: str) -> None

Source code in spark_expectations/core/context.py
def set_slack_webhook_url(self, slack_webhook_url: str) -> None:
    self._slack_webhook_url = slack_webhook_url

set_source_agg_dq_detailed_stats(source_agg_dq_detailed_stats: Optional[List[Tuple]] = None) -> None

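Parameters:

Name Type Description Default
source_agg_dq_detailed_stats Optional[List[Tuple]]

Detailed stats for the source agg dq rules; stored in _source_agg_dq_detailed_stats.

None

Returns:

None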

Source code in spark_expectations/core/context.py
def set_source_agg_dq_detailed_stats(
    self, source_agg_dq_detailed_stats: Optional[List[Tuple]] = None
) -> None:
    """
    Args:
        _source_agg_dq_detailed_stats:
    Returns:
    """
    self._source_agg_dq_detailed_stats = source_agg_dq_detailed_stats

set_source_agg_dq_end_time() -> None

This function sets the end time of the source agg dq computation. Returns: None

Source code in spark_expectations/core/context.py
def set_source_agg_dq_end_time(self) -> None:
    """
    This function sets end time source agg dq computation
    Returns:
        None

    """
    self._source_agg_dq_end_time = datetime.now()

set_source_agg_dq_result(source_agg_dq_result: Optional[List[Dict[str, str]]] = None) -> None

Source code in spark_expectations/core/context.py
def set_source_agg_dq_result(
    self, source_agg_dq_result: Optional[List[Dict[str, str]]] = None
) -> None:
    self._source_agg_dq_result = source_agg_dq_result

set_source_agg_dq_start_time() -> None

This function sets the start time of the source agg dq computation. Returns: None

Source code in spark_expectations/core/context.py
def set_source_agg_dq_start_time(self) -> None:
    """
    This function sets start time source agg dq computation
    Returns:
         None

    """
    self._source_agg_dq_start_time = datetime.now()

set_source_agg_dq_status(source_agg_dq_status: str = 'Skipped') -> None

Source code in spark_expectations/core/context.py
def set_source_agg_dq_status(self, source_agg_dq_status: str = "Skipped") -> None:
    self._source_agg_dq_status = source_agg_dq_status

set_source_query_dq_detailed_stats(source_query_dq_detailed_stats: Optional[List[Tuple]] = None) -> None

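Parameters:

Name Type Description Default
source_query_dq_detailed_stats Optional[List[Tuple]]

Detailed stats for the source query dq rules; stored in _source_query_dq_detailed_stats.

None

Returns:

None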

Source code in spark_expectations/core/context.py
def set_source_query_dq_detailed_stats(
    self, source_query_dq_detailed_stats: Optional[List[Tuple]] = None
) -> None:
    """
    Args:
        _source_query_dq_detailed_stats:
    Returns:
    """
    self._source_query_dq_detailed_stats = source_query_dq_detailed_stats

set_source_query_dq_end_time() -> None

This function sets the end time of the source query dq computation. Returns: None

Source code in spark_expectations/core/context.py
def set_source_query_dq_end_time(self) -> None:
    """
    This function sets end time source query dq computation
    Returns:
        None
    """
    self._source_query_dq_end_time = datetime.now()

set_source_query_dq_output(source_query_dq_output: Optional[List[dict]] = None) -> None

This function sets the source query dq output. Args: source_query_dq_output: List[dict] Returns: None

Source code in spark_expectations/core/context.py
def set_source_query_dq_output(
    self, source_query_dq_output: Optional[List[dict]] = None
) -> None:
    """
    This function sets row dq secondary queries
    Args:
        source_query_dq_output: List[dict]
    Returns: None
    """
    self._source_query_dq_output = source_query_dq_output

set_source_query_dq_result(source_query_dq_result: Optional[List[Dict[str, str]]] = None) -> None

Source code in spark_expectations/core/context.py
def set_source_query_dq_result(
    self, source_query_dq_result: Optional[List[Dict[str, str]]] = None
) -> None:
    self._source_query_dq_result = source_query_dq_result

set_source_query_dq_start_time() -> None

This function sets the start time of the source query dq computation. Returns: None

Source code in spark_expectations/core/context.py
def set_source_query_dq_start_time(self) -> None:
    """
    This function sets start time source query dq computation
    Returns:
        None
    """
    self._source_query_dq_start_time = datetime.now()

set_source_query_dq_status(source_query_dq_status: str = 'Skipped') -> None

Source code in spark_expectations/core/context.py
def set_source_query_dq_status(
    self, source_query_dq_status: str = "Skipped"
) -> None:
    self._source_query_dq_status = source_query_dq_status

set_stats_table_writer_config(config: dict) -> None

This function sets the stats table writer config. Args: config: dict Returns: None

Source code in spark_expectations/core/context.py
def set_stats_table_writer_config(self, config: dict) -> None:
    """
    This function sets stats table writer config
    Args:
        config: dict
    Returns: None
    """
    self._stats_table_writer_config = config
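
The writer-config setters store the dict they receive as-is, without copying it. Any later change to the caller's dict is therefore visible through the context. The sketch below demonstrates this with a hypothetical stand-in class and shows one defensive option. The config keys `mode` and `format` are illustrative only and are not taken from spark_expectations:

```python
import copy


class WriterConfigSketch:
    """Hypothetical stand-in with the same body as set_stats_table_writer_config."""

    def set_stats_table_writer_config(self, config: dict) -> None:
        self._stats_table_writer_config = config  # stored by reference, not copied


config = {"mode": "append", "format": "delta"}  # illustrative keys only
ctx = WriterConfigSketch()
ctx.set_stats_table_writer_config(config)

config["mode"] = "overwrite"  # mutating the caller's dict after the call...
print(ctx._stats_table_writer_config["mode"])  # ...changes the stored config: overwrite

# Passing a deep copy decouples the stored config from later edits
ctx.set_stats_table_writer_config(copy.deepcopy(config))
config["mode"] = "append"
print(ctx._stats_table_writer_config["mode"])  # overwrite
```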

set_summarized_row_dq_res(summarized_row_dq_res: Optional[List[Dict[str, str]]] = None) -> None

This function sets the summarized row dq results. Args: summarized_row_dq_res: List[Dict[str, str]] Returns: None

Source code in spark_expectations/core/context.py
def set_summarized_row_dq_res(
    self, summarized_row_dq_res: Optional[List[Dict[str, str]]] = None
) -> None:
    """
    This function implements or supports to set row dq summarized res
    Args:
        summarized_row_dq_res: list(dict)
    Returns: None

    """
    self._summarized_row_dq_res = summarized_row_dq_res

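set_supported_df_query_dq() -> DataFrame

Despite its set_ prefix, this method stores nothing. It returns a one-row placeholder DataFrame with a single column, spark_expectations_query_check, built with the context's spark session.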

Source code in spark_expectations/core/context.py
def set_supported_df_query_dq(self) -> DataFrame:
    return self.spark.createDataFrame(
        [
            {
                "spark_expectations_query_check": "supported_place_holder_dataset_to_run_query_check"
            }
        ]
    )

set_table_name(table_name: str) -> None

Source code in spark_expectations/core/context.py
def set_table_name(self, table_name: str) -> None:
    self._table_name = table_name

set_target_agg_dq_detailed_stats(target_agg_dq_detailed_stats: Optional[List[Tuple]] = None) -> None

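Parameters:

Name Type Description Default
target_agg_dq_detailed_stats Optional[List[Tuple]]

Detailed stats for the target agg dq rules; stored in _target_agg_dq_detailed_stats.

None

Returns:

None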

Source code in spark_expectations/core/context.py
def set_target_agg_dq_detailed_stats(
    self, target_agg_dq_detailed_stats: Optional[List[Tuple]] = None
) -> None:
    """
    Args:
        _target_agg_dq_detailed_stats:
    Returns:
    """
    self._target_agg_dq_detailed_stats = target_agg_dq_detailed_stats

set_target_and_error_table_writer_config(config: dict) -> None

This function sets the target and error table writer config. Args: config: dict Returns: None

Source code in spark_expectations/core/context.py
def set_target_and_error_table_writer_config(self, config: dict) -> None:
    """
    This function sets target and error table writer config
    Args:
        config: dict
    Returns: None

    """
    self._target_and_error_table_writer_config = config

set_target_query_dq_detailed_stats(target_query_dq_detailed_stats: Optional[List[Tuple]] = None) -> None

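Parameters:

Name Type Description Default
target_query_dq_detailed_stats Optional[List[Tuple]]

Detailed stats for the target query dq rules; stored in _target_query_dq_detailed_stats.

None

Returns:

None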

Source code in spark_expectations/core/context.py
def set_target_query_dq_detailed_stats(
    self, target_query_dq_detailed_stats: Optional[List[Tuple]] = None
) -> None:
    """
    Args:
        _target_query_dq_detailed_stats:
    Returns:
    """
    self._target_query_dq_detailed_stats = target_query_dq_detailed_stats

set_target_query_dq_output(target_query_dq_output: Optional[List[dict]] = None) -> None

This function sets the target query dq output. Args: target_query_dq_output: List[dict] Returns: None

Source code in spark_expectations/core/context.py
def set_target_query_dq_output(
    self, target_query_dq_output: Optional[List[dict]] = None
) -> None:
    """
    This function sets row dq secondary queries
    Args:
        target_query_dq_output: List[dict]
    Returns: None
    """
    self._target_query_dq_output = target_query_dq_output

set_teams_webhook_url(teams_webhook_url: str) -> None

Source code in spark_expectations/core/context.py
def set_teams_webhook_url(self, teams_webhook_url: str) -> None:
    self._teams_webhook_url = teams_webhook_url

set_to_mail(to_mail: str) -> None

Source code in spark_expectations/core/context.py
def set_to_mail(self, to_mail: str) -> None:
    self._to_mail = to_mail

set_zoom_token(zoom_token: str) -> None

Set the Zoom webhook token.

Parameters:

Name Type Description Default
zoom_token str

The token for Zoom notification.

required
Source code in spark_expectations/core/context.py
def set_zoom_token(self, zoom_token: str) -> None:
    """
    Set the Zoom webhook token.

    Args:
        zoom_token (str): The token for Zoom notification.
    """
    self._zoom_token = zoom_token

set_zoom_webhook_url(zoom_webhook_url: str) -> None

Set the Zoom webhook URL.

Parameters:

Name Type Description Default
zoom_webhook_url str

The webhook URL for Zoom notification.

required
Source code in spark_expectations/core/context.py
def set_zoom_webhook_url(self, zoom_webhook_url: str) -> None:
    """
    Set the Zoom webhook URL.

    Args:
        zoom_webhook_url (str): The webhook URL for Zoom notification.
    """
    self._zoom_webhook_url = zoom_webhook_url