function to send email notification for requested mail id's
Args:
_context: object of SparkExpectationsContext
_config_args: dict(which consists of: receiver mail(str), subject: subject of
the mail(str) and body: body of the mail(str)
Returns:
Source code in spark_expectations/notifications/plugins/email.py
| @spark_expectations_notification_impl
def send_notification(
self,
_context: SparkExpectationsContext,
_config_args: Dict[Union[str], Union[str, bool]],
) -> None:
"""
function to send email notification for requested mail id's
Args:
_context: object of SparkExpectationsContext
_config_args: dict(which consists of: receiver mail(str), subject: subject of
the mail(str) and body: body of the mail(str)
Returns:
"""
try:
if _context.get_enable_mail is True:
msg = MIMEMultipart()
msg["From"] = _context.get_mail_from
msg["To"] = _context.get_to_mail
msg["Subject"] = _context.get_mail_subject
# body = _config_args.get('mail_body')
mail_content = f"""{_config_args.get("message")}"""
# Check if the content is HTML
if re.search(r"<[a-z][\s\S]*>", mail_content, re.IGNORECASE):
content_type = "html"
else:
content_type = "plain"
msg.attach(MIMEText(mail_content, content_type))
# mailhost.com
server = smtplib.SMTP(
_context.get_mail_smtp_server, _context.get_mail_smtp_port
)
server.starttls()
if _context.get_enable_smtp_server_auth:
self._get_smtp_password(_context, server)
text = msg.as_string()
server.sendmail(
_context.get_mail_from,
[email.strip() for email in _context.get_to_mail.split(",")],
text,
)
server.quit()
_log.info("email send successfully")
except Exception as e:
raise SparkExpectationsEmailException(
f"error occurred while sending email notification from spark expectations project {e}"
)
|