基于 Amazon Bedrock 构建智能电商搜索推荐应用
利用 DynamoDB Zero-ETL 技术进行业务数据的准实时向量化,将其传输至 OpenSearch。同时结合 Bedrock 嵌入式模型和 LLM 模型进行产品推荐以及对客户评论的情感分析。



内容概览
Workshop 架构预览
利用 DynamoDB Zero-ETL 技术进行业务数据的准实时向量化,将其传输至 OpenSearch。同时结合 Bedrock 嵌入式模型和 LLM 模型进行产品推荐以及对客户评论的情感分析。

Workshop 效果预览
- 商品搜索推荐

- 商品评论

- 商品评论分析

基础环境准备
创建 Worksho 基础组件 (预计完成时间 20 分钟)
创建 CloudFormation 堆栈
此堆栈会自动创建相关资源包括 S3 桶、Dynamodb 表、Opensearch 数据库、lambda 函数、Apigateway 等。
1. 在 AWS 控制台搜索 CloudFormation 服务

2. 创建新的堆栈,上传部署模板 https://github.com/SEZ9/ShopAnalytica/blob/main/deploy.cfn.yaml

3. 点击下一步到提交页面,勾选确认选项,提交模板进行部署


申请 bedrock model
- 申请 bedrock model 权限 ,本实验使用 titan emebed 及 claude 2。

创建 Cognito 用户,用于登录 Opensearch Dashboard



3. 编辑用户名和密码,完成用户创建(密码为任意 8 位以上字符,数字字母皆可)

数据导入
导入商品信息表 (预计完成时间 5 分钟)
1. 下载 dynamodb json 文件 https://github.com/SEZ9/ShopAnalytica/tree/main/dynamodb-tables-data ,并上传至 S3 桶,桶名称见 cloudformation 堆栈输出

2. 进入 Dynamodb 服务,创建 S3 导入表任务,分别导入对应的中英文商品信息表。
其中文件 ProductDetails 文件对应表名 ProductDetails,主键名称 ProductID
文件 ProductDetailsCN 文件对应表名 ProductDetailsCN,主键名称 ProductID




3. 完成导入后,dynamodb 可见 4 张表,分别为中英文的商品信息表与商品评论表。(其中,中英文的商品评价表由 cloudformation 脚本自动创建)

创建数据管道
- (完成数据管道创建预计完成时间 25 分钟)
权限准备
编辑 Opensearch 安全配置,授权 IAM role。 在 AWS 控制台搜索 Opensearch 服务,进入创建好的 domain 中,点击登录 Dashboard UI。使用环境准备环节中创建的用户进行登录。

将 cloudformation 创建后输出的 3 个 role 的 ARN 全部附加到 opensearch 角色映射关系中。用于外部服务和 opensearch 的互相调用 (包括 lambda、dynamodb ETL、 bedrock 等)



注册 Opensearch embedding 数据管道配置
1. 注册 opensearch ML model 进入 Opensearch 的 Dev Tools 中,在左侧 console 依次执行以下步骤。


# 1. 创建bedrock ml connector (注意修改{}内容,并记录请求返回的 connect id)
POST /_plugins/_ml/connectors/_create
{
"name": "Amazon Bedrock Connector: embedding",
"description": "The connector to bedrock Titan embedding model",
"version": 1,
"protocol": "aws_sigv4",
"parameters": {
"region": "{region}", // 注意修改region编码
"service_name": "bedrock"
},
"credential": {
"roleArn": "{your role }" // 注意修改为 dynamodb etl role arn
},
"actions": [
{
"action_type": "predict",
"method": "POST",
"url": "https://bedrock-runtime.{region}.amazonaws.com/model/amazon.titan-embed-text-v1/invoke",
"headers": {
"content-type": "application/json",
"x-amz-content-sha256": "required"
},
"request_body": "{ \"inputText\": \"${parameters.inputText}\" }",
"pre_process_function": "\n StringBuilder builder = new StringBuilder();\n builder.append(\"\\\"\");\n String first = params.text_docs[0];\n builder.append(first);\n builder.append(\"\\\"\");\n def parameters = \"{\" +\"\\\"inputText\\\":\" + builder + \"}\";\n return \"{\" +\"\\\"parameters\\\":\" + parameters + \"}\";",
"post_process_function": "\n def name = \"sentence_embedding\";\n def dataType = \"FLOAT32\";\n if (params.embedding == null || params.embedding.length == 0) {\n return params.message;\n }\n def shape = [params.embedding.length];\n def json = \"{\" +\n \"\\\"name\\\":\\\"\" + name + \"\\\",\" +\n \"\\\"data_type\\\":\\\"\" + dataType + \"\\\",\" +\n \"\\\"shape\\\":\" + shape + \",\" +\n \"\\\"data\\\":\" + params.embedding +\n \"}\";\n return json;\n "
}
]
}
# 2. 注册模型 (注意修改{}内容,并记录model id)
// 创建模型组
POST /_plugins/_ml/model_groups/_register
{
"name": "remote_model_group",
"description": "This is an example description"
}
// 注册模型
POST /_plugins/_ml/models/_register
{
"name": "Bedrock embedding model",
"function_name": "remote",
"model_group_id": "", // 替换为上一步执行后返回的 group id
"description": "embedding model",
"connector_id": "" // 替换为注册的 connector id
}
// 部署模型
POST /_plugins/_ml/models/{mode_id}/_deploy // 替换为注册的 model id
// 验证模型
POST /_plugins/_ml/models/{mode_id}/_predict // 替换为注册的 model id
{
"parameters": {
"inputText": "What is the meaning of life?"
}
}
// 验证成功会返回 1536 维的向量编码
创建表映射pipeline
# 英文商品表pipeline
PUT /_ingest/pipeline/product-en-nlp-ingest-pipeline
{
"description": "A text embedding pipeline",
"processors": [
{
"script": {
"source": """
def combined_field = "ProductID: " + ctx.ProductID + ", Description: " + ctx.Description + ", ProductName: " + ctx.ProductName + ", Category: " + ctx.Category;
ctx.combined_field = combined_field;
"""
}
},
{
"text_embedding": {
"model_id": "{model id}", // 替换为注册的 model id
"field_map": {
"combined_field": "product_embedding"
}
}
}
]
}
# 中文商品表pipeline
PUT /_ingest/pipeline/product-cn-nlp-ingest-pipeline
{
"description": "A text embedding pipeline",
"processors": [
{
"script": {
"source": """
def combined_field = "商品编号: " + ctx.ProductID + ", 商品详情: " + ctx.Description + ", 商品名称: " + ctx.ProductName + ", 商品分类: " + ctx.Category;
ctx.combined_field = combined_field;
"""
}
},
{
"text_embedding": {
"model_id": "{model_id}", // 替换为注册的 model id
"field_map": {
"combined_field": "product_embedding"
}
}
}
]
}
# 英文商品评论表pipeline
PUT /_ingest/pipeline/product-reviews-nlp-ingest-pipeline
{
"description": "A text embedding pipeline",
"processors": [
{
"script": {
"source": """
def combined_field = "ProductID: " + ctx.ProductID + ", ProductName: " + ctx.ProductName + ", Comment: " + ctx.Comment + ", Timestamp: " + ctx.Timestamp;
ctx.combined_field = combined_field;
"""
}
},
{
"text_embedding": {
"model_id": "{model_id}", // 替换为注册的 model id
"field_map": {
"combined_field": "product_reviews_embedding"
}
}
}
]
}
# 中文商品评论表pipeline
PUT /_ingest/pipeline/product-cn-reviews-nlp-ingest-pipeline
{
"description": "A text embedding pipeline",
"processors": [
{
"script": {
"source": """
def combined_field = "商品编号: " + ctx.ProductID + ", 商品名称: " + ctx.ProductName + ", 商品评价: " + ctx.Comment + ", 时间: " + ctx.Timestamp;
ctx.combined_field = combined_field;
"""
}
},
{
"text_embedding": {
"model_id": "{model id}", // 替换为注册的 model id
"field_map": {
"combined_field": "product_reviews_embedding"
}
}
}
]
}
创建 dynamodb 数据管道
进入 Dynamodb console 控制台的集成选项,点击对应的表创建管道。

- 参考文档链接 dynamodb zero-ETL
1. ProductDetails 表(注意替换{}内容)
- 其中,{account id}为账户 id,在控制台左上角可以找到;{s3_bucket name}为 cloudformation 创建的桶,这里仅需填写桶名称
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "arn:aws:dynamodb:{region}:{account id}:table/ProductDetails"
stream:
start_position: "LATEST"
export:
s3_bucket: "{s3_bucket name}"
s3_region: "{region}"
s3_prefix: "ddb-to-opensearch-export-product-details-index-en/"
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
sink:
- opensearch:
hosts:
[
"{opensearch endpoint uri}"
]
index: "product-details-index-en"
index_type: custom
template_type: "index-template"
template_content: |
{
"template": {
"settings": {
"index.knn": true,
"default_pipeline": "product-en-nlp-ingest-pipeline"
},
"mappings": {
"properties": {
"ProductID": {
"type": "keyword"
},
"ProductName": {
"type": "text"
},
"Category": {
"type": "text"
},
"Description": {
"type": "text"
},
"Image": {
"type": "text"
},
"combined_field": {
"type": "text"
},
"product_embedding": {
"type": "knn_vector",
"dimension": 1536,
"method": {
"engine": "nmslib",
"name": "hnsw",
"space_type": "l2"
}
}
}
}
}
}
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
2. ProductDetailsCN 表(注意替换{}内容)
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "arn:aws:dynamodb:{region}:{account id}:table/ProductDetailsCN"
stream:
start_position: "LATEST"
export:
s3_bucket: "{s3_bucket name}"
s3_region: "{region}"
s3_prefix: "ddb-to-opensearch-export-product-details-index-cn/"
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
sink:
- opensearch:
hosts:
[
"{opensearch endpoint uri}"
]
index: "product-details-index-cn"
index_type: custom
template_type: "index-template"
template_content: |
{
"template": {
"settings": {
"index.knn": true,
"default_pipeline": "product-cn-nlp-ingest-pipeline"
},
"mappings": {
"properties": {
"ProductID": {
"type": "keyword"
},
"ProductName": {
"type": "text"
},
"Category": {
"type": "text"
},
"Description": {
"type": "text"
},
"Image": {
"type": "text"
},
"combined_field": {
"type": "text"
},
"product_embedding": {
"type": "knn_vector",
"dimension": 1536,
"method": {
"engine": "nmslib",
"name": "hnsw",
"space_type": "l2"
}
}
}
}
}
}
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
3. ProductReviews 表(注意替换{}内容)
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "arn:aws:dynamodb:{region}:{account id}:table/ProductReviews"
stream:
start_position: "LATEST"
export:
s3_bucket: "{s3_bucket name}"
s3_region: "{region}"
s3_prefix: "ddb-to-opensearch-export-reviews/"
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
sink:
- opensearch:
hosts:
[
"{opensearch endpoint uri}"
]
index: "product-reviews-index-en"
index_type: custom
template_type: "index-template"
template_content: |
{
"template": {
"settings": {
"index.knn": true,
"default_pipeline": "product-reviews-nlp-ingest-pipeline"
},
"mappings": {
"properties": {
"Comment": {
"type": "text"
},
"ProductName": {
"type": "text"
},
"ProductID": {
"type": "text"
},
"Timestamp": {
"type": "long"
},
"UserID": {
"type": "text"
},
"combined_field": {
"type": "text"
},
"product_reviews_embedding": {
"type": "knn_vector",
"dimension": 1536,
"method": {
"engine": "nmslib",
"name": "hnsw",
"space_type": "l2"
}
}
}
}
}
}
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
4. ProductReviewsCN 表(注意替换{}内容)
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "arn:aws:dynamodb:{region}:{account id}:table/ProductReviewsCN"
stream:
start_position: "LATEST"
export:
s3_bucket: "{s3_bucket name}"
s3_region: "{region}"
s3_prefix: "ddb-to-opensearch-export-reviews-cn/"
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
sink:
- opensearch:
hosts:
[
"{opensearch endpoint uri}"
]
index: "product-reviews-index-cn"
index_type: custom
template_type: "index-template"
template_content: |
{
"template": {
"settings": {
"index.knn": true,
"default_pipeline": "product-cn-reviews-nlp-ingest-pipeline"
},
"mappings": {
"properties": {
"Comment": {
"type": "text"
},
"ProductName": {
"type": "text"
},
"ProductID": {
"type": "text"
},
"Timestamp": {
"type": "long"
},
"UserID": {
"type": "text"
},
"combined_field": {
"type": "text"
},
"product_reviews_embedding": {
"type": "knn_vector",
"dimension": 1536,
"method": {
"engine": "nmslib",
"name": "hnsw",
"space_type": "l2"
}
}
}
}
}
}
aws:
sts_role_arn: "arn:aws:iam::{account id}:role/dynamodb-etl"
region: "{region}"
验证数据管道
- 进入 Opensearch 的 开发工具中,查询对应 index 数据是否正常写入


- 验证向量化匹配搜索 - 商品信息表 英文
GET /product-details-index-en/_search
{
"size": 10,
"sort": [
{
"_score": {
"order": "desc"
}
}
],
"_source": {
"includes": ["ProductName", "Category", "Description", "ProductID","Image"]
},
"query": {
"neural": {
"product_embedding": {
"query_text": "{任意输入}",
"model_id": "{替换为 model id}",
"k": 13
}
}
}
}
- 验证向量化匹配搜索 - 商品信息表 中文
GET /product-details-index-cn/_search
{
"size": 10,
"sort": [
{
"_score": {
"order": "desc"
}
}
],
"_source": {
"includes": ["ProductName", "Category", "Description", "ProductID","Image"]
},
"query": {
"neural": {
"product_embedding": {
"query_text": "{任意输入}",
"model_id": "{替换为 model id}",
"k": 13
}
}
}
}
部署业务后端
验证 API 功能调用 (预计完成时间 10 分钟)
- 部署 lambda function
进入 lambda 服务控制台,可以看到由 cloudformation 脚本创建的两个 lambda function,进入到 LambdaFunctionBedrock,上传代码 zip文件 https://github.com/SEZ9/ShopAnalytica/blob/main/lambda/bedrock_function.zip,并部署。

- 编辑 LambdaFunctionBedrock 代码第 5 行,填入 opensearch endpoint 地址到 opensearch_host = "",并更新部署。
- 编辑 LambdaFunctionBedrock 代码第 41 行、109 行,填入 opensearch 中创建的 model id,并更新部署。
2. 验证 apigateway
- 进入 apigateway 服务,进入 shopworkshop,对接口进行测试。
2.1 测试获取商品列表接口
- Query strings type=get_products&language='en'
2.2 测试获取商品评论信息接口
- Query strings type=get_product_reviews&language='en'&product_id=''
2.3 测试添加评论接口
- Query strings type=add_product_review&language='en'
- body 消息体
{
'product_id': '',
'product_name': '',
'rate': '',
'comment': '',
'user_id': ''
}
2.4 测试商品推荐接口
- Query strings type=product_recommend&language='en'
- body 消息体
{
input_text: ''
}
5. 测试评论分析接口
- Query strings type=reviews_analytis&language='en'
- body 消息体
{
input_text: ''
}
常见问题
1. Apigateway 请求 lambda 提示无法找到 function?
- 解决办法: 重新编辑 apigateway request settings ,选中对应函数保存即可。

2. 测试接口超时如何处理?
- 解决办法:编辑 lambda 函数的基础配置项中 timeout 设置较长超时时间。

3. Dynamodb ETL 启动一段时间后,opensearch 仍未见数据写入?
- 解决办法:查看管道的 cloudwatch 日志,搜索报错信息,对应处理。
4. 如何使用非 Anthropic 模型 ?Bedrock 以 Llama 2 为例
修改 lambda 函数中 LLM 调用的模型参数
修改结果返回的格式适配
不同模型的返回参数可能略有不同,可以参考 https://docs.thinkwithwp.com/code-library/latest/ug/bedrock-runtime_code_examples.html


部署业务前端
运行前端 UI (预计完成时间 15 分钟)
1. 下载前端代码,front 目录下,https://github.com/SEZ9/ShopAnalytica/tree/main/front 修改 front/src/APP.vue 192 行,替换 url 为 APIgateway URL const APIURL = '' ,检查对应 API 接口请求的 path 为部署的 API Gateway 接口

2. 安装依赖编译运行
- 本地运行 推荐下载新版本 nodejs > v16
npm install
npm run serve
- 编译
npm install
npm run build
3. 验证前端 UI

解决跨域问题
1. API gateway 解决前端跨域问题



总结
本次动手训练营中主要展示内容:
如何使用 Amazon Dynamodb Zero-ETL 近实时的将数据字段进行合并,通过调用 Amazon Bedrock 的 embedding 模型向量化后存储到 Opensearch 进行检索。
如何结合 Amazon Bedrock LLM 模型通过 RAG 架构,先从 Opensearch 进行向量检索后通过 LLM 模型进行商品推荐建议的生成及评论的总结分析。
参考资料:
DynamoDB zero-ETL integration with Amazon OpenSearch
Opensearch connectors for third-party ML platforms
资源清理
为了避免不必要的费用产生,实验完成后执行 cloudformation stack 删除,释放资源。