亚马逊AWS官方博客

Amazon MWAA 实战分享 – 基于 Task 颗粒度的监控告警

服务及场景介绍

Amazon MWAA

Amazon MWAA(Amazon Managed Workflows for Apache Airflow)是 Apache Airflow 的一项托管服务,让您可以使用当前熟悉的 Apache Airflow 平台来编排您的工作流程。您可以获得更高的可扩展性、可用性和安全性,而无需承担管理底层基础设施的运营负担。

Amazon SNS

Amazon Simple Notification Service(Amazon SNS)是一项托管服务,提供从发布者向订阅者(也称为创建者和使用者)的消息传输。发布者通过将消息发送至主题与订阅者进行异步交流,主题是一个逻辑访问点和通信渠道。客户端可以订阅 SNS 主题并使用受支持的终端节点类型接收已发布的消息,例如 Amazon Kinesis Data Firehose、Amazon SQS、Amazon Lambda、HTTP、电子邮件、移动推送通知和移动短信(SMS)。

Amazon Lambda

Amazon Lambda 是一项无服务器事件驱动型计算服务,该服务使您可以运行几乎任何类型的应用程序或后端服务的代码,而无需预置或管理服务器。您可以从 200 多个亚马逊云科技服务和软件即服务(SaaS)应用程序中触发 Lambda,且只需按您的使用量付费。

场景介绍

本文将演示在常规的数仓调度任务监控中,如何实现基于 Airflow Task 颗粒度的监控(Task Owner,DAG ID,Task ID,执行时间),同时将告警信息通知到第三方消息工具(本文以飞书为示例)。

方案架构

方案包含两个部分实现(细节在下文中说明):

  • 竖线左方为 Amazon MWAA Task 监控,通过自定义回调函数将告警信息推送到 Amazon SNS
  • 竖线右方为现有的 Feish Notifier 一键部署方案,负责对消息进行自定义格式化以及通过 Webhook API 推送到第三方客户端

部署流程

飞书通知链路部署

  • 飞书提前创建好 Webhook API,并记录下来供后续使用(本文将忽略创建步骤)
  • Console 控制台搜索栏上搜索关键字“sar”

  • 点击左方“Available Application”,并搜索“feishu”关键字

  • 点击“Feish-Notifier”,将创建好的飞书 Webhook URL 输入后开始部署所需资源

  • 部署完成后从“Resources”中复制 Amazon SNS 完整 ARN 供后续使用

至此已完成飞书通知链路部署。

Amazon MWAA Task 监控配置

  • Task 监控配置。Airflow DAG 配置参数中,“on_failure_callback”参数的作用是 Task 执行失败后可以执行此参数指定的自定义回调函数,我们就可以通过此参数来进行基于任务颗粒度的监控以及非常灵活地进行自定义告警处理。
default_args = {
    "owner": "JerryWONG",
    "depends_on_past": False,
    #"retries": 1,
    #"retry_delay": timedelta(minutes=5),
    "on_failure_callback": on_failure_callback
}
  • 回调函数样例。此样例中核心处理逻辑是从回调上下文(context)里面将任务对象(TaskInstance)取出,并将对应的 owner、task_id,dag_id,execution_date 的值获取到,作为消息内容发送到上述步骤建立的 Amazon SNS 通道,从而推送到飞书客户端。
def on_failure_callback(context):
    # Initiate TaskInstance object
    ti = context.get('ti')

    # Get Task attributes
    owner = ti.task.owner
    execution_date = ti.execution_date
    task_id = ti.task_id
    dag_id = ti.task.dag_id

    op = SnsPublishOperator(
        task_id='dag_failure',
        target_arn=sns_topic_arn,
        subject="DAG FAILED",
        message_attributes={"MessageStructure": 'string'},
        message=f"MWAA(Airflow) task has failed, please check the task details:\n\n- Owner: {owner},\n- Execution_Date: {execution_date},\n- Task_ID: {task_id},\n- DAG_ID: {dag_id}"
    )
    op.execute(context)
  • 完整 DAG 代码样例
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.utils.dates import days_ago
from airflow.models import TaskInstance
from airflow.sensors.external_task import ExternalTaskSensor
import os
from datetime import datetime as dt, timedelta


# Define AWS SNS Topic ARN
# sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
sns_topic_arn = 'arn:aws:sns:us-west-2:xxx:serverlessrepo-jerry-demo-Feishu-Notifier-ServerlessNotifierSNSTopic-XTDxij194UZp'


# Task failure callback function
# Call AWS SNS API to send notification
def on_failure_callback(context):
    # Initiate TaskInstance object
    ti = context.get('ti')

    # Get Task attributes
    owner = ti.task.owner
    execution_date = ti.execution_date
    task_id = ti.task_id
    dag_id = ti.task.dag_id

    op = SnsPublishOperator(
        task_id='dag_failure',
        target_arn=sns_topic_arn,
        subject="DAG FAILED",
        message_attributes={"MessageStructure": 'string'},
        message=f"MWAA(Airflow) task has failed, please check the task details:\n\n- Owner: {owner},\n- Execution_Date: {execution_date},\n- Task_ID: {task_id},\n- DAG_ID: {dag_id}"
    )
    op.execute(context)


# Default settings applied to all tasks
default_args = {
    "owner": "JerryWONG",
    "depends_on_past": False,
    #"retries": 1,
    #"retry_delay": timedelta(minutes=5),
    "on_failure_callback": on_failure_callback
}


DAG_ID = os.path.basename(__file__).replace(".py", "")
hour = dt.now().strftime("%Y%m%d%H")

with DAG(
        dag_id=DAG_ID,
        schedule_interval="0 * * * *",
        catchup=False,
        start_date=days_ago(1),
        default_args=default_args,
) as dag:
    submit_glue_job = GlueJobOperator(
        task_id="jerry-demo-redshift-msk-updated",
        job_name="jerry-demo-redshift-msk-updated",
        script_location=f"s3://aws-glue-assets-xxx-us-west-2/scripts/jerry-demo-redshift-msk.py",
        s3_bucket="aws-glue-assets-xxx-us-west-2",
        iam_role_name="glue_s3_full_access",
        create_job_kwargs={
            "GlueVersion": "4.0",
            "NumberOfWorkers": 2,
            "WorkerType": "G.1X",
            "Connections": {"Connections": ["jerry-demo-redshift-connection"]},
            "DefaultArguments": {
                "--enable-auto-scaling": "true",
                "--max-num-workers": "10",
                "--enable-metrics": "true",
                "--metrics-sample-rate": "1",
                "--job-bookmark-option": "job-bookmark-disable",
                "--enable-continuous-cloudwatch-log": "true",
                "--log-level": "INFO",
                "--enable-glue-datacatalog": "true",
                "--enable-spark-ui": "true",
                "--enable-job-insights": "true",
                "--TempDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
                "--spark-event-logs-path": "s3://aws-glue-assets-xxx-us-west-2/sparkHistoryLogs/"
            }
        }
    )

    # wait for external dag to finish
    wait_for_external_task = ExternalTaskSensor(
        task_id="wait_for_external_dag",
        external_dag_id="jerry-demo-mysql-redshift-dag",
        external_task_id=None,
        check_existence=True,
        # @与执行的external任务的时间差
        # execution_delta=timedelta(minutes=40),
        # allowed_states=["success"],
        # failed_states=["failed", "skipped"],
        mode="reschedule",
        timeout=60
    )

    wait_for_external_task >> submit_glue_job

最后作者特意运行一个配置错误导致 Task 运行失败的 DAG 来验证效果。任务失败时,马上能在飞书客户端收到自定义告警通知。

至此已完整演示如何实现 Amazon MWAA 基于任务责任人和任务颗粒度的告警通知方案。

本篇作者

黄家曦

亚马逊云科技解决方案架构师,负责基于亚马逊云科技云计算方案咨询和设计。曾就职于思科、中国电信,在网络以及运营商骨干网有丰富经验,热衷于大数据领域,熟悉 EMR、Kinesis、Athena、Glue 等数据分析服务与方案设计。