Bases: SparkExpectationsNotification
This class implements/supports functionality to send PagerDuty notifications
function to create a PagerDuty incident
Args:
_context: SparkExpectationsContext class object
_config_args: dict containing the message (str) and other PagerDuty-related configuration
Returns: None
Source code in spark_expectations/notifications/plugins/pagerduty.py
| @spark_expectations_notification_impl
def send_notification(
    self,
    _context: SparkExpectationsContext,
    _config_args: Dict[str, Union[str, bool]],
) -> None:
    """
    Create a PagerDuty incident for a failure notification.

    Nothing is sent unless `_context.get_enable_pagerduty` is exactly True and
    `self._is_failure_notification` accepts the message. Otherwise a "trigger"
    event with severity "error" is posted to `_context.get_pagerduty_webhook_url`,
    using a deduplication key built from the product ID and table name.

    Args:
        _context: SparkExpectationsContext class object
        _config_args: dict which contains message(str) and other related
            configurations for PagerDuty

    Returns: None

    Raises:
        SparkExpectationsPagerDutyException: if the response status code is not
            202, or if any other exception occurs while building or sending the
            event (the original exception is attached as the cause).
    """
    try:
        # Guard clause: PagerDuty must be explicitly enabled (identity check on True).
        if _context.get_enable_pagerduty is not True:
            return

        message = _config_args.get("message")

        # Only send PagerDuty notifications for failure scenarios
        if not self._is_failure_notification(str(message)):
            _log.info("PagerDuty notification skipped - not a failure scenario")
            return

        # NOTE(review): presumably this populates the integration key on the
        # context; the helper is not visible here - confirm.
        if not _context.get_pagerduty_integration_key:
            self._get_pd_integration_key(_context)

        # Generate a deduplication key to consolidate multiple incidents into one
        # Using table name and product ID to ensure same failure creates only one incident
        table_name = _context.get_table_name or "unknown_table"
        product_id = _context.product_id or "unknown_product"
        dedup_key = f"spark_expectations_{product_id}_{table_name}_failure"

        # Sending request to PagerDuty Events API v2 > https://developer.pagerduty.com/docs/send-alert-event
        # Severity Levels can be: critical, error, warning, or info
        payload = {
            "routing_key": _context.get_pagerduty_integration_key,
            "dedup_key": dedup_key,
            "event_action": "trigger",
            "payload": {
                "summary": message,
                "source": "Spark Expectations",
                "severity": "error",
            },
        }
        headers = {
            "Content-Type": "application/json",
        }

        response = requests.post(
            _context.get_pagerduty_webhook_url,
            json=payload,
            headers=headers,
            timeout=10,
        )

        # Any status other than 202 is treated as a failed delivery.
        if response.status_code != 202:
            raise SparkExpectationsPagerDutyException(
                f"Failed to send PagerDuty notification. Status code: {response.status_code}, Response: {response.text}"
            )
        _log.info("PagerDuty notification sent successfully!")
    except Exception as e:
        # Wrap every failure (including the non-202 error raised above) in the
        # plugin's exception type; `from e` keeps the original error as the
        # explicit cause so the root failure stays visible in tracebacks.
        raise SparkExpectationsPagerDutyException(f"Error sending PagerDuty notification: {str(e)}") from e
|