亚马逊AWS官方博客

利用 Amazon Bedrock 和 Amazon EKS 构建多租户的基于 RAG 的聊天机器人

导言

随着生成式人工智能模型的出现,许多客户都在探索如何构建聊天机器人应用,以满足其终端客户的各种需求,这包括了既要让每个聊天机器人都能专门处理特定租户的上下文信息,又要能利用其开发团队熟悉的经济高效的基础设施大规模运行此类多租户应用程序。Amazon Bedrock 是一项全托管的云服务,它通过应用编程接口(API)提供了一系列功能强大的基础模型服务,用户无需管理基础设施,简化了高适应性和高成本效益的生成式人工智能聊天机器人的开发过程,促使这一条挑战迎刃而解。Amazon EKS 解决了构建这类聊天机器人应用的几个难题,如多租户,自动扩缩容,弹性和成本。大多数 AWS 客户都会选择 Amazon EKS 来做大规模部署基于容器的多租户应用,以满足内部和外部用户的需求。根据各种调查和报告,包括来自 CNCF,Splunk 和 Datadog 等,都表明了Kubernetes 作为在各类企业中首选的容器编排解决方案的日益增长的强劲势头。

本文深入探讨了使用检索增强生成(RAG)构建多租户聊天机器人的解决方案。RAG 是一种常见的模式,在这种模式下,通用的大语言模型会根据用户问题和从专有数据中提取的信息作为附加的上下文进行查询。该方案的分步部署说明和示例代码可以从这一个 GithHub 仓库中获取。

解决方案概述

如下图所示,该解决方案使用 Amazon EKS 作为运行容器化聊天机器人应用的基础,Kubernetes 命名空间作为每个租户计算工作量的逻辑单位。Istio 是一种开源的服务网格,通常与 Kubernetes 一起用于部署多租户应用,并提供 Pod 级别的流量管理、安全管控和可观察性等功能。我们使用 Istio 服务网格来实现入口控制和路由功能,因为它支持外部授权、JWT 验证以及配置基于不同 HTTP 参数的请求路由。我们使用开放身份连接(Open ID Connect,OIDC)代理将授权请求转发给身份认证服务(这里采用的是 Amazon Cognito User Pool)。客户端的请求通过 NLB 接收。对于每个租户,我们使用了 DNS 子域名模式来转发用户的请求,其中 DNS 子域名映射到了 NLB 的 IP 地址。NLB 会将所有流量转发给 Istio 网关,再由 Istio 网关对流量进行分发。

要实现 RAG,作为知识库的专有数据会被转换为 vector embeddings,同时加入索引并存储在内存中的 FAISS 向量数据库中,以便进行高效的相似性搜索。我们使用 Titan Embeddings 模型将文本数据转换为 vector embeddings,并使用 Claude Instant 生成对用户查询的响应。这两种模型均可通过 Amazon Bedrock 获取。这里的 RAG 是通过 FAISS 实现的,FAISS 是一个开源库,用于实现在内存中存储 vector embeddings 和提供快捷的相似性搜索。我们使用 FAISS 是因为它简单易用,占用空间较小,适合演示。但对于企业的正式环境,pgvector、Amazon Opensearch 以及其他一些 AWS 合作伙伴的产品才是首选,因为这些产品具有很好的扩展性,一致性和访问便捷的特性。这里采用了 Amazon Titan Embeddings 模型来生成 vector embeddings。LangChain 提供了针对大语言模型运行查询,以及通过 Amazon DynamoDB 来保存和管理聊天记录等功能。它可以协调处理用户输入过程中的各种步骤,包括聊天记录检索、将聊天记录和用户输入合并发送到 Embedding 模型进行 vector embeddings 的生成、使用接收到的 vector embeddings 去搜索向量数据库,以及将检索到的文档与聊天记录和用户输入一并发送到 Claude Instant 模型来获取输出。

实现细节

聊天机器人和 RAG 组件的容器化

聊天机器人应用由两个微服务组件组成,包括与用户交互的前端聊天界面和 RAG API。两个微服务都以 Docker 容器镜像的形式构建,以便在 Amazon EKS 上以 Kubernetes pod 的形式进行部署。

前端聊天界面应用的 Dockerfile 如下:

FROM public.ecr.aws/docker/library/python:3.11.4-slim AS installer-image
WORKDIR /app
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq install -y \
    build-essential \
    curl \
    software-properties-common 2>/dev/null >/dev/null \
    && rm -rf /var/lib/apt/lists/*
ADD app/* ./
RUN pip install --user --upgrade -q -q pip && pip install --user -q -q -r requirements.txt

FROM public.ecr.aws/docker/library/python:3.11.4-slim
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq upgrade -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt install -qq -y curl 2>/dev/null >/dev/null && \
    addgroup --gid 8000 streamlit && \
    adduser --uid 8000 --gid 8000 --disabled-password --gecos "" streamlit
USER streamlit
WORKDIR /home/streamlit/app
COPY --chown=streamlit:streamlit --from=installer-image /root/.local /home/streamlit/.local/
COPY --chown=streamlit:streamlit app/*.py /home/streamlit/app/
ENV PATH=/home/streamlit/.local/bin:$PATH
EXPOSE 8501
HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health
ENTRYPOINT ["streamlit", "run", "webapp.py", "--server.port=8501", "--server.address=0.0.0.0"]

这里采用的基础镜像是从 Amazon ECR 公有的镜像仓库获取的一个轻量化的 Python 基础镜像,同时使用了多阶段构建的方式。在第一阶段,主要是将一些 Python 依赖包安装到用户的家目录中。在第二阶段,我们要创建一个普通用户和用户组来运行基于 Streamlit 的用户聊天界面,同时从第一阶段的镜像中复制已安装的 Python 包,并将应用代码和所有相关模块复制到容器镜像中的应用工作目录。

RAG API 应用的 Dockerfile 如下:

FROM public.ecr.aws/docker/library/python:3.11.4-slim AS installer-image
WORKDIR /app
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq install -y \
    build-essential \
    curl 2>/dev/null >/dev/null \
    && rm -rf /var/lib/apt/lists/*
ADD api/requirements.txt ./
RUN pip install --upgrade -q -q pip && \
    pip install --user --upgrade -q -q pip && pip install --user -q -q -r requirements.txt && \
    python -m pip install --user -q -q botocore && \
    python -m pip install --user -q -q boto3

FROM public.ecr.aws/docker/library/python:3.11.4-slim
RUN DEBIAN_FRONTEND=noninteractive apt-get -qq update -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt-get -qq upgrade -y 2>/dev/null >/dev/null && \
    DEBIAN_FRONTEND=noninteractive apt install -qq -y curl 2>/dev/null >/dev/null && \
    addgroup --gid 8000 ragapi && \
    adduser --uid 8000 --gid 8000 --disabled-password --gecos "" ragapi
USER ragapi
WORKDIR /home/ragapi/app
COPY --chown=ragapi:ragapi --from=installer-image /root/.local /home/ragapi/.local/
COPY --chown=ragapi:ragapi api/app /home/ragapi/app/
ENV PATH=/home/ragapi/.local/bin:$PATH
EXPOSE 8000
ENTRYPOINT ["gunicorn", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "main:app"]

对于 RAG API 应用,我们使用与前端相同的 Python 基础镜像,并遵循与前端应用类似的步骤。在提供 API 服务时,我们使用 FastAPI 框架和 Gunicorn/Uvicorn 组合来运行高效的 ASGI 网络服务。

容器镜像构建完成后,会被推送到 Amazon ECR 中等待部署。在 Amazon EKS 上部署聊天机器人应用时,所需的容器镜像会从 ECR 中获取。

如下图所示,每个租户都会有一个独立的 Kubernetes 命名空间,并根据每个租户的特定场景设置环境变量和部署聊天机器人应用。聊天机器人应用是由两个容器组成,主容器是前端,而 RAG API 则作为侧载容器,提供问答任务所需的 API。我们还为每个租户创建了一个 AWS 身份和访问管理角色(AWS IAM Role),该角色拥有租户指定的 Amazon S3 存储桶和 Amazon DynamoDB 表的读写权限。为了简单化,我们为每个租户创建了专用的 Amazon S3 存储桶 和 Amazon DynamoDB 表。将上述的 IAM Role 会和 Kubernetes Service Account 进行绑定之后,命名空间内的容器将会获得访问所需要数据的权限。

相关的 Kubernetes 部署文件如下:

apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::${ACCOUNT_ID}:role/${EKS_CLUSTER_NAME}-${TENANT}-chatbot-access-role-${RANDOM_STRING}
  name: ${SA_NAME}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chatbot
  labels:
    app: chatbot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: chatbot
  template:
    metadata:
      labels:
        workload-tier: frontend
        app: chatbot
    spec:
      serviceAccountName: ${SA_NAME}
      containers:
        - image: ${REPO_URI_CHATBOT}:latest
          imagePullPolicy: Always
          name: chatbot
          ports:
            - containerPort: 8501
          env:
          - name: ISSUER_URI
            value: ${ISSUER_URI}
          - name: SESSIONS_TABLE
            value: ${SESSIONS_TABLE}
        - image: ${REPO_URI_RAGAPI}:latest
          imagePullPolicy: Always
          name: ragapi
          ports:
            - containerPort: 8000
          env:
          - name: CONTEXTUAL_DATA_BUCKET
            value: contextual-data-${TENANT}-${RANDOM_STRING}
          - name: CHATHISTORY_TABLE
            value: ${CHATHISTORY_TABLE}
          - name: TEXT2TEXT_MODEL_ID
            value: ${TEXT2TEXT_MODEL_ID}
          - name: EMBEDDING_MODEL_ID
            value: ${EMBEDDING_MODEL_ID}
          - name: BEDROCK_SERVICE
            value: ${BEDROCK_SERVICE}
          - name: AWS_DEFAULT_REGION
            value: ${AWS_DEFAULT_REGION}
---
kind: Service
apiVersion: v1
metadata:
  name: chatbot
  labels:
    app: chatbot
spec:
  selector:
    app: chatbot
  ports:
    - port: 80
      name: http
      targetPort: 8501

从上面的容器定义中可以看到,有几种不同的环境变量会传递给容器中的应用程序,应用程序使用这些变量来定位当前租户中特定的数据资源和 API 接口。值得注意的是,容器镜像的地址也是可以作为参数传递给部署文件的,在这个例子中,这些参数被设置为每个镜像的 ECR 地址。

使用 FAISS 和 Titan Embeddings 模型处理数据

作为部署的一部分,聊天机器人用于生成用户查询回复的上下文数据会被摄取到 FAISS 索引中。如下图所示,上下文数据首先从租户的 Amazon S3 存储桶中读取。

摄取机制使用 LangChain 的 CharacterTextSplitter 将摄取的数据分成合理大小的块,FAISS 库通过调用 Titan embeddings 模型将每个块转换成 vector embeddings,并为每一个 embeddings 创建一个索引。这种索引可缩短相似性搜索的响应时间。这样创建的索引集会以二进制格式文件输出,并上传回 Amazon S3 存储桶。RAG API 微服务会在初始化过程中访问该存储桶,并将其加载到内存中。对于较大型的向量数据库,索引由数据库实例创建和管理,应用程序可使用数据库 API 进行查询。专用的向量数据库可将上下文数据的加载、更新和创建索引与应用程序分离开来,也可将数据库实例的扩缩容与应用程序分离开来。

下面的代码是部份实现细节。

boto3_bedrock = boto3.client('bedrock', BEDROCK_REGION, endpoint_url=BEDROCK_ENDPOINT)
br_embeddings = BedrockEmbeddings(client=boto3_bedrock) 
... 

loader = CSVLoader(f"./{LOCAL_RAG_DIR}/{DATAFILE}") documents_aws = loader.load() 
print(f"documents:loaded:size={len(documents_aws)}") 

docs = CharacterTextSplitter(chunk_size=2000, chunk_overlap=400, separator=",").split_documents(documents_aws) 
... 
vector_db = FAISS.from_documents(documents=docs, embedding=br_embeddings,) 
... 
vector_db.save_local(f"{FAISS_INDEX_DIR}-{t}") 
try: 
    to_upload = os.listdir(f"./{FAISS_INDEX_DIR}-{t}") 
    for file in to_upload:
        s3.Bucket(S3_BUCKET).upload_file(f"./{FAISS_INDEX_DIR}-{t}/{file}", f"{FAISS_INDEX_DIR}/{file}", ) 
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404": print("The object does not exist.") 
        else: 
            raise

与 Claude Instant 集成

RAG API 微服务接收来自前端的用户查询,将其转换为对应基础模型所需要的提示格式,并使用 LangChain 与模型进行交互。LangChain 初始化后会在特定租户的 Amazon DynamoDB 表中维护聊天记录。RAG API 服务收到用户查询之后,会调用 LangChain 从向量数据库中检索相似性的结果,并将提示、搜索匹配的结果和之前的聊天记录作为上下文传给 Claude Instant 模型。从基础模型收到的响应后由 RAG API 通过聊天界面返回给用户,如下图所示。

当用户在浏览器输入访问地址(如 tenant.example.com),第一次访问聊天界面时,该请求将被 Istio 网关拦截,并转发到 OIDC 代理进行授权。由于请求中不包含与授权令牌相关联的 cookie,这会将用户的浏览器重定向到 Amazon Cognito 登陆界面。用户登录后,请求会收到来 Cognito 的代码,该代码会传递给 OIDC 代理,由 OIDC 代理与Cognito 交换用户的身份令牌(格式为 JSON 网络令牌 [JWT])。OIDC 代理缓存 JWT,并将 cookie 返回给用户浏览器,以便在今后的请求中使用。当请求到达租户的聊天机器人服务时,JWT 将根据其发行方 URL 进行验证,以确保 JWT 的真实性。此外,还要验证请求是否来自 Istio 网关。通过这些检查后,请求就会到达部署在租户命名空间中的聊天界面。

下面的代码显示在部署过程中我们为每个租户创建了两个 Amazon DynamoDB 表。Sessions 表记录用户会话,ChatHistory 表由 LangChain 用来存储和维护用户的聊天记录。

TENANTS="tenanta tenantb"

for t in $TENANTS
do
    export TABLE_NAME="Sessions_${t}_${RANDOM_STRING}"  
    echo "Creating DynamoDB table ${TABLE_NAME}"
    export DDB_TABLE=$(aws dynamodb create-table \
                        --table-name ${TABLE_NAME} \
                        --attribute-definitions \
                            AttributeName=TenantId,AttributeType=S \
                        --provisioned-throughput \
                            ReadCapacityUnits=5,WriteCapacityUnits=5 \
                        --key-schema \
                            AttributeName=TenantId,KeyType=HASH \
                        --table-class STANDARD
                        )
    
    export TABLE_NAME="ChatHistory_${t}_${RANDOM_STRING}"

    echo "Creating DynamoDB table ${TABLE_NAME}"
    export DDB_TABLE=$(aws dynamodb create-table \
                        --table-name ${TABLE_NAME} \
                        --attribute-definitions \
                            AttributeName=SessionId,AttributeType=S \
                        --provisioned-throughput \
                            ReadCapacityUnits=5,WriteCapacityUnits=5 \
                        --key-schema \
                            AttributeName=SessionId,KeyType=HASH \
                        --table-class STANDARD
                        )
done

Istio 代理容器会从身份验证 JWT 令牌中提取租户 ID 和用户电子邮件,并将它们分别作为 X-Auth- Request-Tenantid 和 X-Auth-Request-Email 请求头注入。聊天应用程序在接收用户输入时会创建与租户 ID 和用户电子邮件相关联的会话 ID。它还会记录上次用户交互的 UNIX 时间。SessionId 是第 4 版 UUID 字符串。由 TenantId:UserEmail:SessionId 组成的连接字符串将与用户查询一起发送给 RAG API,这样就可以将聊天记录与特定用户联系起来,为下游应用(如产品推荐、客户关系、技术支持等)提供丰富的上下文信息。

以下代码片段展示了部份处理过程。

...

headers = _get_websocket_headers() 
tenantid = headers.get("X-Auth-Request-Tenantid")
user_email = headers.get("X-Auth-Request-Email")

tenant_id = tenantid + ":" + user_email
dyn_resource = boto3.resource('dynamodb') 
IDLE_TIME = 600
current_time = int(datetime.now().timestamp())

...

if user_input:
    try:
        sessions = Sessions(dyn_resource)
        sessions_exists = sessions.exists(table_name)
        if sessions_exists:
            session = sessions.get_session(tenant_id)
            if session:
                if ((current_time - session['last_interaction']) < IDLE_TIME):
                    sessions.update_session_last_interaction(tenant_id, current_time)
                    updated_session = sessions.get_session(tenant_id)
                    print(updated_session['session_id'])
                else:
                    sessions.update_session(tenant_id, current_time)
                    updated_session = sessions.get_session(tenant_id)
            else:
                sessions.add_session(tenant_id)
                session = sessions.get_session(tenant_id)
    except Exception as e:
        print(f"Something went wrong: {e}")

    # headers for request and response encoding, same for both endpoints
    headers: Dict = {"accept": "application/json",
                     "Content-Type": "application/json"
                    }
    output: str = None
    if mode == MODE_RAG:
        user_session_id = tenant_id + ":" + session["session_id"]
        data = {"q": user_input, "user_session_id": user_session_id, "verbose": True}
        resp = req.post(api_rag_ep, headers=headers, json=data)
        if resp.status_code != HTTP_OK:
            output = resp.text
        else:
            resp = resp.json()
            sources = list(set([d['metadata']['source'] for d in resp['docs']]))
            output = f"{resp['answer']} \n \n Sources: {sources}"
    else:
        print("error")
        output = f"unhandled mode value={mode}"
    st.session_state.past.append(user_input)  
    st.session_state.generated.append(output) 

由于前端和 RAG API 在同一个 Kubernetes pod 中,前端可以直接通过 localhost (127.0.0.1) 与 RAG API 通信。当 RAG API 服务启动时,它会将 FAISS 索引加载到内存中,并开始监听连接。收到请求后,它会初始化包含凭证的 Bedrock boto3 客户端,并将请求数据、基础模型的参数和请求 Bedrock 的数据结构传给 LangChain。LangChain 的功能是将聊天对话保存在 Amazon DynamoDB 表中,并启动一个 ConversationalRetrievalChain,自动从 FAISS 索引中查找上下文,检索保存的历史记录,根据模板定义格式化查询,将上下文与格式化查询捆绑在一起,将其发送到 Claude Instant 模型,并保存模型的响应。RAG API 会将从 LangChain 收到的响应返回给前端应用程序,以便呈现给用户。

RAG API 管理的事件序列详见以下代码片段。

...

VECTOR_DB_DIR = os.path.join("/tmp", "_vectordb")
_vector_db = None
vectordb_s3_path: str = f"s3://{os.environ.get('CONTEXTUAL_DATA_BUCKET')}/faiss_index/"

if _vector_db is None:
    _vector_db = load_vector_db_faiss(vectordb_s3_path,
                                      VECTOR_DB_DIR,
                                      EMBEDDINGS_MODEL,
                                      BEDROCK_SERVICE)
...
    parameters = {
        "max_tokens_to_sample": req.maxTokenCount,
        "stop_sequences": req.stopSequences,
        "temperature": req.temperature,
        "top_k": req.topK,
        "top_p": req.topP
        }

    endpoint_name = req.text_generation_model
    logger.info(f"ModelId: {TEXT2TEXT_MODEL_ID}, Bedrock Model: {BEDROCK_SERVICE}")

    session_id = req.user_session_id
    boto3_bedrock = boto3.client(service_name=BEDROCK_SERVICE)
    bedrock_llm = Bedrock(model_id=TEXT2TEXT_MODEL_ID, client=boto3_bedrock)
    bedrock_llm.model_kwargs = parameters
    
    message_history = DynamoDBChatMessageHistory(table_name=CHATHISTORY_TABLE, session_id=session_id)
    memory_chain = ConversationBufferMemory(
        memory_key="chat_history",
        chat_memory=message_history,
        input_key="question",
        ai_prefix="Assistant",
        return_messages=True
    )
    
    condense_prompt_claude = PromptTemplate.from_template("""
    Answer only with the new question.
    
    Human: How would you ask the question considering the previous conversation: {question}
    
    Assistant: Question:""")

    qa = ConversationalRetrievalChain.from_llm(
        llm=bedrock_llm, 
        retriever=_vector_db.as_retriever(search_type='similarity', search_kwargs={"k": req.max_matching_docs}), 
        memory=memory_chain,
        condense_question_prompt=condense_prompt_claude,
        chain_type='stuff', # 'refine',
    )

    qa.combine_docs_chain.llm_chain.prompt = PromptTemplate.from_template("""
    {context}

    Human: Answer the question inside the <q></q> XML tags.
    
    <q>{question}</q>
    
    Do not use any XML tags in the answer. If you don't know the answer or if the answer is not in the context say "Sorry, I don't know."

    Assistant:""")

    answer = ""
    answer = qa.run({'question': req.q })

    logger.info(f"answer received from llm,\nquestion: \"{req.q}\"\nanswer: \"{answer}\"")
    resp = {'question': req.q, 'answer': answer, 'session_id': req.user_session_id}
    if req.verbose is True:
        resp['docs'] = docs

    return resp

结论

本文展示了如何使用 Amazon Bedrock 和 Amazon EKS 通过 RAG 构建一个多租户聊天机器人。RAG 实现了使用专有数据作为大语言模型的上下文,从而使大语言模型的响应更具预测性和相关性。虽然本文展示的是聊天机器人用例,但搭建方法和相关组件也可以应用于诸如问题解答、文本总结和内容创作等场景。
Amazon EKS 提供原生的多租户功能,对不同租户之间的工作负载进行隔离,从而实现高效的资源共享。我们鼓励您探索 Amazon Titan 模型和 Amazon Bedrock,用于您的生成式人工智能应用,并使用 Amazon EKS 作为底层协调平台,构建一个具有弹性和成本效益的解决方案。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

Original URL:https://thinkwithwp.com/blogs/containers/build-a-multi-tenant-chatbot-with-rag-using-amazon-bedrock-and-amazon-eks/

本篇作者

Farooq Ashraf

Farooq Ashraf is a Sr. Solutions Architect at AWS based in the San Francisco Bay Area, supporting customers in the ISV space. He has extensive experience in SaaS architectures, generative AI, and container-based microservices.

Jared Dean

Jared Dean is a Principal AI/ML Solutions Architect at AWS. Jared works with customers across industries to develop machine learning applications that improve efficiency. He is interested in all things AI, technology, and bbq.

Ravi Yadav

Ravi leads the go-to-market strategy for AWS Container Services, supporting ISV and digital native customers. Prior to AWS, Ravi has experience in product management, product/corporate strategy, and engineering across companies including Moody’s, Mesosphere, IBM, and Cerner.

校译作者

梁宇

亚马逊云科技专业服务团队 DevOps 顾问,主要负责 DevOps 技术实施。尤为热衷云原生服务及其相关技术。在工作之余,他喜欢运动,以及和家人一起旅游。