亚马逊AWS官方博客

一组用于 Amazon SQS 死信队列重新传输的新 API

今天,我们推出了一组用于 Amazon Simple Queue Service(Amazon SQS)的新 API。这些新 API 允许您以编程方式管理死信队列(DLQ)重新传输。现在,您可以使用 AWS SDKAWS 命令行界面(AWS CLI)以编程方式将消息从 DLQ 移至其原始队列或自定义队列目标,尝试再次处理。DLQ 是一个队列,Amazon SQS 会自动将您的使用者应用程序未正确处理的消息移至该队列。

为了充分理解这个新 API 如何为您提供帮助,我们快速回顾一下历史。

消息队列是现代应用程序架构不可或缺的一部分。这些队列允许消息生产者和使用者之间进行异步和基于消息的通信,从而使开发人员能够分离服务。在大多数系统中,消息会一直保留在共享存储(队列)中,直到使用者对其进行处理。消息队列允许开发人员构建能够抵御临时服务故障的应用程序。有助于确定消息处理的优先顺序,并扩展处理消息的 Worker 节点的实例集。消息队列在事件驱动型架构中也很常见。

异步消息交换在应用程序架构中并不是什么新鲜事物。在应用程序之间异步交换消息的概念出现在 20 世纪 60 年代,并在 IBM 于 1972 年推出 OS/360 版 TCAM 时首次进入大众视野。普遍采用出现在 20 年后,IBM 于 1993 年推出 MQ 系列(现为 IBM MQ),Sun Microsystems 于 1998 年发布 Java Messaging Service(JMS),这是 Java 应用程序与消息队列交互的标准 API。

AWS 于 2006 年 7 月 12 日推出了 Amazon SQS。Amazon SQS 是一项高度可扩展、可靠且具有弹性的队列服务,可以“恰好满足需求”。 正如 Werner 当时所写的那样:“我们选择了一种并发模型,在这种模型中,处理消息的进程会自动获得该消息的租用锁;如果在租约到期之前没有删除消息,则可以再次对其进行处理。因此,故障处理变得非常简单。

2014 年 1 月 29 日,我们推出了死信队列(DLQ)。DLQ 可帮助您避免无法处理的消息永远停留在队列顶部,这可能会阻止队列中的其他消息得到处理。在 DLQ 中,每个队列都有一个关联的属性,告知 Amazon SQS 一条消息可能被提交多少次以供处理(maxReceiveCount)。每条消息还有一个关联的接收计数器(ReceiveCount)。每次使用者应用程序接收消息进行处理时,消息接收计数都会增加 1。当 ReceiveCount > maxReceiveCount 时,Amazon SQS 会将消息移至您指定的 DLQ 以供人工分析和调试。您通常将警报与 DLQ 相关联,以便在发生此类事件时发送通知。将消息移至 DLQ 的典型原因是其格式不正确、使用者应用程序中存在错误或者处理消息需要很长时间。

在 AWS re:Invent 2021 上,AWS 宣布在 Amazon SQS 控制台上推出死信队列重新传输功能。重新传输解决了失败消息生命周期第二部分的问题。该功能允许您将消息重新注入其原始队列中以尝试再次对其进行处理。修复使用者应用程序并准备好使用失败的消息后,您可以将消息从 DLQ 重新传输回源队列或自定义队列目标。只需要在控制台上点击几下即可。

今天,我们添加了 API,允许您编写以编程方式处理重新传输的应用程序和脚本。不再需要人工点击控制台。使用该 API 可以提高进程的可扩展性并降低人为错误的风险。

下面来看实际操作
为了试用这个新 API,我打开了一个仅用于命令行演示的终端。开始之前,我确保我有最新版本的 AWS CLI。在 macOS 上,我输入 brew upgrade awscli

我首先创建两个队列。一个是死信队列,另一个是我的应用程序队列:

# 首先,我创建死信队列(注意我选择在队列名称末尾添加的 -dlq)
➜ ~ aws sqs create-queue \
            --queue-name awsnewsblog-dlq                                            
{
    "QueueUrl": "https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog-dlq"
}

# 其次,我检索我刚刚创建的队列的 ARN
➜  ~ aws sqs get-queue-attributes \
             --queue-url https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog-dlq \
             --attribute-names QueueArn
{
    "Attributes": {
        "QueueArn": "arn:aws:sqs:us-east-2:012345678900:awsnewsblog-dlq"
    }
}

# 第三,我创建应用程序队列。我输入重新传输策略:三次传输尝试后在 DLQ 中发布消息
➜  ~ aws sqs create-queue \
             --queue-name awsnewsblog \
             --attributes '{"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-2:012345678900:awsnewsblog-dlq\",\"maxReceiveCount\":\"3\"}"}' 
{
    "QueueUrl": "https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog"
}

现在两个队列都准备就绪,我向应用程序队列发布一条消息:

➜ ~ aws sqs send-message \
            --queue-url https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog \
            --message-body "Hello World"
{
"MD5OfMessageBody": "b10a8db164e0754105b7a99be72e3fe5",
"MessageId": "fdc26778-ce9a-4782-9e33-ae73877cfcb2"
}

接下来,我使用该消息,但我没有将其从队列中删除。这模拟了消息使用者应用程序中的崩溃。消息使用者应该在成功处理后删除消息。当我输入 redrivePolicy 时,我将 maxReceivedCount 属性设置为 3。因此,我重复此操作三次,强制 Amazon SQS 在三次传输尝试后将消息移至死信队列。默认的可见性超时为 30 秒,因此我必须在两次重试之间等待 30 秒或更长时间。

➜ ~ aws sqs receive-message \
            --queue-url https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog
{
"Messages": [
{
"MessageId": "fdc26778-ce9a-4782-9e33-ae73877cfcb2",
"ReceiptHandle": "AQEBP8yOfgBlnjlkGXjyeLROiY7xg7cZ6Znq8Aoa0d3Ar4uvTLPrHZptNotNfKRK25xm+IU8ebD3kDwZ9lja6JYs/t1kBlwiNO6TBACN5srAb/WggQiAAkYl045Tx3CvsOypbJA3y8U+MyEOQRwIz6G85i7MnR8RgKTlhOzOZOVACXC4W8J9GADaQquFaS1wVeM9VDsOxds1hDZLL0j33PIAkIrG016LOQ4sAntH0DOlEKIWZjvZIQGdlRJS65PJu+I/Ka1UPHGiFt9f8m3SR+Y34/ttRWpQANlXQi5ByA47N8UfcpFXXB5L30cUmoDtKucPewsJNG2zRCteR0bQczMMAmOPujsKq70UGOT8X2gEv2LfhlY7+5n8z3yew8sdBjWhVSegrgj6Yzwoc4kXiMddMg==",
"MD5OfBody": "b10a8db164e0754105b7a99be72e3fe5",
"Body": "Hello World"
}
]
}

# 等待 30 秒,
# 然后重复两次(总共三次接收消息 API 调用)

经过三次处理尝试后,该消息已不在队列中:

➜  ~ aws sqs receive-message \
             --queue-url  https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog
{
    "Messages": []
}

该消息已移至死信队列。我查看 DLQ 进行确认(注意队列 URL 以 -dlq 结尾):

➜  ~ aws sqs receive-message \
             --queue-url  https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog-dlq
{
    "Messages": [
        {
            "MessageId": "fdc26778-ce9a-4782-9e33-ae73877cfcb2",
            "ReceiptHandle": "AQEBCLtBMoZYVMMq7fUGNHeCliqE3mFXnkuJ+nOXLK1++uoXWBG31nDejCpxElmiBZWfbcfGJrEdKj4P9HJdrQMYDbeSqB+u1ZlB7CYzQBiQps4SEG0biEoubwqjQbmDZlPrmkFsnYgLD98D1XYWk/Ik6Z2n/wxDo9ko9rbZ15izK5RFnbwveNy8dfc6ireqVB1EGbeGkHcweHGuoeKWXEab1ynZWhNqZsQgCR6pWRkgtn59lJcLv4cJ4UMewNzvt7tMHH69GvVjXdYDYvJJI2vj+6RHvcvSHWWhTNT+CuPEXguVNuNrSya8gho1fCnKpVwQre6HhMlLPjY4wvn/tXY7+5rmte9eXagCqLQXaENB2R7qWNVPiWRIJy8/cTf37NLYVzBom030DNJlH9EeceRhCQ==",
            "MD5OfBody": "b10a8db164e0754105b7a99be72e3fe5",
            "Body": "Hello World"
        }
    ]
}

现在设置已经准备就绪,我们以编程方式将消息重新传输到其原始队列。假设我明白为什么使用者没有正确处理消息,并且我修复了使用者应用程序代码。我在 DLQ 上使用 start-message-move-task 来启动异步重新传输。有一个可选属性(MaxNumberOfMessagesPerSecond)可以控制重新传输的速度:

➜ ~ aws sqs start-message-move-task \
            --source-arn arn:aws:sqs:us-east-2:012345678900:awsnewsblog-dlq
{
    "TaskHandle": "eyJ0YXNrSWQiOiI4ZGJmNjBiMy00MmUwLTQzYTYtYjg4Zi1iMTZjYWRjY2FkNmEiLCJzb3VyY2VBcm4iOiJhcm46YXdzOnNxczp1cy1lYXN0LTI6NDg2NjUyMDY2NjkzOmF3c25ld3NibG9nLWRscSJ9"
}

我可以列出并检查我通过 list-message-move-tasks 启动的移动任务的状态,也可以通过调用 cancel-message-move-task API 取消正在运行的任务:

➜ ~ aws sqs list-message-move-tasks \
            --source-arn arn:aws:sqs:us-east-2:012345678900:awsnewsblog-dlq
{
    "Results": [
        {
            "Status": "COMPLETED",
            "SourceArn": "arn:aws:sqs:us-east-2:012345678900:awsnewsblog-dlq",
            "ApproximateNumberOfMessagesMoved": 1,
            "ApproximateNumberOfMessagesToMove": 1,
            "StartedTimestamp": 1684135792239
        }
    ]
}

现在我的应用程序可以再次使用应用程序队列中的消息:

➜  ~ aws sqs receive-message \
             --queue-url  https://sqs.us-east-2.amazonaws.com/012345678900/awsnewsblog                                   
{
    "Messages": [
        {
            "MessageId": "a7ae83ca-cde4-48bf-b822-3d4bc1f4dcae",
            "ReceiptHandle": "AQEB9a+Dm2nvb3VUn9+46j9UsDidU/W6qFwJtXtNWTyfoSDOKT7h73e6ctT9RVZysEw3qqzJOx1cxblTTOSrYwwwoBA2qoJMGsqsrsRGGYojBvf9X8hqi8B8MHn9rTm8diJ2wT2b7WC+TDrx3zIvUeiSEkP+EhqyYOvOs7Q9aETR+Uz02kQxZ/cUJWsN4MMSXBejwW+c5ivv5uQtpfUrfZuCWa9B9O67Kj/q52clriPHpcqCCfJwFBSZkGTXYwTpnjxD4QM7DPS+xVeVfTyM7DsKCAOtpvFBmX5m4UNKT6TROgCnGxTRglUSMWQp8ufVxXiaUyM1dwqxYekM9uX/RCb01gEyCZHas4jeNRV5nUJlhBkkqPlw3i6w9Uuc2y9nH0Df8nH3g7KTXo4lv5Bl3ayh9w==",
            "MD5OfBody": "b10a8db164e0754105b7a99be72e3fe5",
            "Body": "Hello World"
        }
    ]
}

可用性
DLQ 重新传输 API 现已在所有提供 Amazon SQS 的商业区域推出。

将消息从死信队列重新传输到源队列或自定义目标队列会生成额外的 API 调用,根据现有定价计费(在前一百万次 API 调用之后,起价为每百万次 API 调用 0.40 美元,每个月的前一百万次调用免费)。Amazon SQS 对消息进行批处理,同时将其从一个队列重新传输到另一个队列。这样,将消息从一个队列移至另一个队列就成为一种简单且低成本的选择。

要了解有关 DLQ 和 DLQ 重新传输的更多信息,请查看我们的文档

请记住,我们生活在一个异步世界中,您的应用程序也应如此。立即开始编写您的第一个重新传输应用程序

— seb