Skip to content

Attributes

Classes

spark_expectations.notifications.plugins.pagerduty.SparkExpectationsPagerDutyPluginImpl

Bases: SparkExpectationsNotification

This class implements the functionality to send PagerDuty notifications.

Functions

send_notification(_context: SparkExpectationsContext, _config_args: Dict[str, Union[str, bool]]) -> None

Function to create a PagerDuty incident. Args: _context: SparkExpectationsContext class object; _config_args: dict which contains message (str) and other related configurations for PagerDuty. Returns: None.

Source code in spark_expectations/notifications/plugins/pagerduty.py
@spark_expectations_notification_impl
def send_notification(
    self,
    _context: SparkExpectationsContext,
    _config_args: Dict[str, Union[str, bool]],
) -> None:
    """
    Create (trigger) a PagerDuty incident for a failure notification.

    Nothing is sent when PagerDuty is not enabled on the context, or when
    the message is not classified as a failure by
    ``self._is_failure_notification``.

    Args:
        _context: SparkExpectationsContext class object
        _config_args: dict which contains message(str) and other related configurations for PD

    Returns: None

    Raises:
        SparkExpectationsPagerDutyException: if the POST does not return
            HTTP 202, or if any other error occurs while building or
            sending the event. The original error is attached as
            ``__cause__``.
    """
    try:
        # Guard clause: PagerDuty must be explicitly enabled (strictly True).
        if _context.get_enable_pagerduty is not True:
            return

        message = _config_args.get("message")

        # Only send PagerDuty notifications for failure scenarios
        if not self._is_failure_notification(str(message)):
            _log.info("PagerDuty notification skipped - not a failure scenario")
            return

        # Ask the helper to resolve the integration key when the context
        # does not have one yet; the key is re-read from the context below.
        if not _context.get_pagerduty_integration_key:
            self._get_pd_integration_key(_context)

        # Generate a deduplication key to consolidate multiple incidents into one
        # Using table name and product ID to ensure same failure creates only one incident
        table_name = _context.get_table_name or "unknown_table"
        product_id = _context.product_id or "unknown_product"
        dedup_key = f"spark_expectations_{product_id}_{table_name}_failure"

        # Sending request to PagerDuty Events API v2 > https://developer.pagerduty.com/docs/send-alert-event
        # Severity Levels can be: critical, error, warning, or info
        payload = {
            "routing_key": _context.get_pagerduty_integration_key,
            "dedup_key": dedup_key,
            "event_action": "trigger",
            "payload": {
                "summary": message,
                "source": "Spark Expectations",
                "severity": "error",
            },
        }
        headers = {
            "Content-Type": "application/json",
        }
        response = requests.post(
            _context.get_pagerduty_webhook_url,
            json=payload,
            headers=headers,
            timeout=10,
        )

        # Check the response for success or failure
        if response.status_code == 202:
            _log.info("PagerDuty notification sent successfully!")
        else:
            # NOTE(review): this raise sits inside the try block, so it is
            # presumably caught and re-wrapped by the handler below
            # (assuming the exception class derives from Exception).
            raise SparkExpectationsPagerDutyException(
                f"Failed to send PagerDuty notification. Status code: {response.status_code}, Response: {response.text}"
            )
    except Exception as e:
        # Chain explicitly so the root cause is preserved as __cause__.
        raise SparkExpectationsPagerDutyException(f"Error sending PagerDuty notification: {str(e)}") from e