大语言模型已经被证明可以在其参数中存储知识,并生成文本或者回答问题,然而,它们操作知识的能力仍然有限,特别是一个具体的特定领域,他们的回答可能变得不那么“自信”,还会产生幻觉(hallucination)。RAG(Retrieval Augmented Generation)检索增强生成,是一种通用采用检索方式增强生成(RAG)的 AI 框架,即大模型 LLM 在回答问题或生成文本时,会先从大量的文档中检索出相关信息,然后基于这些检索出的信息进行回答或生成文本,从而可以提高回答的质量,而不是任由 LLM 来发挥。
在此博客里,我们将构建一个基于 RAG 的 LLM 应用程序,使用 LLM 和 Embedding 的向量生成模型进行语义搜索与文本相似性,来构建一个带有图形界面的问答式文档 Web 应用程序。我们将模拟一个工业生产线故障诊断的搜索,并根据搜索给出智能分析方案。由于大模型基本上都支持 Streaming 的方式生成内容,这样用户不用等待所有结果输出结束就可以看到持续生成的内容。我们这里使用 AWS 基础设施,来构建一个基于 HTTP Streaming 的 LLM 应用。
我们在该方案主要使用到如下服务和组件:
基于以上服务和基础设施,我们采用 ASG+EC2 部署 LLM 用来生成高质量的回答,使用 Amazon OpenSearch 作为向量相似度检索来进行语义搜索,并在 Amazon SageMaker 上部署一个用于生成文本向量的模型。
方案综述
该方案主要包含以下两个阶段:
- 知识准备阶段 – 在此阶段我们通过 Embedding 向量模型对一组语料文档生成向量表示,我们主要根据问题生成向量,并将其存入 OpenSearch Service 中的一个 k-NN 索引中。
- 知识检索增强阶段 – 这是应用程序的推理阶段。在此阶段,我们将对输入的文本采用准备阶段使用的向量模型生成向量,然后通过 k-NN 索引来搜索相似的问题并返回相似的列表给前端,在前端页面可以选择你需要大模型进行分析的项目列表,传入后台大模型进行总结。
知识准备阶段可以参考知识导入的过程,这里不做赘述,知识检索增强阶段的方案如下图所示:
具体的步骤如下:
- 在前端 UI 页面上输入关键字查询相关的问题和答案,UI 层将内容发送到服务端(Lambda 等服务);
- 使用 Embedding 向量模型把关键字转成向量表达;
- 通过关键的向量在 Amazon OpenSearch 的向量库中进行 k-NN 搜索与其相似的问题;
- 返回向量库相关搜索结果给前端页面;
- 用户在页面上手动选择相关信息,UI 层发送选中信息至后端 LLM 大模型;
- 通过 LLM 生成问题分析和解决方案建议;
- 开始 Streaming 逐字输出文本;
- 在前端 UI 中逐字渲染 Streaming 的文本。
我们会在接下来的章节主要介绍与之相关的技术方案。
- 部署文字向量模型:我们采用 SageMaker 部署 Embedding 模型,用于根据文字生成向量,继而可以用于 OpenSearch 服务的 Vector 相似性查询;
- 创建 LLM 应用:我们将介绍一个 Hugging Face 的大模型,并使用 Flask 构建 Web 应用,用来智能分析;
- 使用 OpenSearch 进行语义搜索:我们将介绍相关使用语义进行向量搜索;
- 导入数据:导入我们生成的模拟 QA 的数据;
- 示例 UI 的实现和部署
使用 CDK 部署生成文字向量模型
使用 SageMaker 能够把适用于自然语言处理(NLP)的 Hugging Face 模型进行训练、微调和运行推理。其将使用 AWS Deep Learning Containers 去训练和推理工作。有关可用的 Deep Learning Containers 图像的列表,请参阅可用的深度学习容器。这些 Deep Learning Containers 镜像经过维护,并定期使用安全补丁进行更新。
这里我们采用 AWS CDK 来部署一个 Hugging Face 的向量模型 shibing624/text2vec-base-chinese
。该模型是一个 CoSENT(Cosine Sentence)的预训练模型,如下片段用来生成我们的 SageMaker 推理节点:
model = sagemaker.CfnModel(
self,
f"Model",
execution_role_arn=self._sagemaker_role.role_arn,
# the properties below are optional
enable_network_isolation=False,
containers=[
sagemaker.CfnModel.ContainerDefinitionProperty(
container_hostname=f"{self._project_name}ContainerHostname",
image=image_uri,
mode="SingleModel",
environment={
"HF_TASK": "feature-extraction",
"HF_MODEL_ID": "shibing624/text2vec-base-chinese",
"SAGEMAKER_CONTAINER_LOG_LEVEL": 20,
"SAGEMAKER_REGION": cdk.Aws.REGION,
},
)
],
)
...
endpoint_config = sagemaker.CfnEndpointConfig(
self,
f"EPConfig",
production_variants=[
sagemaker.CfnEndpointConfig.ProductionVariantProperty(
initial_instance_count=1,
initial_variant_weight=1.0,
instance_type=instance_type,
model_name=model.attr_model_name,
variant_name=variant_name,
)
],
)
endpoint_config.add_dependency(model)
# ==============================
# ===== SAGEMAKER ENDPOINT =====
# ==============================
endpoint = sagemaker.CfnEndpoint(
self,
f"{self._project_name}Endpoint",
endpoint_name=f'{self._project_name}Endpoint',
endpoint_config_name=endpoint_config.attr_endpoint_config_name,
)
...
创建 LLM 应用
我们将使用具有 GPU 功能的 EC2 来运行我们的 LLM 应用。在大模型选择上,我们选择了 ChatGLM2-6B,ChatGLM2-6B 是开源中英双语对话模型 ChatGLM-6B 的第二代版本,它具备多领域知识、代码能力、常识推理及运用能力。其网络架构基于 Prefix Encoder 方式,推理速度较第一代有 40% 的提升,特别是 RoPE 位置编码也放在了 Attention Layer 之外(第一代放置在 Attention Layer 里)。
ChatGLM 模型(ChatGLM 和 ChatGLM2)主要使用了 Encoder 方式的网络结构,请参见论文 GLM: General Language Model Pretraining with Autoregressive Blank Infilling,我们这里简单介绍一下该 Encoder 的工作原理,如下图表示:
该 Encoder 方式在基础模型阶段和微调阶段是如下工作的。
输入序列 x 被分成两部分:Part A 是损坏的文本 Xcorrupt,Part B 由 Masked Span 组成,如上图。Part A 中的标记可以相互关注,但不能关注 B 中的任何标记。Part B 中的标记可以关注 Part A 和 B 中先前的标记,但不能关注 B 中任何后续的标记。为了实现自动回归生成,每个 Span 都用特殊标记[START]和[END]进行填充,分别用于输入和输出。采用这种方式,该模型训练了一个双向编码器(Part A)(bidirectional encoder)和单向解码器(unidirectional)(Part B)。
对于文本生成任务,给定的上下文构成输入的 Part A,末尾添加一个 Mask Token,那么模型对 Part B 可以使用自回归方式生成文本。
大模型显存优化
由于大模型参数非常大,为了良好地使用,我们进行了一些优化。这个模型是 6B 的大小,其参数单位为 float16
,ml.g4dn 系列至少有 16G 的显存,当全部参数 Load 至 GPU 时,大概占用了 12.5G 左右,考虑到推理时需要更多的现存,我们引入了 Accelerate 包。Accelerate 是 Hugging Face 中非常有用的一个工具,可以大幅提高深度学习模型的训练速度和推理处理。它把模型加载至不同的设备上(内存、显存和 Disk)不需要全部加载至显存,这样就不需要太大的显存来存储模型,在运行其增加了一些 Hooks,这样:
- 在每神经网络层,输入被放置在正确的设备上(因此即使模型跨越几个 GPU,它也可以工作);
- 对于卸载(Offload)在 CPU 上的模型参数,它们在前向传播之前被放置在 GPU 上,并在之后很快被清理;
- 对于卸载(Offload)在硬盘上的模型参数,它们被加载到 RAM 中,然后在前向传播之前被放置在 GPU 上,并在之后很快被清理。
以下图示说明了 Hooks 的工作原理:
详情请参阅:https://huggingface.co/docs/accelerate/concept_guides/big_model_inference。
以下是我们运行时一个设备快照:
{
'transformer.embedding': 0,
'transformer.rotary_pos_emb': 0,
'transformer.encoder.layers.0': 0,
'transformer.encoder.layers.1': 0,
'transformer.encoder.layers.2': 0,
'transformer.encoder.layers.3': 0,
'transformer.encoder.layers.4.input_layernorm': 0,
'transformer.encoder.layers.4.self_attention.query_key_value': 0,
'transformer.encoder.layers.4.self_attention.core_attention': 0,
'transformer.encoder.layers.4.self_attention.dense': 'cpu',
'transformer.encoder.layers.4.post_attention_layernorm': 'cpu',
'transformer.encoder.layers.4.mlp': 'cpu’,
…
'transformer.encoder.layers.27': 'cpu',
'transformer.encoder.final_layernorm': 'cpu',
'transformer.output_layer': 'cpu'
}
我们下载 Hugging Face 的模型文件之后,在包含有 Accelerate lib 的环境中,加载模型的代码片段如下:
tokenizer = AutoTokenizer.from_pretrained("chatglm-6b", trust_remote_code=True)
model = AutoModel.from_pretrained("chatglm-6b", device_map='auto', trust_remote_code=True)
ChatGLM 支持 streaming 的方式生成文本,代码片段如下:
def summarize_generate():
history = []
response = ""
pre_response = None
for idx, (response, history) in enumerate(model.stream_chat(tokenizer, stream_input_text, temperature=temperature, history=history, max_length=6000)):
if pre_response is not None:
word = response[len(pre_response):]
pre_response = response
yield word.encode('utf-8')
else:
pre_response = response
yield response.encode('utf-8')
return app.response_class(summarize_generate())
另外,在我们一个关于知识库的案例中,我们发现如下超参数会影响生成文本的质量,各个参数的相关定义如下:
|
A |
说明 |
ChatGLM 是否支持 |
默认值 |
1 |
Temperature |
该参数决定了输出的概率分布。值越高,输出的预测越随机,创造性也越高 |
√ |
0.8 |
2 |
Top P |
Top P 采样过滤低概率的词汇(根据概率) |
√ |
0.8 |
3 |
Top K |
Top K 采样过滤低概率的词汇(根据数量) |
x |
N/A |
4 |
Sample |
是否采样增加随机性 |
√ |
TRUE |
我们的 Prompt 模版如下:
prompt_template_llm = """As a sophisticated expert in Manufacturing, based on the customer's question ```{question}```, please provide a concise and professional analysis based on the known information and answers provided answers by the following triple backquotes (```) and tell what information the answer is based on. If you cannot give an analysis from it, please say "insufficient information provided". It is not allowed to add hallucinations to the analysis. Please note that the analysis must be provided in English.
Known information and answers:
```
{answers}
```
"""
语义搜索
为了在海量数据上进行相似性搜索,我们根据向量模型生成的结果在已存储的 OpenSearch 的向量 Index 上进行搜索,第一步我们得到向量:
...
self._client = boto3_session.client(service_name="sagemaker-runtime")
...
## invoke with boto3
def _invoke(self, json_body):
try:
response = self._client.invoke_endpoint(EndpointName=self._endpoint_name,
ContentType='application/json',
Body=json_body,
Accept='application/json')
return response
except Exception as e:
logger.exception(
f"Invoke sagemaker endpoint {self._endpoint_name} by {json_body} has exception: {json_body}, by endpoint {self._endpoint_name}")
raise e
def generate_vectors(self, keywords: list[str]):
"""
Send request to sagemaker endpoint to generate labels
Args:
:keywords: keywords list to generate embeddings
"""
json_input = json.dumps({'inputs': keywords, "options": {"wait_for_model": True}})
logger.debug(f"Generate embedding from sagemaker {self._endpoint_name} by {json_input}")
response = self._invoke(json_body=json_input)
try:
vectors = json.loads(response['Body'].read())
if not isinstance(vectors, list) or len(vectors) == 0:
logger.warning(
f"Generate embedding from sagemaker {self._endpoint_name} by {json_input} has unsuccessful result: {vectors}")
return []
if len(vectors) != len(keywords):
logger.warning(
f"Generate embedding from sagemaker {self._endpoint_name} by {json_input} has umatched output witu input, vectors len {len(vectors)} != keywords len {len(keywords)}")
return []
results = [vector[0][0] for vector in vectors]
logger.debug(f"Generated embedding from sagemaker {self._endpoint_name} by {json_input}: {results}")
return results
except Exception as e:
logger.exception(
f"Generate embedding from sagemaker {self._endpoint_name} by {json_input} has exception: {json_input}")
raise e
...
第二步,我们进行向量搜索:
## generate query
@staticmethod
def _get_query(vector=[], size_output=5, knn_k=6):
query = {
"size": size_output,
"from": 0,
"_source": {
"excludes": ["question_vector"]
},
"query": {
"knn": {
"question_vector": {
"vector": vector,
"k": min(knn_k, 256)
}
}
}
}
return query
## opensearch client
def _create_opensearch_client(self,
boto3_session=None):
...
auth = (username, password) # For testing only. Don't store credentials in code.
return OpenSearch(hosts=[{'host': host, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
ssl_assert_hostname=False,
ssl_show_warn=False)
## query by OpenSearch client
def knn_search_by_text_vectors(self,
text_vector,
knn_k=6,
size_output=8):
"""
Search by vectors
Args:
:text_vector: text vector. must not null or empty
:size_output: max output size
:knn_k: param k of knn
:min_confidence: The minimum confidence level for the labels to return
"""
if text_vector is None or len(text_vector) == 0:
raise ValueError('Text vectors cannot be null or empty')
query = OpenSearchClient._get_query(vector=text_vector,
size_output=size_output,
knn_k=knn_k)
try:
logger.debug(
f"Querying answers from index {self._index} by vector with length {len(text_vector)}")
response = self._client.search(request_timeout=self._request_timeout,
index=self._index,
body=query)
logger.debug(f"Queried answers from open search index {self._index} by {query}: {response}")
return self._resolve_result(response)
except Exception as e:
logger.exception(
f"Couldn't query image materials from open search index {self._index} by {query}")
raise e
导入数据
我们采用大模型生成了一些 sample 数据进行测试,样本数据如下。
我们在 OpenSearch 创建了如下 index:
import boto3, json
import requests
....
awsauth = get_auth()
host = get_host()
# create index
index_name = 'semantic_search_knowledge_index'
v_dimension = 768 # Embbeding vector dimension
headers = { "Content-Type": "application/json" }
payloads = {
"settings": {
"index.knn": True,
"knn.space_type": "l2"
},
"mappings": {
"properties": {
"question_vector": {
"type": "knn_vector",
"dimension": v_dimension,
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "nmslib",
"parameters": {
"ef_construction": 256,
"m": 32
}
}
},
"question": {
"type": "text"
},
"answers": {
"type": "text"
}
}
}
}
# Create Index
r = requests.put(host+index_name, auth=awsauth, headers=headers, json=payloads)
以下代码片段是用于导入数据的:
def import_single_row(payload):
## invoke SageMaker to generate text embedding
question_vector = generate_vector(payload['question'])
payload['question_vector'] = question_vector
first = json.dumps({ "index": { "_index": index_name} }, ensure_ascii=False) + "\n"
second = json.dumps(payload, ensure_ascii=False) + "\n"
payloads = first + second
r = requests.post(url, auth=awsauth, headers=headers, data=payloads.encode()) # requests.get, post, and delete have similar syntax
def import_data(json_array):
for payload in tqdm(json_array):
import_single_row(payload)
sleep(0.01)
json_array=[]
with open('qa_samples.csv', encoding = 'utf-8') as csv_file_handler:
csv_reader = csv.DictReader(csv_file_handler)
for row in csv_reader:
json_array.append(row)
import_data(json_array)
示例 UI 实现
UI 部署架构图
UI 层为 React 单页面应用,静态资源部署在 Amazon S3 中,通过 Amazon CloudFront 访问 S3 中的资源。
其中,为了确保 S3 Bucket 的安全性,仅允许指定的 CloudFront 访问。用户的浏览器通过 Amazon CloudFront 访问 Amazon S3 中的静态资源。
UI 页面展示
结果返回(http 请求流式输出)
UI 框架:@cloudscape(Apache License 2.0)Cloudscape was built for and is used by Amazon Web Services(AWS) products and services. 具体请参阅 AppLayout、Table、Alert 组件。
流式输出实现
const [answerMsg, setAnswerMsg] = useState("");
const requestApi = 'solution_backend_url'
const postData = { answers: requestAnswer, question: query };
try {
const response = await fetch(requestApi, {
method: "POST",
headers: HEADERS,
body: JSON.stringify(postData),
signal,
});
const data = response.body;
const reader = data.getReader();
const decoder = new TextDecoder();
let done = false;
let text = "";
// 循环等待结束标识 并将拿到的数据上屏渲染
while (!done) {
if (stopConversationRef.current === true) {
controller.abort();
done = true;
break;
}
const { value, done: doneReading } = await reader.read();
done = doneReading;
const chunkValue = decoder.decode(value);
text += chunkValue;
setAnswerMsg(text + "_");
}
setAnswerMsg(text);
} catch (error) {
console.error("doSummarizeError", error);
} finally {
}
......
<div>
{answerMsg && (
<Alert
key="alt-answer-msg"
statusIconAriaLabel="Info"
header="Analyze result"
dismissible
onDismiss={alertDismiss}
>
{answerMsg}
</Alert>
)}
</div>
通过 CDK 部署示例 UI
这里的逻辑为:
- 执行本地
npm i & npm run build
命令,安装依赖并对代码进行打包构建。
- 创建 Web S3 存储桶,设置 SPA 应用静态页面存储类型。
- 将打包产物上传 Bucket。
- 创建 CloudFront,设置回源到 S3 存储桶,并设置 S3 存储桶权限允许 CloudFront 回源访问。
代码示例:
# prepare the static web pages
self._prepare_static_web_pages(main_api, summarize_api)
# create website bucket
website_bucket = self._create_and_upload_asset_to_s3(id)
# Create cloudfront
distribution = self._create_cloudfront_distribution(website_bucket)
self._grant_cloudfront_access(distribution, website_bucket)
cdk.CfnOutput(
self,
"SmartSearchUrl",
value=f"https://{distribution.distribution_domain_name}",
description="Smart search url")
......
def _create_cloudfront_distribution(self, website_bucket):
distribution: cloudfront.CloudFrontWebDistribution = cloudfront.CloudFrontWebDistribution(
self,
"StaticWebsiteDistribution",
origin_configs=......,
default_root_object="index.html",
)
# add access control for frontend
cfn_origin_access_control = cloudfront.CfnOriginAccessControl(
self,
id="StaticWebsiteDistributionControl",
origin_access_control_config=cloudfront.CfnOriginAccessControl.OriginAccessControlConfigProperty(
name="StaticWebsiteDistributionControlConfig",
origin_access_control_origin_type="s3",
signing_behavior="always",
signing_protocol="sigv4",
description="Default Origin Access Control",
),
)
# add access
overriden_distribution = typing.cast(
cloudfront.CfnDistribution,
distribution.node.default_child,
)
overriden_distribution.add_property_override(
"DistributionConfig.Origins.0.OriginAccessControlId",
cfn_origin_access_control.get_att("Id"),
)
return distribution
......
Summary
本文,我们演示了如何使用 AWS 的技术和服务构建一个基于 RAG 的知识库。我们将知识数据以向量形式存储在数据库中,通过 K-nn 检索可以过滤相关的知识,然后交给大模型进行分析。其中,
- 大模型和生成向量的模型你可以根据需要进行选择;
- 在一张 Index 里,可以存储多个 Vector 向量,组合其这些 Vector 向量字段的权重给出过滤的知识,可以让您的知识更加丰富;
- 通过 Web UI 页面提供可视化的 K-nn 检索功能,并通过 UI 页面实现使用者选择结果与大模型分析的交互、流式返回。
- 通过 AWS 开源的 Cloudscape UI 组件提供 AWS 风格的交互页面。
该方案更加详细的部署您可以参考 Workshop:Question Answering via RAG and LLM,代码是开源的,请参考:smartsearch-ai-knowledge-workshop。
参考
RAG:https://research.facebook.com/publications/retrieval-augmented-generation-for-knowledge-intensive-nlp-tasks/
ChatGLM:https://github.com/THUDM/ChatGLM-6B/tree/main
ChatGLM2:https://github.com/THUDM/ChatGLM2-6B
Hugging Face Accelerate:https://github.com/huggingface/accelerate
React:https://react.dev/learn
Cloudscape:https://cloudscape.design/components/
本篇作者
内容审阅