背景
随着企业数字化转型的加速,越来越多的企业上线了新的财务、供应链和销售管理系统。为了确保全球员工能够熟练掌握并高效使用这些新系统,企业通常会组织全面的线上培训。然而,随着培训规模的扩大和员工数量的增加,企业面临着诸多挑战。特别是在全球范围内,培训经理无法做到 24 小时不间断地为员工提供支持,导致问题解决的速度慢,影响了员工的学习效率。在这种背景下,企业迫切需要一种能够高效支持全球员工系统使用问题的解决方案,帮助他们快速定位并解决问题,同时评估培训效果,确保培训的有效性。
解决思路
为了有效管理和检索培训视频内容,我们设计了一套基于自然语言处理的智能视频检索和自动化考题生成解决方案。该方案 2500 多行代码完全基于 AWS 的服务构建,充分利用了 AWS 在自然语言处理、数据处理和无服务器架构上的综合能力。
功能介绍
通过集成自然语言处理技术,用户可以直接输入自然语言查询,系统会在海量视频库中精准定位到相关视频的相关时间点。该功能通过智能搜索引擎和语义分析,支持视频内容的相关性打分,并在前端提供播放功能。用户不仅能快速找到需要的内容,还能基于相关性评分了解查询结果的匹配程度。
系统自动生成双语字幕,使视频内容更易于理解,适应不同语言背景的用户需求。此外,系统还能提供视频内容的简明总结,帮助用户在短时间内掌握视频的核心信息。这种字幕优化和总结生成能力依赖于先进的自然语言处理和翻译技术,让内容获取更加便捷。
根据视频内容,系统能够自动生成个性化的考卷,供用户进行知识检测。通过动态考题生成,系统可以根据不同的视频内容、知识点和学习进度为用户定制考题,确保评估的准确性和针对性。这一功能使企业可以有效评估员工的学习成果,并基于检测结果优化后续的培训内容。
方案主要特性
- 自动化语音转文字处理:集成 Amazon Transcribe
通过集成 Amazon Transcribe 服务,系统能够自动将培训视频转换为带有时间戳的文字脚本。这样,员工可以通过文本搜索视频内容,并迅速定位到视频中的相关片段,使信息检索更加便捷。
- 数据预处理与优化:使用 Prompt Flow 和 Step Functions
方案通过 AWS Prompt Flow 实现定制的 SRT 字幕重分块处理,以便进一步优化字幕数据。利用 AWS Step Functions 和 Amazon Bedrock,大幅提升字幕的准确性,并提供翻译支持,同时生成视频摘要和考题,支持定制化的学习评估。
- 高效的向量化和数据存储:结合 Lambda、SQS 和 OpenSearch Serverless
优化后的字幕通过 AWS Lambda 和 SQS 进行批量向量化处理,并写入 OpenSearch Serverless,确保了高效的检索能力,同时支持可扩展性和成本效益。
在用户发起查询时,系统利用 Bedrock LLM 对用户输入进行分析与改写,从 OpenSearch 中召回相关数据,并通过 Bedrock LLM 进一步筛选排序,确保返回最相关的内容。
前端基于 Streamlit 构建,提供视频搜索、视频播放和字幕同步功能,为员工提供顺畅的培训资源访问体验。
全面集成 AWS 服务
本解决方案的每个环节都由 AWS 服务提供支持,从视频转录、数据处理到向量化、检索和前端展示,真正实现了全 AWS 方案。这种 all-in-AWS 的架构确保了无缝集成、卓越性能和可扩展性,能够适应不同的工作负载需求。架构中使用了丰富的 AWS 产品,包括 Amazon Transcribe、AWS Lambda、SQS、Amazon Bedrock 和 OpenSearch Serverless,将这些服务统一构建成一个强大且可扩展的框架,不仅提升了性能,还简化了运维,使企业能够专注于内容的交付和学习效果评估,无需担心基础设施问题。
通过部署这套解决方案,企业能够显著提升全球培训的效率和效果,为员工提供快速检索相关内容的能力,并通过动态生成的考题来评估学习成果。
结构介绍
架构图
架构介绍
- SRT Generation – 自动生成字幕
第一步是视频的自动化语音转文字处理。通过 Amazon Transcribe 服务,系统将培训视频转化为带有时间戳的文字脚本,并存储在 S3 存储桶中,以便后续处理。这个过程使视频内容可以通过文本进行搜索,并实现精准的时间定位,便于员工快速查找所需信息。
- SRT Rechunk – 字幕重分块与优化
视频转文字后,进入 SRT 重分块处理 阶段。利用 AWS Prompt Flow,系统对生成的字幕进行重新分块,以确保文本的逻辑和语义完整性。接着,通过 Step Functions 和 Amazon Bedrock 进行进一步优化,包括字幕文本的翻译、分块、语义调整,以提升整体字幕质量,并生成视频的简明总结和考卷。通过这个处理步骤,字幕内容更符合实际语义,使员工更容易理解内容。
- Video Search Data Processing – 视频搜索数据处理
在视频字幕优化后,系统通过 Lambda 和 SQS 对字幕进行向量化批处理,将处理好的数据写入 OpenSearch Serverless,形成一个高效的语义搜索数据库。通过这种方式,系统可以快速进行文本向量化,确保数据的高可用性和高检索速度,为后续的查询提供支撑。
- 信息召回与智能查询 – 搜索和二次筛选
当用户发起查询时,系统会调用 AWS Step Function 以执行多步骤的信息召回流程。首先,系统对用户的查询进行提炼和重写,并通过 OpenSearch 搜索召回相关内容。接着,使用 Bedrock Cohere Embed v3 和 Bedrock Claude 对召回的数据进行二次筛选和相关性分析。通过这一智能筛选流程,系统可以为用户提供与查询高度匹配的内容,并根据相关性排序返回。
- Application Layer – 前端应用层
系统的应用层由 Streamlit 驱动的前端组成。用户在前端可以实现视频搜索、播放和字幕同步的功能。视频播放过程中,字幕会随着视频内容动态显示,支持双语翻译和简明总结。此外,系统会根据用户的学习内容,生成考卷用于知识检测。Streamlit 前端不仅提供了流畅的用户体验,还将所有智能搜索和自动化处理的功能无缝集成。
方案细节
SRT Generation
在 SRT Generation 阶段,系统利用 AWS 的无服务器架构,实现了视频的自动化转录处理,并支持高并发能力,适应多视频文件的并发处理需求。
用户上传视频文件后,视频被存储在 Amazon S3 存储桶(video-material/)中。这不仅提供了可靠的数据存储,还触发了后续的自动化处理流程。
文件上传到 S3 后,会通过 S3 事件通知机制触发 AWS Lambda 函数(launchTranscriptJob)。这是一种事件驱动的无服务器处理方式,确保在上传视频文件后,无需用户干预即可启动转录流程。同时,Lambda 的弹性扩展能力支持同时处理多个文件上传事件,系统可以并发处理多个视频,满足高并发处理的需求。
Lambda 函数会调用 Amazon Transcribe 服务,启动转录任务(Transcribe Job),将视频内容转化为带时间戳的文字脚本(SRT 字幕)。Transcribe 服务会根据音频流的长度自动分配计算资源,确保高效处理并生成精确的文字脚本。
转录任务完成后,生成的 SRT 文件被存储在指定的 S3 位置,随后由下游的事件驱动流程触发后续的字幕优化和语义处理。这种事件驱动的任务流确保了每个视频文件的独立处理,同时利用无服务器架构的弹性扩展能力,不论文件多少,均可快速响应和并发处理。
Serverless 高并发优势
通过 AWS Lambda 和 Amazon Transcribe 的无服务器架构设计,该方案实现了极高的并发能力。每个视频上传即触发独立的 Lambda 实例处理,避免了资源争抢的问题,使得系统能够在短时间内处理大量视频文件,极大地提高了处理效率。
launchTranscriptJob Code
# Author:Li Kaiyuan
# Version: 0.2
import boto3
import time
#lambda entry point
def lambda_handler(event, context):
bucket_name = event['Records'][0]['s3']['bucket']['name']
source_s3_key = event['Records'][0]['s3']['object']['key']
delimiter = source_s3_key.find('/')
file_name = source_s3_key[delimiter+1:]
job_uri = 's3://'+ bucket_name + '/' + source_s3_key
timestamp = time.time()
local_time = time.localtime(timestamp)
formatted_time = time.strftime("%Y-%m-%d-%H-%M-%S", local_time)
job_name = "conv-"+file_name[0:-4]+'-'+formatted_time
transcribe = boto3.client('transcribe')
transcribe.start_transcription_job(
TranscriptionJobName=job_name,
MediaFormat='mp4',
Media={
'MediaFileUri': job_uri
},
OutputBucketName=bucket_name,
OutputKey='output/'+file_name[0:-4]+formatted_time,
Settings={
'ShowSpeakerLabels': True,
'MaxSpeakerLabels': 10,
'ChannelIdentification': False
},
IdentifyMultipleLanguages=True,
Subtitles={
'Formats': [
'srt',
],
'OutputStartIndex': 1
},
Tags=[
{
'Key': 'transcription job',
'Value': job_name
},
]
)
return {
'statusCode': 200,
'body': 'Transcription job started'
}
SRT Rechunk & Optimization
痛点 & Solution
Transcribe 的原生字幕有如下问题:
- 非 JSON 原生格式,不方便进行结构化的字幕处理;
- Transcribe 的语音分段方式比较单一,上下句不清晰;
- 生成的字幕质量和视频本身的语音质量等相关因素强相关,可能会效果不佳;
- 不提供翻译功能;
- 对于单一长视频,用大模型直接处理时,不仅处理时间过长,且受限于模型的 output window,无法输出过长的处理过的视频。
为了解决这些问题,我将字幕的处理拆分成了两个部分:
- SRT RECHUNK:将字幕进行拆分,确保字幕给大模型处理时:
- 单个请求的输出量不大,确保大模型的输出效果
- 减少output window压力,确保经过大模型处理后可以完整被输出
- SRT Optimization:将字幕进行优化,提供:
- 字幕的 rewrite+ 翻译
- 字幕序列的重排
- 根据字幕生成视频的标题和视频内容总结
- 根据字幕生成视频相关问卷
SRT RECHUNK
在 SRT Rechunk 阶段,系统对初始转录生成的字幕(SRT 文件)进行细化处理,以优化文本的逻辑性和可读性。这一阶段通过 AWS 的 Prompt Flow、多步 Lambda 函数和事件驱动机制,实现了高效、自动化的字幕优化流程。
1. 数据准备与事件触发
转录生成的 SRT 文件被存储在 S3 存储桶中(output/)。文件上传后,通过 S3 事件通知触发 receiveSrtAndInvokeChunkingFlow Lambda 函数,启动了字幕的重分块流程。这种事件驱动的设计确保每个新生成的 SRT 文件都能够自动进入处理流程,无需手动操作。
2. Prompt Flow 流程 – 精细化分块
在重分块阶段,系统利用 Prompt Flow 实现多步骤的智能分块:
- Lambda function structuringSrt:首先对 SRT 文本结构进行解析和预处理,将 SRT 格式转化为 JSON 格式,确保文本的基本格式和语义结构得到合理整理。
- Lambda function extractCandidateChunk:对处理后的文本提取出候选的分块点初选,以确保每个分片的大小合适。
- Prompts ElectChunkPoint:利用 AI 模型分析 SRT 文本内容,二次选取适合的分块点,以保证分块后的文本片段符合自然的语义逻辑,且分片点不会造成语句的中断,比如将I’m a man中的I’m 和a man分在两个分片中。
- Lambda function SrtChunking:根据前一步生成的分块点,将 SRT 文本分割成多个片段,形成结构清晰的分块文本。
3. 事件驱动的输出与后续优化处理
分块后的 SRT 文本会触发 ChunkedSRTEvent 事件,进入到下一个 Lambda 函数(srtOptimization)进行进一步优化和翻译处理。这样的事件链式触发流程,不仅实现了无缝衔接,也提升了系统的自动化和高效性。
Key Prompt
<persona>
1. You are an expert subtitle editor, skilled in subtitle proofreading.
2. The input <srt> is an array containing SRT fragments, which may include spelling errors and are divided into overly short sections.
3. Your task is to identify the sentence most likely to be the beginning of a statement in each group.
4. The output must only be an array matching the specified <output format> and should not include any other content.
</persona>
<srt>
{{srt}}
</srt>
<task>
1. The <srt> input is an array, with each item being a group of SRT fragments.
- Each group has the structure:
{
"srt_num_1": {"Srt": "Value", "Timestamp": "Value" },
"srt_num_2": {"Srt": "Value", "Timestamp": "Value" },
...
}
2. The SRT fragments within each group represent chopped segments from a continuous context.
3. For each group, select the fragment most likely to be the beginning of a sentence among all fragments.
4. Output the selected fragments in an array format, ensuring only the selected items appear in the output.
5. Confirm the output strictly adheres to the array format.
</task>
<output format>
[
{
"srt_num_A": { "Srt": "Value", "Timestamp": "Value" },
"srt_num_B": { "Srt": "Value", "Timestamp": "Value" }
}
]
</output format>
<example>
If the input is:
[
{
"67": { "Srt": "That's a little bit faster than G BT 40, and way faster than cloud", "Timestamp": "00:03:15,020 --> 00:03:20,199" },
"68": { "Srt": "opus. All right.", "Timestamp": "00:03:20,490 --> 00:03:21,429" },
"69": { "Srt": "So now it's time for the showdown side by side tests between cloud 3.5 Sonnet", "Timestamp": "00:03:21,440 --> 00:03:26,699" },
"70": { "Srt": "and GP T 40,", "Timestamp": "00:03:26,880 --> 00:03:28,270" },
"71": { "Srt": "I'll present each model with the same prompt", "Timestamp": "00:03:28,460 --> 00:03:31,100" },
"72": { "Srt": "and evaluate their responses for each test.", "Timestamp": "00:03:31,440 --> 00:03:34,210" },
"73": { "Srt": "I'll choose a winner based on my somewhat subjective criteria", "Timestamp": "00:03:34,220 --> 00:03:38,500" }
}
]
Thinking steps:
1. There is only one group in the input array.
2. Within this group, the fragment "That's a little bit faster than G BT 40, and way faster than cloud" is most likely the beginning of a sentence.
3. The SRT number of this fragment is 67.
The output should be:
[
{
"67": { "Srt": "That's a little bit faster than G BT 40, and way faster than cloud", "Timestamp": "00:03:15,020 --> 00:03:20,199" }
}
]
</example>
SRT OPTIMIZATION
在 Video Search Data Processing 阶段,系统通过 AWS Step Functions 对字幕文本进行进一步优化和处理,生成更高质量的字幕、视频摘要和考卷,同时完成向量化和存储的准备,以支持后续的高效检索和语义搜索。
1. 字幕优化与预处理 – Step Functions 工作流
优化处理从 Step Functions 工作流开始,该工作流会对已经分块的 SRT 字幕(Chunked SRT)进行多步骤的优化处理:
- 首先获取 Chunked SRT 字幕文件。
- 并行处理各个字幕分块,确保处理速度和效率:
-
- 对字幕进行语句的重组(例如把原生断句的字幕组合成一句)
- 对字幕进行重写,优化字幕质量并提供翻译
- 并行处理后,对结果进行合并与序列重排
- 根据优化后的字幕文件,生成视频总结,视频问卷。优化后的字幕、视频总结和考卷分别被存储到 S3 存储桶中的不同文件夹(TestPaper/、Summary/ 和 Srt/),确保内容有序存放,便于后续的检索和管理。
2. 字幕向量化与存储
优化后的 SRT 文件通过通过 SQS 触发 Lambda 函数进行向量化并行批处理。这一过程利用 Amazon Bedrock Cohere Embed v3 模型,将文本转化为嵌入向量,方便后续的语义搜索。
3. 向量数据存储与检索准备
向量化后的数据进入 OpenSearch Serverless,建立一个高效的语义搜索数据库。这一数据库支持用户基于自然语言查询快速找到相关视频内容,并返回相关性打分。这种基于嵌入向量和语义搜索的技术,使系统能够更好地理解用户查询意图,从而提升检索的精准度和用户体验。
Key Prompt
<persona>
You are an expert subtitle editor, highly skilled at proofreading and correcting subtitles. Using your knowledge and context, ensure the final output is an array, containing only the corrected subtitles and no additional content.
</persona>
<srt>
{srt}
</srt>
<srt_instructions>
1. The <srt> provided is a JSON format subtitle file with subtitle numbers, timestamps, and content.
2. Due to limitations in subtitle generation tools, there are many word errors in the subtitle content.
3. The time segments for splitting subtitles are too short, often dividing a single sentence into multiple fragments.
4. Example of the <srt> format:
{
"129": {
"Timestamp": "00:06:22,850 --> 00:06:24,720",
"Srt": "So let's see if these two models can do it."
},
"130": {
"Timestamp": "00:06:24,929 --> 00:06:25,640",
"Srt": "Um"
},
"131": {
"Timestamp": "00:06:25,799 --> 00:06:27,790",
"Srt": "I was thinking the prompt of"
},
"132": {
"Timestamp": "00:06:28,640 --> 00:06:31,500",
"Srt": "create a dialogue between a dragon and a knight."
}
}
</srt_instructions>
<task>
Your tasks:
1. Merge and re-segment subtitles so that each segment contains **one complete sentence**. Merge segments with adjacent subtitles if they form a single sentence.
2. Correct any errors within these sentences using your knowledge of the context.
3. Translate each sentence into Chinese.
4. If segments are merged, combine the corresponding timings and renumber the segments sequentially, starting from the first subtitle number in <srt>.
5. Ensure the output is an array and contains only the formatted subtitles. Do not include any additional content.
</task>
<example>
Example output format:
[
{
"xxxxx": {
"Timestamp": "00:06:25,799 --> 00:06:31,500",
"Srt": "I was thinking about creating a dialogue between a dragon and a knight.",
"TranslatedSrt": "我在考虑创作一个龙与骑士之间的对话。"
}
}
]
</example>
信息召回
在信息召回阶段,系统利用 AWS Step Functions 配合 Amazon Bedrock 服务,通过语义搜索和智能筛选,精准查找到符合用户查询需求的内容,并返回最相关的信息。
1. 关键词提取
流程从提取用户查询中的关键词开始。通过 Bedrock LLM 对用户的请求进行重写,这一步确保了查询的核心内容被捕捉,使系统能够基于关键词准确定位相关信息。
2. 嵌入查询
系统将用户输入的查询转化为嵌入向量(Embedding),这是通过 Bedrock Cohere Embbed v3 模型完成的。嵌入后的查询与向量化的字幕数据进行匹配,使系统能够理解用户查询的语义并在数据库中找到相似度最高的内容。
3. 筛选最适配的回复
检索到初步相关内容后,找寻召回文本内容的字幕上下文,系统利用 Bedrock Claude 模型对上下文进一步筛选。这一步通过大语言模型(LLM)对匹配结果进行分析,确保最终返回的内容与用户意图最为接近。Claude 模型根据上下文和语义关系对候选结果进行排序,从而优选出最相关的回复。
4. 提取返回前三个结果
最终,从筛选出的结果中提取出相关性最高的前三项并返回给用户。这种多层筛选确保了用户获得的内容具有高度的精准性和相关性,提供了良好的用户体验。
Key Code
# Author:Li Kaiyuan
# Version: 0.8
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import os
import boto3
import json
import logging
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
srt_index_name = os.environ["QUERY_INDEX_NAME"]
host = os.environ["OPENSEARCH_HOST"]
region = os.environ["AWS_REGION"]
def generate_text_embeddings(event):
try:
content = json.loads(event["Body"]["content"][0]["text"])
query = content["key_words"]
body = json.dumps({
"texts": [query],
"input_type": "search_query",
"embedding_types": ["float"]
})
bedrock = boto3.client(service_name='bedrock-runtime')
response = bedrock.invoke_model(
body=body,
modelId="cohere.embed-english-v3",
accept='*/*',
contentType='application/json'
)
response_body = json.loads(response.get('body').read())
query_embedding = response_body["embeddings"]["float"][0]
return {
"question": query,
"query_embedding": query_embedding
}
except ClientError as e:
logger.error(f"Error invoking the model: {e}")
return {"error": str(e)}
def query_opensearch(query_embedding, host=host, index_name=srt_index_name):
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, "aoss")
# Initialize OpenSearch client
search = OpenSearch(
hosts=[{'host': host, 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
# Main query to retrieve relevant docs
main_query = {
"_source": ["Number", "Srt", "Timestamp", "TranslatedSrt", "VideoName"],
"size": 10,
"query": {
"knn": {
"embeddings": {
"vector": query_embedding,
"k": 100
}
}
}
}
response = search.search(body=main_query, index=index_name)
retrieves = response["hits"]["hits"]
related_srts = []
for retrieve in retrieves:
retrieve_srt = retrieve["_source"]
Number = retrieve_srt["Number"]
VideoName = retrieve_srt["VideoName"]
# Adjust "Numbers" to a range of IDs
Numbers = [Number + i for i in range(6)]
# Query to retrieve related SRTs
related_query = {
"_source": ["Srt"],
"query": {
"bool": {
"filter": [
{"terms": {"Number": Numbers}},
{"term": {"VideoName": VideoName}}
]
}
}
}
related_response = search.search(body=related_query, index=index_name)
doc = [srt["_source"].get("Srt", "No Srt found") for srt in related_response.get("hits", {}).get("hits", [])]
retrieve_srt["doc"] = doc # Attach related SRTs
related_srts.append(retrieve_srt)
return related_srts
def lambda_handler(event, context):
query_info = generate_text_embeddings(event)
related_srts = query_opensearch(query_info["query_embedding"])
rerank_info_str = {
"rerank_info": json.dumps({
"Question": query_info["question"],
"Related_srts": related_srts
})
}
return rerank_info_str
Application Layer
在更新后的 Application Layer 应用层中,架构通过 Lambda 和 Step Functions 的组合,为用户请求提供了更高效的处理流程,进一步优化了系统的响应速度和流程自动化。
1. 前端界面 – Streamlit
Streamlit 作为应用层的前端接口,为用户提供了一个直观且互动的界面。用户可以通过 Streamlit 前端输入查询内容,发起对视频或字幕的搜索请求。Streamlit 作为用户界面,直接展示了查询结果,包括视频播放、字幕内容和相关性评分等,方便用户快速访问所需信息。
Streamlit 会把字幕加载到内存中和视频一同播放。
2. API 调用和请求处理 – Lambda 与 Step Functions
用户的请求通过 Streamlit 发出,首先到达 Lambda 函数(apiCall StepFunction)。该 Lambda 函数负责初始化请求并调用 Step Functions 进行更复杂的后端处理流程。通过这种设计,可以灵活地处理多步骤的请求操作,并根据不同的查询需求启动对应的处理逻辑。平均请求延时为 15 秒,确保了复杂查询的响应速度。
Key Code
# Author:Li Kaiyuan
# Version: 0.14
import streamlit as st
import time
import json
import boto3
import logging
import requests
from typing import List, Dict, Optional
import base64
# 配置类
class Config:
LOGGING_LEVEL = logging.INFO
PAGE_TITLE = "VideoSearch Engine"
PAGE_ICON = ":robot:"
LOGO_PATH = "images/logo.png"
LAMBDA_FUNCTION_NAME = "apiGatewayEntry" # 替换为你的 Lambda 函数名称
# UI组件类
class UIComponents:
@staticmethod
def get_user_input() -> str:
user_input = st.text_input("您的问题:", key="input")
if user_input and len(user_input.strip()) < 3:
st.warning("请输入至少3个字符")
return ""
return user_input.strip()
@staticmethod
def setup_page():
st.set_page_config(
page_title=Config.PAGE_TITLE,
page_icon=Config.PAGE_ICON,
layout="wide"
)
@staticmethod
def show_header():
col_logo, col_title = st.columns([1, 7])
with col_logo:
st.image(Config.LOGO_PATH, width=80)
with col_title:
st.title(Config.PAGE_TITLE)
# 添加缓存装饰器
@st.cache_data(ttl=3600)
def cached_lambda_invoke(user_input: str):
return APIHandler.invoke_lambda(user_input)
# 问卷显示类
class QuestionnaireDisplay:
@staticmethod
def reset_quiz(video_id: str, questions: List[Dict]):
if st.button(f"重置测验", key=f"reset_{video_id}"):
st.session_state[f'score_{video_id}'] = 0
st.session_state[f'answered_{video_id}'] = [False] * len(questions)
st.experimental_rerun()
@staticmethod
def display(questions: List[Dict], video_id: str):
if not questions:
st.write("问卷内容无法加载")
return
QuestionnaireDisplay.reset_quiz(video_id, questions)
# 初始化会话状态 - 使用video_id作为key的一部分
score_key = f'score_{video_id}'
answered_key = f'answered_{video_id}'
if score_key not in st.session_state:
st.session_state[score_key] = 0
if answered_key not in st.session_state or len(st.session_state[answered_key]) != len(questions):
st.session_state[answered_key] = [False] * len(questions)
for i, question in enumerate(questions):
QuestionnaireDisplay._display_question(i, question, video_id)
QuestionnaireDisplay._show_score(len(questions), video_id)
@staticmethod
def _display_question(index: int, question: Dict, video_id: str):
st.markdown(f"##### Question {index + 1}: {question['question']}")
for option in question['options']:
# 使用video_id创建唯一的key
key = f"q{index}_o{option['index']}_{video_id}"
checked = st.checkbox(
option['content'],
key=key,
disabled=st.session_state[f'answered_{video_id}'][index]
)
if checked and not st.session_state[f'answered_{video_id}'][index]:
QuestionnaireDisplay._handle_answer(index, option, question, video_id)
st.write("---")
@staticmethod
def _handle_answer(index: int, option: Dict, question: Dict, video_id: str):
st.session_state[f'answered_{video_id}'][index] = True
if option['index'] == question['answer']:
st.success("Correct!")
st.session_state[f'score_{video_id}'] += 1
else:
correct_answer = question['options'][question['answer']]['content']
st.error(f"Wrong. Correct answer: {correct_answer}")
@staticmethod
def _show_score(total_questions: int, video_id: str):
answered_questions = sum(st.session_state[f'answered_{video_id}'])
score = st.session_state[f'score_{video_id}']
if answered_questions == total_questions:
st.success(f"Quiz completed! Your score: {score}/{total_questions}")
else:
st.info(f"Questions answered: {answered_questions}/{total_questions}")
st.info(f"Current score: {score}/{total_questions}")
# 视频显示类
class VideoDisplay:
@staticmethod
def display(item: Dict):
"""Display video and questionnaire with VTT subtitles in memory to avoid CORS issues."""
try:
video_id = f"{item['title']}_{item['start_time']}".replace(':', '_')
# Display video title
if item.get('title'):
st.markdown(f"### Video Name: {item['title']}")
# Download VTT content and encode it as base64
vtt_content = VideoDisplay._download_vtt_content(item.get('vtt_url'))
if vtt_content:
vtt_data_uri = f"data:text/vtt;base64,{vtt_content}"
# Calculate start time in seconds
start_time = VideoDisplay._timestamp_to_seconds(item.get('start_time', '00:00:00'))
# Construct HTML with embedded subtitles
video_html = f"""
<video id="myVideo_{video_id}" width="800" height="450" controls>
<source src="{item['video_url']}" type="video/mp4">
<track src="{vtt_data_uri}" kind="subtitles" srclang="en" label="English" default>
Your browser does not support the video tag.
</video>
<script>
var video = document.getElementById("myVideo_{video_id}");
video.addEventListener('loadedmetadata', function() {{
video.currentTime = {start_time};
}}, false);
</script>
"""
st.components.v1.html(video_html, height=500)
else:
st.error("Failed to load subtitles due to CORS or other issues.")
if item.get('description'):
st.markdown("#### Related SRT")
st.caption(item['description'])
if item.get('relevance'):
st.caption("Relevance: " + str(item['relevance']))
if item.get('summary'):
st.markdown("#### Summary")
st.caption(item['summary'])
st.write("---")
with st.expander("查看相关的问卷"):
st.markdown("### 问卷内容")
QuestionnaireDisplay.display(item['checklist'], video_id)
except Exception as e:
st.error(f"Error displaying content: {e}")
@staticmethod
def _download_vtt_content(vtt_url: str) -> Optional[str]:
"""Download VTT content and return it as a base64-encoded string."""
try:
response = requests.get(vtt_url)
response.raise_for_status()
vtt_content = base64.b64encode(response.content).decode('utf-8')
return vtt_content
except requests.RequestException as e:
st.error(f"Error downloading VTT content: {e}")
return None
@staticmethod
def _timestamp_to_seconds(timestamp: str) -> int:
"""将时间戳转换为秒数"""
try:
hours, minutes, seconds = map(int, timestamp.split(':'))
return hours * 3600 + minutes * 60 + seconds
except:
return 0
# API处理类
class APIHandler:
@staticmethod
def invoke_lambda(user_input: str) -> Optional[Dict]:
session = boto3.Session(profile_name="skydev")
lambda_client = session.client(
service_name = "lambda",
region_name = "us-west-2")
payload = {"input": user_input} # 传递用户输入作为 payload
try:
# 调用 Lambda 函数
response = lambda_client.invoke(
FunctionName=Config.LAMBDA_FUNCTION_NAME,
InvocationType="RequestResponse", # 同步调用
Payload=json.dumps(payload)
)
# 读取并解析 Lambda 返回的内容
response_payload = response['Payload'].read()
result = json.loads(response_payload)
# 检查响应状态码和内容
if "statusCode" in result and result["statusCode"] == 200:
body_content = result["body"]
if isinstance(body_content, str): # 处理嵌套的 JSON
body_content = json.loads(body_content)
return body_content # 返回解析后的 JSON 数据
else:
st.error("Lambda 返回了错误信息")
return None
except Exception as e:
st.error(f"调用 Lambda 时出现异常: {str(e)}")
return None
# 主应用逻辑
def main():
# 设置日志
logging.basicConfig(level=Config.LOGGING_LEVEL)
logger = logging.getLogger(__name__)
try:
UIComponents.setup_page()
UIComponents.show_header()
user_input = UIComponents.get_user_input()
if user_input:
# 开始计时
start_time = time.time()
with st.spinner('正在搜索相关内容...'):
response = cached_lambda_invoke(user_input)
# 结束计时
end_time = time.time()
elapsed_time = end_time - start_time
# 显示响应时间
st.write(f"搜索耗时: {elapsed_time:.2f} 秒")
if response and 'res' in response:
if len(response['res']) == 0:
st.info("没有找到相关内容")
else:
for item in response['res']:
VideoDisplay.display(item)
st.write("---")
else:
st.error("搜索出现错误,请稍后重试")
except Exception as e:
logger.error(f"Application error: {str(e)}")
st.error("应用程序出现错误,请刷新页面重试")
if __name__ == "__main__":
main()
总结
在企业数字化转型的浪潮中,全球化线上培训的需求与日俱增,如何在全球范围内高效地支持员工掌握新系统,提升学习和工作效率,成为企业面临的重要挑战。针对这些需求,我们设计并实现了一套基于 AWS 全栈服务的智能视频检索和自动化考题生成解决方案,充分利用了 AWS 在自然语言处理、数据处理和无服务器架构上的综合能力。
该方案的核心优势体现在以下几个方面:
- 智能视频检索和内容定位:基于自然语言处理技术,用户可以通过输入自然语言查询,系统能在庞大的视频库中精准定位到相关内容,并根据相关性评分帮助用户判断内容的匹配度。同时,通过 Streamlit 前端实现了流畅的视频播放和字幕同步,提升了用户体验。
- 双语字幕和视频内容总结:系统能够自动生成双语字幕和视频内容总结,使内容更易于理解和获取。通过 AWS Prompt Flow 和 Step Functions 的多步骤处理,字幕被精细化分块并优化质量,使其更加符合语义逻辑,适应多语言背景的需求。
- 智能考卷生成与知识检测:系统可以根据视频内容动态生成个性化的考题,用于检测员工的知识掌握程度。这一功能帮助企业量化培训效果,并根据检测结果优化培训内容和方式,确保培训的有效性。
- 高效的无服务器架构:通过 Amazon Transcribe 实现自动化语音转文字,并结合 Lambda、SQS 和 OpenSearch Serverless 实现字幕的向量化存储和语义搜索,确保系统在高并发的情况下也能保持快速响应。
- 数据预处理和优化:利用 AWS 的多种服务,系统自动完成字幕的重分块、优化和翻译,进一步提高了字幕的精确性和可读性,保证员工能更清晰地理解视频内容。
- 全 AWS 集成架构:从数据采集、预处理到查询与展示,整个方案完全基于 AWS 服务,确保了无缝集成、卓越性能和可扩展性,减少了系统运维的复杂性,让企业能够专注于内容交付和效果评估。
这套基于 AWS 的解决方案,帮助企业构建了一个智能化、自动化、全球化的培训支持平台。它不仅提升了员工的学习效率和内容检索能力,同时通过自动生成的考卷实现了对培训效果的量化评估,为企业的培训体系提供了重要的支持。通过这个方案,企业能够以更少的时间和人力投入,显著提升全球培训的效率和效果。
*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。
本篇作者