The ingestion mechanism uses LangChain's CharacterTextSplitter to break the ingested data into reasonably sized chunks. The FAISS library converts each chunk into a vector embedding by calling the Titan embeddings model and creates an index for the embeddings; this index shortens the response time of similarity searches. The resulting set of index files is written out in binary format and uploaded back to the Amazon S3 bucket, which the RAG API microservice reads during initialization to load the index into memory. For larger vector databases, the index is created and managed by a database instance, and the application queries it through the database API. A dedicated vector database decouples loading, updating, and indexing the contextual data from the application, and likewise decouples scaling of the database instance from the application.
The following code shows some of the implementation details.
boto3_bedrock = boto3.client('bedrock', BEDROCK_REGION, endpoint_url=BEDROCK_ENDPOINT)
br_embeddings = BedrockEmbeddings(client=boto3_bedrock)
...
loader = CSVLoader(f"./{LOCAL_RAG_DIR}/{DATAFILE}")
documents_aws = loader.load()
print(f"documents:loaded:size={len(documents_aws)}")
docs = CharacterTextSplitter(chunk_size=2000, chunk_overlap=400, separator=",").split_documents(documents_aws)
...
vector_db = FAISS.from_documents(documents=docs, embedding=br_embeddings,)
...
vector_db.save_local(f"{FAISS_INDEX_DIR}-{t}")
try:
    to_upload = os.listdir(f"./{FAISS_INDEX_DIR}-{t}")
    for file in to_upload:
        s3.Bucket(S3_BUCKET).upload_file(f"./{FAISS_INDEX_DIR}-{t}/{file}", f"{FAISS_INDEX_DIR}/{file}")
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == "404":
        print("The object does not exist.")
    else:
        raise
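The RAG API microservice restores this index during initialization through a load_vector_db_faiss helper (called in a later snippet) whose implementation is not shown. Below is a minimal sketch of what such a helper might look like, assuming the index files were produced by save_local as above and stored under a single S3 prefix; the download logic and embeddings wiring are illustrative assumptions, not the actual implementation.

import os
import boto3
from langchain.embeddings import BedrockEmbeddings
from langchain.vectorstores import FAISS

def load_vector_db_faiss(vectordb_s3_path, local_dir, embeddings_model, bedrock_service):
    # Hypothetical sketch: download the index files (index.faiss / index.pkl) from S3,
    # then rebuild the in-memory FAISS index with the same Titan embeddings model
    # that was used at ingestion time.
    os.makedirs(local_dir, exist_ok=True)
    bucket_name, _, prefix = vectordb_s3_path.replace("s3://", "").partition("/")
    bucket = boto3.resource("s3").Bucket(bucket_name)
    for obj in bucket.objects.filter(Prefix=prefix):
        if obj.key.endswith("/"):
            continue  # skip folder placeholder objects
        bucket.download_file(obj.key, os.path.join(local_dir, os.path.basename(obj.key)))
    embeddings = BedrockEmbeddings(model_id=embeddings_model,
                                   client=boto3.client(service_name=bedrock_service))
    return FAISS.load_local(local_dir, embeddings)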
Integrating with Claude Instant
The RAG API microservice receives the user query from the frontend, converts it into the prompt format required by the target foundation model, and uses LangChain to interact with the model. Once initialized, LangChain maintains the chat history in a tenant-specific Amazon DynamoDB table. When the RAG API service receives a user query, it calls LangChain to retrieve similarity results from the vector database and passes the prompt, the matching search results, and the previous chat history as context to the Claude Instant model. The response received from the foundation model is then returned by the RAG API to the user through the chat interface, as shown in the following diagram.
...
headers = _get_websocket_headers()
tenantid = headers.get("X-Auth-Request-Tenantid")
user_email = headers.get("X-Auth-Request-Email")
tenant_id = tenantid + ":" + user_email
dyn_resource = boto3.resource('dynamodb')
IDLE_TIME = 600
current_time = int(datetime.now().timestamp())
...
if user_input:
    try:
        sessions = Sessions(dyn_resource)
        sessions_exists = sessions.exists(table_name)
        if sessions_exists:
            session = sessions.get_session(tenant_id)
            if session:
                if ((current_time - session['last_interaction']) < IDLE_TIME):
                    sessions.update_session_last_interaction(tenant_id, current_time)
                    updated_session = sessions.get_session(tenant_id)
                    print(updated_session['session_id'])
                else:
                    sessions.update_session(tenant_id, current_time)
                    updated_session = sessions.get_session(tenant_id)
            else:
                sessions.add_session(tenant_id)
                session = sessions.get_session(tenant_id)
    except Exception as e:
        print(f"Something went wrong: {e}")
    # headers for request and response encoding, same for both endpoints
    headers: Dict = {"accept": "application/json",
                     "Content-Type": "application/json"}
    output: str = None
    if mode == MODE_RAG:
        user_session_id = tenant_id + ":" + session["session_id"]
        data = {"q": user_input, "user_session_id": user_session_id, "verbose": True}
        resp = req.post(api_rag_ep, headers=headers, json=data)
        if resp.status_code != HTTP_OK:
            output = resp.text
        else:
            resp = resp.json()
            sources = list(set([d['metadata']['source'] for d in resp['docs']]))
            output = f"{resp['answer']} \n \n Sources: {sources}"
    else:
        print("error")
        output = f"unhandled mode value={mode}"
    st.session_state.past.append(user_input)
    st.session_state.generated.append(output)
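The Sessions helper referenced above is not included in the snippet. A minimal sketch of how it could be implemented, assuming a DynamoDB table keyed by tenant_id that stores the current session_id and a last_interaction timestamp (the table layout and method bodies are assumptions):

import uuid
from datetime import datetime

class Sessions:
    # Hypothetical per-tenant session tracker backed by a DynamoDB table
    def __init__(self, dyn_resource):
        self.dyn_resource = dyn_resource
        self.table = None

    def exists(self, table_name):
        # True if the session table exists and is active; also binds it for later calls
        try:
            table = self.dyn_resource.Table(table_name)
            table.load()
            self.table = table
            return True
        except self.dyn_resource.meta.client.exceptions.ResourceNotFoundException:
            return False

    def get_session(self, tenant_id):
        return self.table.get_item(Key={"tenant_id": tenant_id}).get("Item")

    def add_session(self, tenant_id):
        # First interaction for this tenant/user: create a new session record
        self.table.put_item(Item={"tenant_id": tenant_id,
                                  "session_id": str(uuid.uuid4()),
                                  "last_interaction": int(datetime.now().timestamp())})

    def update_session_last_interaction(self, tenant_id, current_time):
        # Same conversation: keep the session_id and refresh the idle timer
        self.table.update_item(Key={"tenant_id": tenant_id},
                               UpdateExpression="SET last_interaction = :t",
                               ExpressionAttributeValues={":t": current_time})

    def update_session(self, tenant_id, current_time):
        # Idle timeout expired: rotate to a fresh session_id
        self.table.put_item(Item={"tenant_id": tenant_id,
                                  "session_id": str(uuid.uuid4()),
                                  "last_interaction": current_time})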
Because the frontend and the RAG API run in the same Kubernetes pod, the frontend can talk to the RAG API directly over localhost (127.0.0.1). When the RAG API service starts, it loads the FAISS index into memory and begins listening for connections. On receiving a request, it initializes a Bedrock boto3 client with credentials and passes the request data, the foundation model parameters, and the data structure for the Bedrock request to LangChain. LangChain's role is to persist the chat conversation in an Amazon DynamoDB table and to run a ConversationalRetrievalChain, which automatically looks up context in the FAISS index, retrieves the saved history, formats the query according to the defined template, bundles the context with the formatted query, sends it to the Claude Instant model, and saves the model's response. The RAG API returns the response received from LangChain to the frontend application for presentation to the user.
The sequence of events managed by the RAG API is detailed in the following code snippet.
...
VECTOR_DB_DIR = os.path.join("/tmp", "_vectordb")
_vector_db = None
vectordb_s3_path: str = f"s3://{os.environ.get('CONTEXTUAL_DATA_BUCKET')}/faiss_index/"
if _vector_db is None:
    _vector_db = load_vector_db_faiss(vectordb_s3_path,
                                      VECTOR_DB_DIR,
                                      EMBEDDINGS_MODEL,
                                      BEDROCK_SERVICE)
...
parameters = {
    "max_tokens_to_sample": req.maxTokenCount,
    "stop_sequences": req.stopSequences,
    "temperature": req.temperature,
    "top_k": req.topK,
    "top_p": req.topP
}
endpoint_name = req.text_generation_model
logger.info(f"ModelId: {TEXT2TEXT_MODEL_ID}, Bedrock Model: {BEDROCK_SERVICE}")
session_id = req.user_session_id
boto3_bedrock = boto3.client(service_name=BEDROCK_SERVICE)
bedrock_llm = Bedrock(model_id=TEXT2TEXT_MODEL_ID, client=boto3_bedrock)
bedrock_llm.model_kwargs = parameters
message_history = DynamoDBChatMessageHistory(table_name=CHATHISTORY_TABLE, session_id=session_id)
memory_chain = ConversationBufferMemory(
    memory_key="chat_history",
    chat_memory=message_history,
    input_key="question",
    ai_prefix="Assistant",
    return_messages=True
)
condense_prompt_claude = PromptTemplate.from_template("""
Answer only with the new question.
Human: How would you ask the question considering the previous conversation: {question}
Assistant: Question:""")
qa = ConversationalRetrievalChain.from_llm(
    llm=bedrock_llm,
    retriever=_vector_db.as_retriever(search_type='similarity', search_kwargs={"k": req.max_matching_docs}),
    memory=memory_chain,
    condense_question_prompt=condense_prompt_claude,
    chain_type='stuff',  # 'refine',
)
qa.combine_docs_chain.llm_chain.prompt = PromptTemplate.from_template("""
{context}
Human: Answer the question inside the <q></q> XML tags.
<q>{question}</q>
Do not use any XML tags in the answer. If you don't know the answer or if the answer is not in the context say "Sorry, I don't know."
Assistant:""")
answer = ""
answer = qa.run({'question': req.q })
logger.info(f"answer received from llm,\nquestion: \"{req.q}\"\nanswer: \"{answer}\"")
resp = {'question': req.q, 'answer': answer, 'session_id': req.user_session_id}
if req.verbose is True:
    resp['docs'] = docs
return resp
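The snippet above is the body of the RAG API's request handler; the HTTP layer itself is not shown. As a rough sketch, the endpoint and its request model could be wired up with FastAPI along the following lines; the route path, field defaults, and model name are assumptions inferred from the fields referenced as req.<field> above.

from typing import List
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class RagRequest(BaseModel):
    # Fields referenced as req.<field> in the snippet above; defaults are assumptions
    q: str
    user_session_id: str
    verbose: bool = False
    max_matching_docs: int = 3
    maxTokenCount: int = 512
    stopSequences: List[str] = []
    temperature: float = 0.0
    topK: int = 250
    topP: float = 1.0
    text_generation_model: str = "anthropic.claude-instant-v1"

@app.post("/api/v1/llm/rag")
def rag_endpoint(req: RagRequest) -> dict:
    # Placeholder: the handler body is the sequence shown above (load the FAISS
    # retriever, attach the DynamoDB-backed memory, run ConversationalRetrievalChain)
    # and it returns {'question', 'answer', 'session_id'} plus 'docs' when verbose.
    ...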