前言
关于 Discord
Discord 在游戏行业扮演着重要角色,作为一个专为游戏玩家设计的社交平台。它提供了实时语音、文字和视频通信功能,让玩家可以轻松组队、讨论策略和分享游戏体验。游戏开发商和发行商也利用 Discord 建立官方社区,与玩家互动并收集反馈。随着时间推移,Discord 不仅服务于游戏玩家,还扩展到教育和其他领域,成为一个多功能的社交平台。它的成功促使许多游戏厂商重视社区运营,将其视为长线运营的必备能力。通过在 Discord 上进行舆情分析,游戏公司可以及时捕捉玩家的声音,快速应对潜在危机,并利用数据驱动决策,从而提升游戏的整体运营效果和玩家满意度。
本文会带大家一起逐步搭建一套自动化分析 Discord 上玩家舆情的解决方案,通过该方案您可以制定执行周期,通过 discord.py 爬取玩家聊天分析其语义,并作出舆情判断。
架构说明
本架构中我们采用完全 serverless 的架构进行搭建,架构中几个重要的服务如下所示:
AWS Glue:亚马逊云科技(AWS)提供的一种完全托管的提取、转换和加载(ETL)服务,旨在简化数据集成任务。它可以自动推理数据格式,并根据源数据和目标数据存储构建 ETL 代码。
Amazon Bedrock:一项完全托管的服务,可提供多种高性能基础模型(FM),以及构建生成式人工智能应用程序所需的一系列广泛功能,通过符合安全性和隐私性的负责任人工智能简化开发成本。
Amazon Athena:是一种交互式查询服务,让您能够轻松使用标准 SQL 直接分析 Amazon Simple Storage Service(Amazon S3)中的数据。只需在 AWS Management Console 中执行几项操作,即可将 Athena 指向 Amazon S3 中存储的数据,并开始使用标准 SQL 运行临时查询,然后在几秒钟内获得结果。
LangChain Summarize
LangChain 提供了多种文本总结的方法,其中 refiner 是一种较为先进的迭代式总结技术。以下是对 LangChain 总结方法的介绍:
- 基础总结:使用简单的提示模板和语言模型生成摘要。
- Map-reduce:将长文本分割成小块,分别总结后再合并。适用于较长文档。
- Stuff:将所有文本直接输入模型,适合短文本。
- Refiner:这是一种迭代式的总结方法,特别适合处理长文档。其工作流程如下:
- 首先对文档的初始部分生成一个摘要。
- 然后将这个初始摘要与下一部分文本一起输入模型,生成更新的摘要。
- 重复这个过程,不断”提炼”摘要,直到处理完整个文档。
Refiner 方法的优势在于:
- 可以处理非常长的文档,克服了模型输入长度限制。
- 通过迭代提炼,保证了摘要的连贯性和全面性。
- 能够捕捉文档中的关键信息,并在最终摘要中保留。
使用 refiner 方法时,可以通过调整每次迭代的文本长度、迭代次数等参数来优化性能,本文即采用 Refiner Chain 来实现 Discord 信息的总结功能。
Discord.py
Discord.py 是一个强大的 Python 库,专为开发 Discord 机器人而设计。它提供了与 Discord API 交互的简便方法,允许开发者创建自定义命令、事件监听器和自动化任务。
Discrod Bot 配置
创建Application,本文参考 https://discordpy.readthedocs.io/en/stable/discord.html
登陆至 https://discord.com/developers/applications,创建Application
添加 Bot 名称
创建完毕后,点击 Bot→Reset Token
配置 OAuth2 权限后加入到 Discord Channel
拷贝至浏览器,邀请 Bot 至相关频道就行了。
方案部署
整套解决方案使用 AWS CDK 进行部署,所以需要在本地具备以下环境。
安装依赖
1. 安装 nodejs18
sudo yum install https://rpm.nodesource.com/pub_18.x/nodistro/repo/nodesource-release-nodistro-1.noarch.rpm -y
sudo yum install nodejs -y --setopt=nodesource-nodejs.module_hotfixes=1 --nogpgcheck
2. 安装 & 启动 docker
sudo yum install docker -y
sudo service docker start
sudo chmod 666 /var/run/docker.sock
3. 安装 git 和 jq
sudo yum install git -y
sudo yum install jq
4. 安装 aws-cdk
sudo npm install -g aws-cdk
sudo npm install --global yarn
5. 安装 11(Amazon Linux2023 自带 Python3.11 环境),其他系统可以参考以下安装方式
wget https://www.python.org/ftp/python/3.11.8/Python-3.11.8.tgz
tar xzf Python-3.11.8.tgz
cd Python-3.11.8
sudo ./configure --enable-optimizations
6. 安装依赖后,依赖信息如下
CDK 2.122.0+
nodejs 18+
npm 10+
python 3.11+
git
开始部署
首先我们需要将示例代码下载到本地
git clone https://github.com/aws-samples/discord-community-insights-with-GenAI.git
cd llm-text-keyword-summary/deploy
执行生成环境变量脚本,生成相应环境变量,脚本内容如下
#!/bin/bash
account_id=`aws sts get-caller-identity --query "Account" --output text`
ts=`date +%y-%m-%d-%H-%M-%S`
unique_tag="$account_id-$ts"
# Glue 作业名称
GLUE_JOB_NAME="llm-analysis-text-job"
# Glue Discord 作业名称
GLUE_DISCORD_JOB_NAME="discord-message-collect-job"
# Glue Discord 作业名称
GLUE_SUMMARIZE_JOB_NAME="llm-summarize-job"
# 存储discord token secretname
DISCORD_SECRET_NAME="discord-token"
# DynamoDB中存储prompt的表名
LLM_ANALYSIS_TEXT_TABLE_NAME="prompt-template"
# 存储原始数据以及分析结果的S3名称
S3_BUCKET_NAME="llm-analysis-text-${unique_tag}"
# S3中原始数据Prefix的名称
RAW_DATA_PREFIX="raw-data/"
# Glue Catalog中DB名称
GLUE_DATABASE="llm_text_db"
# Glue Catalog中查询S3中分析数据使用表名
GLUE_TABLE="sentiment_result"
# Glue Catalog中查询S3中总结数据使用表名
GLUE_SUMMARY_TABLE="summary_result"
echo "GLUE_JOB_NAME=${GLUE_JOB_NAME}" > .env
echo "GLUE_DISCORD_JOB_NAME=${GLUE_DISCORD_JOB_NAME}" >> .env
echo "GLUE_SUMMARIZE_JOB_NAME=${GLUE_SUMMARIZE_JOB_NAME}" >> .env
echo "LLM_ANALYSIS_TEXT_TABLE_NAME=${LLM_ANALYSIS_TEXT_TABLE_NAME}" >> .env
echo "S3_BUCKET_NAME=${S3_BUCKET_NAME}" >> .env
echo "RAW_DATA_PREFIX=${RAW_DATA_PREFIX}" >> .env
echo "GLUE_DATABASE=${GLUE_DATABASE}" >> .env
echo "GLUE_TABLE=${GLUE_TABLE}" >> .env
echo "GLUE_SUMMARY_TABLE=${GLUE_SUMMARY_TABLE}" >> .env
echo "DISCORD_SECRET_NAME=${DISCORD_SECRET_NAME}" >> .env
执行脚本
使用 CDK 安装环境
npm install
cdk bootstrap
cdk synth
cdk deploy
等待几分钟后,方案部署完毕,会有以下输出内容,当然我们也可以在 AWS console 中 CloudFormation 中查看
创建 Glue Catalog DataBase
创建一个自定义名称的 DB 后
创建 Athena Table,打开 Athena Console Query Editor 执行下面的两条语句,本案例中我们会在 Glue Job 中输出玩家在 Discord 中对于游戏评价关键字的 positive neural negative 的类型并且需要对玩家评价进行汇总报告,同时我们也会针对每次运行的 Glue Job 结果进行分片处理,创建 Sentiment Result Table 内容如下所示。
sentiment_result 表:针对原始的聊天记录分析后存储在这个表
CREATE EXTERNAL TABLE sentiment_result (
chat STRING,
sentiment STRING
)
PARTITIONED BY (
job_id STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json' = 'true')
LOCATION "s3://<替换上面配置的S3 Bucket名称>/result/"
TBLPROPERTIES (
"projection.enabled" = "true",
"projection.job_id.type" = "injected",
"storage.location.template" = "s3://<替换上面配置的S3 Bucket名称>/result/job_id=${job_id}"
)
summary_result 表:存储最终总结的结果
CREATE EXTERNAL TABLE summary_result (
counts STRING,
summary STRING
)
PARTITIONED BY (
job_id STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json' = 'true')
LOCATION "s3://<替换上面配置的S3 Bucket名称>/summary/"
TBLPROPERTIES (
"projection.enabled" = "true",
"projection.job_id.type" = "injected",
"storage.location.template" = "s3://<替换上面配置的S3 Bucket名称>/summary/job_id=${job_id}"
)
user_jobs 表:
CREATE EXTERNAL TABLE user_jobs (
channel_id STRING,
channel_name STRING,
message_count INT,
job_run_id STRING,
run_time STRING,
timestamp BIGINT
)
PARTITIONED BY (
username STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json' = 'true')
LOCATION "s3://<替换上面配置的S3 Bucket名称>/user-jobs/"
TBLPROPERTIES (
"projection.enabled" = "true",
"projection.username.type" = "injected",
"storage.location.template" = "s3://<替换上面配置的S3 Bucket名称>/user-jobs/username=${username}"
)
如下图所示
关键代码解析
信息采集阶段
Discord Token 信息为了安全我们放置在 Secret Manager 中,在使用时取出。
def get_discord_token():
secret_name = "discord-token"
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager'
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
raise e
return json.loads(get_secret_value_response['SecretString'])
async def get_recent_messages(channel):
print("Calculating one_week_ago...")
data_period = int(token_info.get('DATA_PERIOD'))
one_week_ago = datetime.now() - timedelta(days=data_period)
print(f"one_week_ago: {one_week_ago}")
messages = [message async for message in channel.history(after=one_week_ago)]
message_data = [to_json(message) for message in messages]
print(f"Messages count: {len(message_data)}")
return message_data
语义分析阶段
prompt_sentiment = '''
You are a chat message sentiment classifer
Here is a document you will classify the senetiment
<doc>
{content}
</doc>
please list all the content then classify the sentiment of each content into [positive,neutral,negative]'
Please follow below requirements:
1. You will strictly be based on the document in <doc>.
2. please enclose your analysis results in xml tag <sentiment>.
for example:
<sentiment>
1. "auction is very good." [positive]
2. "I got a good deal at the auction." [positive]
3. " The auction house was terrible." [negative]
4. "auction sucks" [negative]
5. "There were people giving out red envelopes (with money) at the auction house." [neutral]
</sentiment>
Skip the preamble, go straight into the answer.
'''
print('prompt_sentiment:',prompt_sentiment)
class CustOuputParser(BaseOutputParser[str]):
def extract(self, content: str) -> tuple[str, str]:
pattern = r'"(.*?)" \[(.*?)\]'
matches = re.findall(pattern, content)
if matches:
return [(text, sentiment) for text, sentiment in matches]
def parse(self, text: str) -> str:
results = self.extract(text)
output = []
for text, sentiment in results:
output.append(json.dumps({"chat": text, "sentiment": sentiment}, ensure_ascii=False))
return "\n".join(output)
@property
def _type(self) -> str:
return "cust_output_parser"
prompt_sentiment = ChatPromptTemplate.from_template(prompt_sentiment)
llm_sonnet = ChatBedrock(model_id="anthropic.claude-3-sonnet-20240229-v1:0",
model_kwargs={"temperature": 0,
"top_k":10,
"max_tokens": 1024,
"top_p":0.5,
# "stop_sequences":['</response>']
})
output_parser = CustOuputParser()
sentiment_chain = prompt_sentiment | llm_sonnet | output_parser
文本总结阶段
总结阶段,我们使用 aws wrangler 来获取上一步已经做过语义分析的结果进行总结。
df = wr.athena.read_sql_query(f'SELECT * FROM sentiment_result where job_id=\'{job_run_id}\'', database=glue_db)
print(df)
sentiment_counts = df.groupby('sentiment').size()
print(sentiment_counts)
sentiment_counts_json = sentiment_counts.to_json(orient='index')
print(sentiment_counts_json)
chat_content = df['chat']
# 使用str.cat()方法将每一行内容添加回车符号并连接在一起
combined_chat = chat_content.str.cat(sep='\n')
print(combined_chat)
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000, chunk_overlap=100
)
split_docs = text_splitter.split_text(combined_chat)
docs = [Document(page_content=doc) for doc in split_docs]
llm_summarize = ChatBedrock(model_id="anthropic.claude-3-sonnet-20240229-v1:0",
model_kwargs={"temperature": 0,
"top_k":10,
"max_tokens": 1024,
"top_p":0.5
})
Prompt 说明
prompt_template = """You are the summarizer of chat records for game players. You need to summarize the chat records within <content> in Chinese, focusing primarily on opinions regarding in-game mechanisms and game items.
You need to provide a summary based on three dimensions: positive feedback, negative feedback, and player suggestions. Additionally, you should provide typical player chat for each dimension.
<content>
{text}
</content>
CONCISE SUMMARY:"""
prompt = PromptTemplate.from_template(prompt_template)
refine_template = (
"You are the summarizer of chat records for game players, your job is to produce a final summary in Chinese\n"
"We have provided an existing summary up to a certain point: {existing_answer}\n"
"We have the opportunity to refine the existing summary"
"(only if needed) with some more context below.\n"
"------------\n"
"{text}\n"
"------------\n"
"Given the new context, refine the original summary"
"If the context isn't useful, return the original summary."
)
端到端流程演示
通过一下步骤,启动 Streamlit Demo服务
安装依赖
cd llm-text-keyword-summary/demo
pip3 install -r ./requirements.txt
配置环境变量
打开 gen_demo_env.sh
#!/bin/bash
# Api Gateway 暴露的domain URL,需要在CDK 部署完毕后进行配置
domain_url="https://xxxx.execute-api.us-east-1.amazonaws.com/prod/"
# 调用Api Gateway 暴露的domain URL API key,需要在CDK 部署完毕后进行配置
apikeys=""
echo "domain_url=${domain_url}" >> .env
echo "apikeys=${apikeys}" >> .env
Domain URL 在上面执行完 CDK 后会输出,我们同时查看 API Key 的值,打开 Console
拿到 Domain URL 以及 API key 之后,执行脚本
创建用于登录的 Demo 的用户和密码
在命令行中使用 vi 命令打开 config.yaml 进行编辑(使用其他工具编辑也可以)
在文件中加入如下内容,其中<>中的信息需要替换为希望使用的信息
<username1>:
email: <username1@amazon.com>
failed_login_attempts: 0 # Will be managed automatically
logged_in: True # Will be managed automatically
name: <User Test>
password: < >
密码需要使用 demo 中提供的密码工具,将密码明文 hash 加密后再添加到 config.yaml 中
python3 password_generator.py <password>
用户信息添加后,并将密码明文 hash 后配置文件的例子
如果有多个用户一起使用本 demo,可以创建多个用户和密码。
也可以为不同的 Discord server 创建用户,用以隔离不同 server 间的 job。
如果同一 Discord server 下有多个 Channel,并且有不同的数据分析周期,也可以通过创建不同的用户来隔离不同的 job。
执行完毕后启动服务
streamlit run demo_app.py --server.port 6001
执行完后,会在命令行里有如下输出,复制 External URL 到浏览器,即可访问本方案
为了能够访问 6001 端口的服务,需要配置安全组允许外部连接到 6001 端口
Demo 演示
输入之前创建的用户和密码,登录 demo
配置 Discord 信息,以及 Discord 数据抓取频度
如果需要给同一个 Discord server 配置多个 Channel,可以在 Channel ID 里输入多个 ID 并以回车换行分隔,同个一个 Discord server 只需要一个 Token,不要在 Token 里输入多个 Token。
如果有多个 Discord server 需要配置,需要为不同的 server 创建不同的用户来进行隔离。
运行完成的 Discord 洞察 Job 信息查询
点击导航栏“User Jobs”,然后直接点击“实时查询”
在 Job 列表中选择需要查看的报告,复制 job_run_id 的信息
总结结果展示
在“Summarize Results”页面,黏贴“Job ID”,然后点击“实时查询”
总结
通过以上内容演示,我们可以看到利用 Amazon Bedrock 结合 Aerverless 的架构可以帮助我们快速地获取玩家在 Discord 的讨论内容,并根据玩家发表的内容进行语义抽取以及统计,方便游戏运营人员及时作出游戏内容或者策略调整,达到对于玩家舆情的监控,大大提升玩家的留存,提升玩家的游戏体验。
*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。
参考资料
https://discordpy.readthedocs.io/en/stable/discord.html
https://www.langchain.com/
本篇作者