亚马逊AWS官方博客

在 Amazon Web Services 构建无服务器消息推送网关

背景

传统基于HTTP/1.1 协议的 API 采用请求/响应模式,这种模式可以满足大多数的应用场景,但却不支持服务器与客户端之间的实时双向通信,比如服务器主动推送消息或者广播消息到客户端,HTML5 中推出的 WebSocket 规范可以很好满足实时双向通信的需求。

以往,您需要基于服务器搭建自己的 WebSocket 服务,并付出大量的运维精力来保证可用性及性能,尤其是当客户端请求量难以预测或客户端数量巨大的时候。

 

现在,您可以基于亚马逊云科技的托管服务,Amazon API Gateway (以下简称API Gateway) 或 Amazon AppSync (以下简称AppSync),构建自己的消息推送网关。本文会展示利用不同服务搭建消息推送网关应用的方法,并讨论其差异点及各自适用的场景。

预备知识

本文共涉及到两个消息推送网关方案的搭建,其架构均可概括为三层:用户接入层、存储层、控制层。其中,用户接入层主要用来建立并维持终端用户与消息网关之间的WebSocket连接(如API Gateway及AppSync);存储层主要用来存储广播消息的记录及相关业务数据;控制层用来实现主要的业务逻辑,如发起消息广播等。

相关服务简介

接下来,我们将介绍文中涉及到的主要服务,包括 API Gateway、Lambda、AppSync 及 DynamoDB。

API Gateway

API Gateway 是一种完全托管的服务,可以帮助开发人员轻松创建、发布、维护、监控和保护任意规模的 API。API Gateway 充当应用程序的入口,可从您的后端服务访问数据、业务逻辑或功能。使用 API Gateway,您可以创建 HTTP、 RESTful API 和 WebSocket 类型的网关,本文主要使用WebSocket类型网关。API Gateway 可以提供双向的 WebSocket 连接,支持使用 Lambda函数自定义逻辑,来处理连接建立、断开、消息数据等不同的事件类型。

Lambda

Lambda 是一种无服务器的计算服务,您无需预置或管理服务器就能运行代码。其几乎可以运行任何类型的应用程序或后端服务代码(如Node.js、Python、Go、Java 等)。Lambda 会自动、精确地分配计算执行能力,并根据传入的请求或事件运行您的代码,以适应任何规模的流量。

AppSync

AppSync 是完全托管的GraphQL服务,通过处理与DynamoDB、Lambda 等数据源之间复杂的安全连接任务来简化 GraphQL API 的开发。此外, AppSync 会自动伸缩以应对不同的 API 请求压力。本文主要讨论借助AppSync的订阅能力,实现消息推送的场景。AppSync 提供的WebSocket通道为单向的,也就是只支持从服务端向客户端的单向消息数据发送。

DynamoDB

本文选择DynamoDB作为数据存储层,DynamoDB是一种完全托管的非关系型K/V及文档数据库,可以在任何规模的环境中提供个位数的毫秒级性能。此外,DynamoDB还提供诸如全球表、CDC等非常实用的功能。

方案设计

接下来,我们会分别介绍基于 API Gateway 和 AppSync 搭建消息推送平台的方案。

基于API Gateway的方案

如下图所示,此方案借助 API Gateway、Lambda 及 DynamoDB 三个服务组合实现。

客户端连接流程

  1. 客户端发起 WebSokcet 连接请求到 API Gateway;
  2. API Gateway 触发 onConnect Lambda,将 API Gateway 提供的 Connection ID 写入 DynamoDB 数据库中;
  3. 客户端发送业务数据到 API Gateway,自动触发 sendMessage Lambda,实现诸如订阅 channel、设定自己昵称、发送消息、消息存档;
  4. 步骤3中的消息,可以通过 WebSocket 的双向通道,推送给其他终端;
  5. 当客户端断开 WebSocket 连接的时候,会自动触发 onDisconnection Lambda 函数,实现自定义逻辑,诸如删除 Connection ID,统计终端在线时长等。

消息广播逻辑

如果您需要将消息批量发送到多个终端,则可采用如下图所示的方式。

其主要步骤如下:

  1. 客户端通过 WebSocket 连接,发送消息广播命令;
  2. 自动触发 sendMessage Lambda,执行命令解析,获取需要发送广播的channel;
  3. sendMessage Lambda从 DynamoDB 读取 channel中的 connection id列表;
  4. 根据需要触发多个 Lambda,将 Connection ID 列表分配给不同的 broadcast Lambda;
  5. broadcast Lambda 调用 API Gateway 的@connection 命令,完成消息推送。

基于 AppSync 的方案

AppSync 是托管的 GraphQL 服务,并提供了通过 WebSocket 订阅数据变化的能力。 这里,我们设定 DynamoDB 作为数据存储端,在 DynamoDB 中存储的数据发生变化的时候,自动将数据变更推送到客户端。

AppSync 会为您提供两种类型的终端节点类型:实时终端节点及 GraphQL 终端节点。实时终端节点使用WebSocket协议,GraphQL 节点采用 HTTPS 协议,其URL格式如下:

  • 实时终端节点:wss://example1234567890000.appsync-realtime-api.us-east-1.amazonaws.com/graphql
  • GraphQL 终端节点:https://example1234567890000.appsync-api.us-east-1.amazonaws.com/graphql

需要注意的是,实时终端节点提供的是服务端到客户端的单向 WebSocket 通道。

原理如下图所示:

客户端订阅流程

  1. 客户端与 AppSync 与 实时终端节点建立 WebSocket 连接
  2. 客户端通过此 WebSocket 连接发送订阅请求

消息广播逻辑

  1. 消息广播端发送数据变更请求到AppSync的GraphQL终端节点,HTTPS请求
  2. 数据变更通过WebSocket连接传递到客户端

方案实现

接下来,我们介绍两种方案的实现方式。

基于API Gateway实现消息网关

以下代码在亚马逊云中国区域测试通过,您也可以选择在海外区域进行部署。

下载代码

git clone https://github.com/nwcd-samples/apigw-chat

安装依赖环境

pip3 install aws_xray_sdk git+https://github.com/Chen188/chalice.git@1.26.0-fix-cn-region

部署消息推送应用

  1. 安装并配置 awscli [1],
  2. 创建 S3 桶, aws s3 mb s3://your-bucket-name
  3. 修改 sh 中的 REGION( 北京区域为 cn-north-1 , 宁夏区域为 cn-northwest-1 ) 及 BUCKET.
  4. 执行 ./deploy.sh

功能验证

  1. 打开 CloudFormation控制台 [2],
  2. 选中 APIGWChat
  3. 切换到 输出 tab
  4. 找到 WebsocketConnectEndpointURL ,即为已经创建完成的 API GW wss 连接
  5. 安装 wscat, pip install wscat
  6. wscat -c <WebsocketConnectEndpointURL>
% wscat -c wss://cb55poxncd.execute-api.cn-northwest-1.amazonaws.com.cn/api
Connected (press CTRL+C to quit)
> mynickname
< Using nickname: mynickname
Type /help for list of commands.

 

测试效果

先以 User 身份建立两个连接,并加入 Room,如下图所示:

接下来,我们使用 Host 身份加入同样的 Room,并发布消息,此消息会被发送到处于同一个 Room 中的所有终端用户,

 

我们可以看到两个 User 都接收到此广播消息,

基于 AppSync 实现消息网关

接下来,我们将展示通过 AppSync 搭建消息广播应用的过程。

创建 AppSync 及数据库

打开 AppSync 服务的控制台 [3],点击 创建 API,

 

并选择 使用向导创建,点击开始,

 

为模型指定名称,这里我们使用 Room,另外您也可以自定义模型包含的字段及类型,点击 create(创建)。这一步中,会同时创建一个名为RoomTable的DynamoDB表。

 

之后,在确定API名称之后,就可以点击 创建 完成AppSync相关的创建工作。

至此,我们已经完成了AppSync及DynamoDB的创建,接下来,我们将通过控制台测试订阅及消息发布功能。

测试消息发布及消息订阅

首先,点击控制台左侧的查询按钮。根据下图所示步骤,订阅Room记录的创建事件。当有新的Room记录创建的时候,会自动接收到消息通知,通知里会包含指定的字段(本例为 id, title)。

 

然后,打开一个相同的浏览器页面,设定title为my-room-1,来创建一个新的记录:

 

同时,创建完成后,我们同样可以在订阅页面,看到同样的事件:

 

至此,我们已经验证完成了AppSync 做消息订阅的功能,接下来,我们需要通过实际的性能压测,来确保在生产系统可以有稳定的性能表现。

性能压测

我们从维持连接数量的能力、消息传播时延两个维度对性能进行压测。比较知名的压测工具为有很多,如 k6.io、JMeter、Thor 及Apache Bench 等,本文综合考虑性能、资源消耗情况及易用度,选择了k6.io,k6.io 是一款优秀的开源性能压测工具,支持通过脚本控制压测逻辑,也可以设置自定义的业务指标,此外,其对 WebSocket 支持的比较好。

测试连接数量

API Gateway方案

为了验证 API Gateway 方案可以承载的连接数量,我们设定如下测试逻辑:

  • 使用 k6 设定在x秒内建立y个连接数量,
  • 在连接建立的时候,会触发 Lambda 将 connection id 写入到 DynamoDB 中,
  • 在连接建立之后,我们发送 nickname,存入DynamoDB 数据库,
  • 之后发送 /join Room1 命令,将当前的用户的房间信息存入 DynamoDB 数据库。

 

测试代码如下,完整代码您可以在 GitHub 上找到。测试过程中,我们使用的实例类型为 R5.4xlarge,其包含 16 个 vCPU,128GB 内存。另外, 您还需要对 OS 参数进行调优,具体可以参考 [4]。

let ramp_up_duration   = 60,  // 在x秒内建立 *target* 个ws连接
    keepalive_duration = 8,   // 保持x秒ws连接不断开
    tear_down_duration = 5,   // tear down
    target             = 10000 // 建立连接数
    ;
export let options = {
  scenarios: {
    'apigw-listener': {  // 建立ws连接
      executor: 'ramping-vus',
      exec: 'listener',
      startVUs: 0,
      stages: [
        { duration: ramp_up_duration   + 's', target: target },
        { duration: keepalive_duration + 's', target: target },
        { duration: tear_down_duration + 's', target: 0 },
      ],
      gracefulRampDown: '10s',
      gracefulStop: '10s',
    },
  }
};

 …

socket.on('open', () => {
    socket.send(gen_nickname())
});

socket.on('message', (msg) => {
if(msg.startsWith('Using nick')) {
    nicknameCounter.add(1);
    join_room();
} else if(msg.startsWith('Joined chat')) {
    if(role != 'istner') {
    broadcastTime = +new Date();
    console.log('now broadcast ' + broadcastTime)
    socket.send(broadcastTime)
    }

    joinRoomSuccessCounter.add(1);
} else if(msg.startsWith('Broadcast')) {
    msgRecvCounter.add(1);

    var received_time = +msg.split(' ')[1]

    var delay = +new Date() – received_time;
    ws_resp_delay.add(delay);
} else {
    console.error('unknown msg: ' msg)
} 

 

我们先以60s建立1W个连接为例,看下k6的输出日志:

data_received............: 57 MB  754 kB/s
data_sent................: 8.7 MB 115 kB/s
joined_success_counter...: 10000  131.558712/s
nickname_counter.........: 10000  131.558712/s
vus......................: 9      min=9        max=10000
vus_max..................: 10000  min=10000    max=10000
ws_connecting............: avg=38.58ms min=30.15ms med=38.06ms max=176.98ms p(90)=42.2ms p(95)=44.85ms
ws_msgs_received.........: 20000  263.117424/s
ws_msgs_sent.............: 20000  263.117424/s
ws_sessions..............: 10000  131.558712/s

 

其中, ws_session 是 k6 内置的针对 WebSocket 的指标,表示总共建立的连接数量;nickname_counter 是我们自己定义的一个 Counter 计数指标,表示 nickname 设置成功的次数,joined_success_counter 是我们自己定义的一个 Counter 计数指标,表示 join 命令执行成功的个数。

从 k6 指标上看,我们成功在1分钟内建立了1W 个连接。同时,我们也可以通过 CloudWatch 的监控指标,查看链接建立的情况,如下图所示,

 

接下来测试 15 分钟建立 15W 连接的情况,由于单机的限制,我们将使用三台同规格的实例(R5.4xlarge),每台机器承载总共 5W 的连接任务,k6 的执行命令为:

k6 run --execution-segment "0:1/3"   --execution-segment-sequence "0,1/3,2/3,1" k6_ws_apigw.js
k6 run --execution-segment "1/3:2/3" --execution-segment-sequence "0,1/3,2/3,1" k6_ws_apigw.js
k6 run --execution-segment "2/3:1"   --execution-segment-sequence "0,1/3,2/3,1" k6_ws_apigw.js 

 

三台实例均成功建立了5W连接,k6的部分输出如下:

data_received............: 289 MB 235 kB/s
data_sent................: 46 MB  38 kB/s
joined_success_counter...: 50000  40.54847/s
nickname_counter.........: 50000  40.54847/s
…
ws_msgs_received.........: 100000 81.09694/s
ws_msgs_sent.............: 100000 81.09694/s
ws_sessions..............: 50000  40.54847/s

 

由以上测试结果可以看出,在15 分钟内建立 15W 连接是完全没有问题的。如果您感兴趣,也可以测试更高级别的连接数。实际上,API Gateway 并没有针对连接总数进行限制,但是您的每秒新增连接数及每秒的并发请求数则受到亚马逊云账户的限制,具体可以查看官方文档[5]。

AppSync方案

AppSync 和 API Gateway 的连接方式稍有不同,需要采用 AppSync 规定的通信流程,以下列举几个典型的差异,具体可以参考文档 [6]:

  • 请求 header 需要包含 Sec-WebSocket-Protocol,值为 graphql-ws
  • 请求 header 中需要包含认证信息 x-api-key
  • 定期同步 keep alive 信息

 

接下来,我们对创建好的AppSync进行压测。在15分钟内,建立15W连接。同样的,我们需要借助三台机器来实现共计15W的连接数。测试的配置如下:

let ramp_up_duration   = 900,  // 在x秒内建立 *target* 个ws连接
    keepalive_duration = 60,   // 保持x秒ws连接不断开
    tear_down_duration = 300,   // tear down
    target             = 150000 // 建立连接数
    ; 

 

三台机器的指标数据接近,其中ws_seesions 及 ws_msgs_sent完全一致,以其中一台的结果为例:

ResponseSuccessRate...: 100.00% ✓ 677500     ✗ 0
data_received.........: 326 MB  258 kB/s
data_sent.............: 88 MB   70 kB/s
vus_max...............: 50000   min=50000    max=50000
ws_connecting.........: avg=16.68ms min=4.87ms med=7.73ms max=15.46s p(90)=8.52ms p(95)=8.81ms
ws_msgs_received......: 677501  536.388183/s
ws_msgs_sent..........: 100000  79.171571/s
ws_sessions...........: 50000   39.585785/s

 

同时,我们也可以通过 CloudWatch 的监控指标,查看链接建立的情况,如下图所示,

 

可以看出,在15分钟内建立15W个连接,没有出现任何问题。如果您感兴趣,也可以使用此代码,自行测试。

 

在测试完两个方案可以承载的连接数后,我们发现,其均可以在15分钟内建立15W的连接,在很多应用场景下,性能都是足够的。

测试消息广播时延

接下来,我们还需要测试其消息广播性能,也就是将消息广播给多个终端时的延迟情况。我们通过在k6的脚本中,增加消息广播的逻辑,也就是说在连接都建立完成之后,通过控制命令发送广播消息。对于API Gateway来时,消息内容可以是任何自定义的数据;对于AppSync来说,是一条数据库变更记录。

 

API Gateway方案

在k6中,增加的广播逻辑为:

let options = {
  scenarios: {
    'apigw-listener': …,
    'apigw-broadcast': {
      executor: 'per-vu-iterations',
      exec: 'broadcast',
      vus: 1,
      iterations: 1,
      startTime: (ramp_up_duration + Math.floor(keepalive_duration * Math.random())) + 's',
      maxDuration: '5s',
      gracefulRampDown: '3s',
    }
  }
}; 

 

其中,startTime 指定消息广播的开始事件为:预期数量的连接都建立之后,在连接保持期间,随机选一个时间点,执行消息广播函数。

消息广播的流程是:建立 WebSocket 连接,设定 nickname,加入Room1,发送消息。当其他终端收到消息的时候,计算当前时间与消息发送时间差,即为消息广播延迟。

 

我们先以一个终端开始,测试消息延迟情况:

ws_resp_delay(ms)........: avg=51      min=47      med=51      max=55      p(90)=54.2    p(95)=54.6

 

我们可以看出,平均的消息延迟为 51ms。

接下来,测试广播消息给1000个终端的情况:

ws_resp_delay(ms)........: avg=5420.94023 min=84      med=5405.5  max=10713 p(90)=9668 p(95)=10178.95

可以看出,消息延迟与终端数量正相关。我们可以采取一些优化动作,来提高消息广播的效率,但是,由于其消息广播的实现方式,广播时延存在不可逾越的限制。因此,API Gateway的方案,更适合1:1的消息通知。

 

AppSync方案

1)我们先以1个终端开始,测试消息延迟情况:

ws_resp_delay(ms)..............: avg=840      min=840      med=840      max=840      p(90)=840      p(95)=840

 

可以看到,即使只有一个终端的情况下,延迟也达到了840ms,这是因为,我们在计算的时候,消息广播命令需要通过HTTP请求发出去,且我们的脚本把HTTPS连接建立的时间也计算了进去。

 

2)我们可以使用HTTP keepalive的功能,减少连接建立带来的影响。这里,我们借助k6的连接复用功能,测试广播100条消息给1个终端的延迟情况:

ws_resp_delay(ms)..............: avg=254.19   min=129      med=211      max=1274     p(90)=412.5    p(95)=456.4

可以看到,平均延迟显著降低。

 

3)接下来,看下1000个终端,广播100条消息的情况,

ws_resp_delay(ms)..............: avg=226.73916 min=141      med=205      max=894      p(90)=307      p(95)=388

 

4)继续增加到1W个终端,

ws_resp_delay(ms)..............: avg=273.764721 min=144      med=239      max=1183     p(90)=409      p(95)=459

 

5)继续增加到2W个终端,这时,单机无法有效承载这么大的计算量,因此,我们采用了两台机器各自建立1W连接,

ws_resp_delay(ms)..............: avg=265.967421 min=135      med=233      max=1136     p(90)=404      p(95)=466

 

可以看出,1个终端或者2W个终端,其延迟数据基本接近。如果您感兴趣的话,可以使用更多机器来测试更多连接下的延迟情况。

小结

至此,我们从方案设计、到方案实现以及性能压测三个部分,对分别基于API Gateway及AppSync服务搭建的两个消息网关方案进行了介绍。我们建议如下,如果您需要将消息同时广播给数万终端的话,您可以借助AppSync来轻松实现;如果您需要一个双工的WebSocket通道,希望灵活掌控链接建立及断开事件,且1:1消息传递占多数,我们建议您使用API Gateway的方案。无论您选择哪个方案,您都几乎不需要运维相关底层基础设施,从而让您更快实现业务创新。

 

参考文档

[1] 安装AWS CLI, https://docs.thinkwithwp.com/cli/latest/userguide/install-cliv2.html

[2] 中国区CloudFormation控制台,https://console.amazonaws.cn/cloudformation/home

[3] 中国区AppSync控制台,https://console.amazonaws.cn/appsync/home

[4] 系统参数调优,https://k6.io/docs/testing-guides/running-large-tests/

[5] API Gateway账户限制,https://docs.thinkwithwp.com/apigateway/latest/developerguide/limits.html

[6] AppSync的通信流程,https://docs.thinkwithwp.com/appsync/latest/devguide/real-time-websocket-client.html

 

本篇作者

陈斌

亚马逊云科技解决方案架构师,负责基于亚马逊云科技云计算方案的架构咨询与设计,具有丰富的解决客户实际问题的经验,目前关注深度学习的研究与应用。