亚马逊AWS官方博客

基于 Lambda 实现 Claude3 的流式响应

在如今的大语言模型推理输出场景中,流式响应基本已成为必备的功能之一。一方面符合大语言模型生成方式的本质,另一方面当模型推理效率不是很高时,流式响应比起全部 generate 后再输出、能大幅缩短从开始请求到输出第一个 Token 的时间,极大地提高用户体验。

从端到端的视角,可以将大语言模型的流式响应分为两个部分:模型本身的流式推理、后端程序将模型的流式响应输出到客户端,如下图所示。本文将以 Claude3 为例,阐述如何端到端的实现大语言模型的流式响应。

一、流式推理

当今的主流大语言模型,大都已支撑流式推理。我们以 Python 代码来实现 Claude3 的流式推理为例,给出如下示例。Claude3 采用了 Message API,支持多模态的数据输入。推理返回结果的内容结构,相比于此前 Claude2 的 Completions API 也发生了一些变化。

import json
import sys
import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

# 使用文本提示调用Claude 3
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
prompt = "你好"

body = json.dumps({
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 1024,
    "messages": [
        {
            "role": "user",
            "content": [{"type": "text", "text": prompt}],
        }
    ],
})

response = client.invoke_model_with_response_stream(
    body=body,
    modelId=model_id,
    accept="application/json",
    contentType="application/json",
)

stream = response["body"]
if stream:
    for event in stream:
        chunk = event.get("chunk")
        if chunk:
            chunk_obj = json.loads(chunk.get("bytes").decode())
            if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():
                if chunk_obj['type'] == 'content_block_delta':
                    chunk_obj = chunk_obj['delta']
                    sys.stdout.write(chunk_obj["text"])
                    sys.stdout.flush()  # 正式使用时,这里可以用yield返回

输出效果如下所示:

二、流式响应的技术方案介绍

目前流式响应输出到客户端的技术,主要有长轮询、 WebSocket 与 SSE 。

长轮询

浏览器发出 XMLHttpRequest 请求,服务器端接收到请求后,会阻塞请求直到有数据或者超时才返回,浏览器 JS 在处理请求返回信息(超时或有效数据)后再次发出请求,重新建立连接。 这种方式用于流式响应,一方面增加了不必要的请求-响应的往返时间,还给客户端与服务端带来了额外的负载压力与资源浪费。

WebSocket

WebSocket 是 HTML5 定义的新协议,实现了服务器与客户端之间的全双工通信。WebSocket 连接一旦建立,客户端和服务器端处于平等地位,可以相互发送数据,不存在请求和响应的区别,其原理如下图所示。

WebSocket 通常应用于实时聊天、多人在线游戏等场景。在 AWS 上可以使用 API Gateway 与 Lambda 来实现 Websocket 的服务端。但在 Claude3 的流式响应场景中,采用 Websocket 方案显得有些大材小用,我们希望能有更轻量级的实现方式。

HTTP SSE

HTTP SSE 的全称是 HTTP Server-Sent Events,它提供了一种从服务器实时发送更新事件到客户端的技术。SSE 主要解决了客户端与服务器之间的单向实时通信需求(例如 Claude3 回答的流式响应),相较于 WebSocket(双向实时),它更加轻量级且易于实现。SSE 是基于 HTTP 协议实现的,所以更适用于服务器持续的向客户端发送文本。其原理示意如下图所示。

本文将主要使用 HTTP-SSE 技术来展开介绍,如何将 Claude3 的回答流式推向客户端。

三、使用 Python Flask 搭建 SSE Demo

一提到 SSE,我们首先考虑了使用 Python Flask 框架来实现一个原型(Demo)。Flask 是一个轻量级的 Web 应用框架,它提供了简单易用的工具和技术来构建 Web 服务器。特别是,我们利用了 Flask 的 stream_with_context 功能来实现服务器发送事件(SSE),这使得服务器能够以流的形式实时推送数据到客户端。

将 Claude3 的流式推理结果,通过 SSE 技术推送到客户端的代码如下:

#pip install Flask boto3 CORS
from flask import Flask, Response, stream_with_context, request
from flask_cors import CORS  # 导入CORS模块
import time
import json
import sys
import boto3


app = Flask(__name__)
CORS(app)  # 为app添加跨域支持

def generate_stream(prompt):
    client = boto3.client("bedrock-runtime", region_name="us-east-1")

    # 调用 Claude 3 并提供文本提示
    model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": prompt}],
        }],
    })

    response = client.invoke_model_with_response_stream(
        body=body,
        modelId=model_id,
        accept="application/json",
        contentType="application/json",
    )

    stream = response["body"]
    if stream:
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                chunk_obj = json.loads(chunk.get("bytes").decode())
                if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():
                    if chunk_obj['type'] == 'content_block_delta':
                        chunk_obj = chunk_obj['delta']
                        yield f"data: {json.dumps(chunk_obj['text'], ensure_ascii=False)}\n\n"  # 修改为适合HTTP SSE的格式


@app.route('/stream')
def stream():
    prompt = request.args.get('prompt', '你好')  # 从HTTP请求中提取prompt变量的值,默认值为"你好"
    response = Response(stream_with_context(generate_stream(prompt)), content_type='text/event-stream')
    response.headers['Access-Control-Allow-Origin'] = '*'  # 允许所有域名跨域访问
    return response

if __name__ == '__main__':
    app.run(debug=True, port=9000, host='0.0.0.0')

web 端代码如下:

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Stream Demo</title>
</head>
<body>
    <h2>输入提示词:</h2>
    <input type="text" id="promptInput" placeholder="请输入提示词">
    <button onclick="startStream()">开始流式响应</button>
    <div id="output" style="margin-top: 20px;"></div>

    <script>
        function startStream() {
            const prompt = document.getElementById('promptInput').value;
            const outputDiv = document.getElementById('output');
            outputDiv.innerHTML = ''; // 清空之前的输出

            // 创建一个新的EventSource实例,连接到服务器端的/stream端点
            const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);

            eventSource.onmessage = function(event) {
                // 当接收到新的数据时,将其添加到页面上

                console.log('Received data:', event.data);
                
                // 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串
                var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');
                
                outputDiv.innerHTML += decodedMessage; 
            };

            eventSource.onerror = function() {
                // 如果发生错误,关闭连接
                eventSource.close();
                outputDiv.innerHTML += '<p>流已结束</p>';
            };
        }
    </script>
</body>
</html>

效果如下所示:

尽管使用 Flask API 来实现这个 Demo 相对简单直接,但在生产环境中,直接使用 Flask API 作为 API 服务器存在一些劣势,特别是当考虑到可扩展性、管理和成本效率时。例如,Flask 应用通常需要部署在一台或多台服务器上,这意味着你需要管理这些服务器的维护、监控和扩展。此外,对于流量波动较大的应用,服务器可能会在低峰时期闲置,造成资源浪费,或在高峰时期过载,影响服务质量。

为了克服这些劣势,我们转向了 AWS Lambda 来实现 streaming response。

四、基于 AWS  Lambda  来实现  SSE Demo

AWS Lambda 是一个无服务器计算服务,它允许你运行代码而无需预置或管理服务器。Lambda 只在代码执行时才收费,这使得它在成本效率上对于不定时或间歇性的工作负载非常有吸引力。通过 Lambda,我们可以实现一个更加弹性的架构,自动扩展以应对请求量的变化,同时减少了管理服务器的负担。

在客户端你可以基于 SSE 协议,调用 Lambda 函数 URL 来实时获取响应结果。

创建 Lambda 函数

AWS Lambda 默认支持使用 Node.js Runtime 支持流式响应。对于其他语言,你可以使用使用带有自定义 Runtime 方式来实现,或者使用 Lambda Web 适配器。

然后将如下代码粘贴到 Lambda 函数中:

import util from 'util';
import stream from 'stream';
import {
  BedrockRuntimeClient,
  InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";

import querystring from 'querystring';

const finished = util.promisify(stream.finished); 

// Create a new Bedrock Runtime client instance.
const client = new BedrockRuntimeClient({ region: "us-east-1" });
const modelId = "anthropic.claude-3-sonnet-20240229-v1:0";
let prompt = "您好";


export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {
  
  console.log(event);

  responseStream.setContentType("text/event-stream");
  
   // 假设event.rawQueryString存在并包含查询字符串
  const rawQueryString = event.rawQueryString;
    
  // 使用querystring模块解析查询字符串
  const params = querystring.parse(rawQueryString);
  
  prompt=params['prompt'];
    
  
  // Prepare the payload for the model.
  const payload = {
    anthropic_version: "bedrock-2023-05-31",
    max_tokens: 1000,
    messages: [
      {
        role: "user",
        content: [{ type: "text", text: prompt }],
      },
    ],
  };
  try {
    // 使用payload调用Claude并等待API响应
    const command = new InvokeModelWithResponseStreamCommand({
      contentType: "application/json",
      body: JSON.stringify(payload),
      modelId,
    });
    const apiResponse = await client.send(command);

    // 解码并处理响应流
    for await (const item of apiResponse.body) {
      const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));
      const chunk_type = chunk.type;

      if (chunk_type === "content_block_delta") {
        const text = chunk.delta.text;
        responseStream.write(`data: ${JSON.stringify(text)}\n\n`);
      }
    }
  } catch (error) {
    console.error("处理API响应时发生错误:", error);
    responseStream.write(`data: ${JSON.stringify({ error: "处理请求时发生错误" })}\n\n`);
  }
  
  responseStream.end();
  
  
});

配置权限

在创建完 Lambda 后,会生成一个具备基础权限的默认角色。需要在该角色中增加 Bedrock 的调用权限。点击下图中的角色名称,打开 IAM 服务中的角色管理页面。

在该 IAM 角色中添加一个内联权限,权限内容如下所示:

配置 Lambda 响应超时时间

在流式响应场景中,Lambda 的整体响应时间范围可能从几秒中到几分钟,具体取决于输入/输出 Token 的大小。所以需要提前修改 Lambda 默认的超时时间。

设置 Lambda 函数 URL

在函数 URL 配置页面,建议 Auth Type 选择 NONE。因为如果选择 AWS_IAM 方式,则需要在客户端存储 AWS 身份认证信息(如 AK/SK),有较大的安全隐患。所以,在面向最终用户的场景中,建议 Auth Type 设置为 NONE,然后在 Lambda 的 Header 中自行实现一些认证方式(如 API_KEY)。

此前,Invoke mode 需要选择 RESPONSE_STREAM,并配置 CORS,允许跨域访问。

Web 端示例代码:

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Stream Demo</title>
</head>
<body>
    <h2>输入提示词:</h2>
    <input type="text" id="promptInput" placeholder="请输入提示词">
    <button onclick="startStream()">开始流式响应(Lambda)</button>
    <div id="output" style="margin-top: 20px;"></div>

    <script>
        function startStream() {
            const prompt = document.getElementById('promptInput').value;
            const outputDiv = document.getElementById('output');
            outputDiv.innerHTML = ''; // 清空之前的输出

            // 创建一个新的EventSource实例,连接到服务器端的/stream端点
            const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);

            eventSource.onmessage = function(event) {
                // 当接收到新的数据时,将其添加到页面上

                console.log('Received data:', event.data);
                
                // 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串
                var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');
                
                outputDiv.innerHTML += decodedMessage; 
            };

            eventSource.onerror = function() {
                // 如果发生错误,关闭连接
                eventSource.close();
                outputDiv.innerHTML += '<p>流已结束</p>';
            };
        }
    </script>
</body>
</html>

效果如下:

Lambda 流式响应的限制

  • 单次调用的流式响应有默认为 20MB 的大小限制,但可以向后台申请提升限制。
  • 单次调用的流式响应的前 6MB 拥有不受限制的带宽。之后将以最大 2MBps 的速率进行流式响应。这在大语言推理场景,应该是完全足够了。
  • 通过 API Gateway 的 LAMBDA_PROXY 集成不支持流式响应。你可以在 API Gateway 和 Lambda 函数 URL 之间使用 HTTP_PROXY 集成,但你将受到 API Gateway 的 10MB 响应负载限制。此外,API Gateway 不支持分块传输编码,因此将无法实现流式响应的预期效果。

五、总结&综述

本文从端到端的视角,介绍了 Claude3 的流式推理,以及服务端流式响应的技术选型。通过比较分析,建议基于 Http-SSE 这种轻量级方式,来实现流式响应。最后,以 Claude3 为例,基于 Http-SSE 技术,分别介绍了使用 Python Flask、 AWS 云原生服务 Lambda 实现流式响应的实践。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

本篇作者

张盼富

AWS 解决方案架构师,从业十三年,先后经过历云计算、供应链金融、电商等多个行业,担任过高级开发、架构师、产品经理、开发总监等多种角色,有丰富的大数据应用与数据治理经验。加入亚马逊云科技后,致力于通过大数据+AI 技术,帮助企业加速数字化转型。