一、前言
Amazon Connect 是亚马逊云科技推出的 AI 赋能的全渠道云联络中心,它简单易用,可根据客户需求扩展到任意规模,被广泛应用于各种规模、各种行业的客户。
Amazon Connect 具有原生的通话录音功能,可以选择仅对坐席、仅对客户或者对坐席与客户同时进行通话录音。有了这个功能,客服经理可以监控实时对话,并查看和下载通话录音。但是这个功能仅连接到坐席时才会记录对话,连接坐席之前/之后的内容,或者完全在无坐席参与的情况下,都不会被记录。
在客户的实际场景中,存在很多无坐席参与时的通话录音需求。比如客户来电转接场景,客户的电话直接被转接到外部号码。再比如电话通知场景,直接拨打客户电话并播报语音。在这些场景下通常也有通话录音的需求,比如追溯历史通话内容、分析客户情绪、使用录音进行培训等。
本文针对这个需求,使用 Kinesis Video Streams(简称 KVS)、Kinesis Data Streams(简称 KDS)和 Lambda 等服务构建了 Amazon Connect 全场景通话录音方案,无论是否有坐席参与均可实现通话录音。
二、方案架构
Amazon Connect 可以自定义联络中心从开始到结束的整个服务体验和流程,实现这一能力的核心是创建 Amazon Connect Flows。Flow 有点类似但超越于传统的 IVR(interactive voice response),您可以通过 Flow 与其它 AWS 服务交互,以创建动态的个性化的服务流程。
Amazon Connect 提供了 Flow 设计器,这是一个拖拽界面,让您用类似搭“积木”的方式来自定义联络中心。其中的一个“积木”是媒体流,通过它您可以捕获与联络中心交互期间的音频,并将音频发送到 KVS。根据您的设置,可以捕获如下双向交互的音频或仅捕获一个方向的音频:
- 客户听到的内容,包括坐席/转接到外部电话的接听者所说的内容和系统提示的内容。
- 客户所说的内容,包括他们在等待时所说的话。
在开启媒体流之后,这两个方向的音频会被分别录制到 KVS 的两个单声道音轨中,但无法直接查看或者下载。本方案通过编写 Lambda 实现对 KVS 音轨的提取和混音。方案整体架构如下。
通话录音业务流程简述如下:
- 客户拨打 Amazon Connect 电话。
- Amazon Connect 进行通话录音并将音频流存储到 KVS。
- 通话结束后,Amazon Connect 将 CTR 发送到 KDS。
- Lambda 从 KDS 中读取 CTR 记录,提取 KVS 音轨并进行编码。
- Lambda 将生成的录音文件上传到到 S3 存储桶。
- Lambda 将录音文件地址写入 Amazon Connect 的联系属性。
- 坐席或者客户经理通过 CloudFront 查看或下载录音文件。
三、核心代码解析
本方案通过编写 Lambda 实现读取 CTR 记录、提取 KVS 音轨进行编码和混音、将生成的录音文件上传到 S3,并将文件地址写入 Amazon Connect 的联系属性等功能。Lambda 代码已经在 Github 上开源。下面对 Lambda 的核心代码进行解析。
3.1 读取并解析 CTR 记录
在 Lambda 的入口从 KDS 读取 CTR 并解析 KinesisEvent
,因为 Amazon Connect CTR 可能批量传输到 KDS,所以需要循环读取并处理所有的 Record。参看 KinesisEvent
的示例,其中 kinesis.data
是 CTR 的数据。
public String handleRequest(KinesisEvent kinesisEvent, Context context) {
System.out.println("Processing CTR Event");
for (KinesisEvent.KinesisEventRecord record : kinesisEvent.getRecords()) {
try {
String recordData = new String(record.getKinesis().getData().array());
System.out.println("Record Data: " + recordData);
processCTR(recordData);
} catch (Exception e) {
// if json does not contain required data, will exit early
System.out.println(e.toString());
}
}
return "{ \"result\": \"Success\" }";
}
读取到 CTR 之后,参考 CTR 的数据模型,解析并提取我们需要的录音及其它相关信息。因为并非所有的 CTR 都包含有效的录音,在正式处理之前,先做几个判断,过滤掉未授权录音、未包含录音、已处理过的录音、以及 KINESIS_VIDEO_STREAM
类型之外的其它录音等。
有读者可能会好奇为什么会有已处理过的录音?这是因为 Connect 在某些情况下会重复发送同一个 Contact ID 的 CTR,比如当 Connect 联系属性更新的时候。在 3.4 章节我们会将录音文件地址更新写入到 Connect 联系属性,所以需要加入逻辑来过滤掉已处理过的录音。
本方案使用 Amazon Connect 的 recordingAuth 属性来控制对录音的授权。在开发 Flow 时,根据需要设置 recordingAuth 属性。您甚至可以将录音的授权交给客户。recordingAuth 的有效取值:1:录制客户所说的内容, 2:录制客户听到的内容, 3:同时录制前两项内容,其它取值均视为未授权录音。
读取到有效的 CTR 之后,就可以从 KVS 提取音频并进行处理。一个 CTR 可能包含多个录音,因此需要循环读取并处理录音。本方案仅处理 KINESIS_VIDEO_STREAM
类型的录音。
private void processCTR(String ctrStr) {
JSONObject json = new JSONObject(ctrStr);
ContactTraceRecord traceRecord = new ContactTraceRecord(json);
List<KVStreamRecordingData> recordings = traceRecord.getRecordings();
if (recordings.isEmpty()) {
logger.info("No Voice recording, skipped");
return;
}
int recordingAuth = traceRecord.getAttributes().getRecordingAuth();
if ((recordingAuth <= AudioUtils.AUTH_AUDIO_NONE || recordingAuth > AudioUtils.AUTH_AUDIO_MIXED)) {
logger.info("Recording is not authorized, skipped. recordingAuth:" + recordingAuth);
return;
}
if (traceRecord.getAttributes().hasRecordingAttributes()) {
logger.info("Recording Attributes existed, skipped.");
return;
}
ConnectAttributesData connectAttributes = new ConnectAttributesData();
//A CTR may include multi recordings, event multi types of recordings, we only process the type of KINESIS_VIDEO_STREAM
for (KVStreamRecordingData recording : recordings) {
if (!recording.getStorageType().equals("KINESIS_VIDEO_STREAM")) {
logger.info("Recording StorageType is not KINESIS_VIDEO_STREAM, skipped. StorageType:" + recording.getStorageType());
continue;
}
logger.info("Recording StorageType is KINESIS_VIDEO_STREAM, recording processing started");
//System.out.println(event);
RecordingData recordingData = parseEvent(traceRecord, recording);
// Begin processing audio stream
AudioStreamService streamingService = new AudioStreamService();
try {
streamingService.processAudioStream(recordingData);
logger.info(String.format("fromCustomer: %s, toCustomer: %s, mixed: %s", recordingData.getAudioFromCustomer(), recordingData.getAudioToCustomer(), recordingData.getAudioMixed()));
//append audio file path to connect attributes
//……
} catch (Exception e) {
logger.error("KVS processing failed with: ", e);
}
logger.info("recording processing finished");
}
updateConnectContactAttributes(traceRecord, connectAttributes);
}
3.2 提取 KVS 音频并进行混音
在 Amazon Connect 中,开启媒体流之后,Connect 捕获客户与联络中心交互的通话,并将音频写入 KVS。Flow 提供了开始媒体流和停止媒体流两个“积木块”,您可以灵活地设置只捕获部分交互过程或者整个交互过程中的音频。如果您在开始媒体流之后不设置停止媒体流,则通话录音会持续到通话结束,也就是说,在交互结束后自动停止媒体流。
Amazon Connect 在将音频发送到 KVS 时,会打上 ContactId
的 Tag,并将客户所说的内容和客户听到的内容分别写入到以 AUDIO_FROM_CUSTOMER
和 AUDIO_TO_CUSTOMER
命名的音轨中。从 KVS 读取音频时,判断 ContactId
是否匹配,并根据两个音轨名写入到不同的 Buffer。
public static Map<String,ByteBuffer> getByteBufferFromStream(StreamingMkvReader streamingMkvReader,
FragmentMetadataVisitor fragmentVisitor,
FragmentMetadataVisitor.BasicMkvTagProcessor tagProcessor,
String contactId) throws MkvElementVisitException {
Map<String,ByteBuffer> bufferMap = new HashMap<>();
while (streamingMkvReader.mightHaveNext()) {
Optional<MkvElement> mkvElementOptional = streamingMkvReader.nextIfAvailable();
if (mkvElementOptional.isPresent()) {
MkvElement mkvElement = mkvElementOptional.get();
mkvElement.accept(fragmentVisitor);
//logger.error(mkvElement.getElementMetaData().toString());
// Validate that we are reading data only for the expected contactId at start of every mkv master element
if (MkvTypeInfos.EBML.equals(mkvElement.getElementMetaData().getTypeInfo())) {
if (mkvElement instanceof MkvStartMasterElement) {
String contactIdFromStream = getContactIdFromStreamTag(tagProcessor);
if (contactIdFromStream != null && !contactIdFromStream.equals(contactId)) {
//expected Connect ContactId does not match the actual ContactId. End the streaming by
//returning an empty ByteBuffer
logger.error("expected Connect ContactId does not match the actual ContactId");
return bufferMap;
}
tagProcessor.clear();
}
} else if (MkvTypeInfos.SIMPLEBLOCK.equals(mkvElement.getElementMetaData().getTypeInfo())) {
MkvDataElement dataElement = (MkvDataElement) mkvElement;
Frame frame = ((MkvValue<Frame>) dataElement.getValueCopy()).getVal();
ByteBuffer audioBuffer = frame.getFrameData();
long trackNumber = frame.getTrackNumber();
MkvTrackMetadata metadata = fragmentVisitor.getMkvTrackMetadata(trackNumber);
if (AUDIO_FROM_CUSTOMER.equals(metadata.getTrackName())) {
//logger.info("AUDIO_FROM_CUSTOMER audioBuffer size: " + audioBuffer.remaining());
bufferMap.put(AUDIO_FROM_CUSTOMER, audioBuffer);
return bufferMap;
} else if (AUDIO_TO_CUSTOMER.equals(metadata.getTrackName())) {
//logger.info("AUDIO_TO_CUSTOMER audioBuffer size: " + audioBuffer.remaining());
bufferMap.put(AUDIO_TO_CUSTOMER, audioBuffer);
return bufferMap;
}
}
}
}
return bufferMap;
}
在 Amazon Connect 原生的通话录音中,不同方向的音频存储在不同的立体声声道上。其中,客户所说的内容存储在左声道中,客户听到的内容存储在右声道中。而从 KVS 的音轨中提取的音频是单声道,如果开启了双向音频录制,我们采用原生方案类似的方式,将客户所说的内容和客户听到的内容分别存储在左右声道,并混合成立体声。
public static File mixMonoAudios(String fromCustomer, String toCustomer, String contactId) {
//long unixTime = System.currentTimeMillis() / 1000L;
File output = new File(String.format("/tmp/%s_audio_mixed.wav", contactId/*, unixTime*/));
try {
File fromCustomerFile = new File(fromCustomer);
File toCustomerFile = new File(toCustomer);
logger.info(String.format("file size: %s --- %s", fromCustomerFile.length(), toCustomerFile.length()));
AudioInputStream fromCustomerStream = AudioSystem.getAudioInputStream(fromCustomerFile);
AudioInputStream toCustomerStream = AudioSystem.getAudioInputStream(toCustomerFile);
mixAudioStreams(fromCustomerStream, toCustomerStream, output);
// Close streams
fromCustomerStream.close();
toCustomerStream.close();
} catch (Exception e) {
e.printStackTrace();
}
return output;
}
/*
* Mix two 16-bit signed samples into one
* The audio to customer is stored in the right channel.
* The audio from customer is stored in the left channel.
*/
public static void mixAudioStreams(AudioInputStream left, AudioInputStream right, File output) throws IOException {
AudioFormat format = left.getFormat();
int frameSize = format.getFrameSize();
int size = left.available();
byte[] result = new byte[size * 2];
// Mix frame sample-by-sample
for (int i = 0; i < size; i += frameSize) {
//16-bit samples
short sample1, sample2;
byte[] data1 = new byte[frameSize];
byte[] data2 = new byte[frameSize];
int data1Len = left.read(data1);
int data2Len = right.read(data2);
if (data1Len <= 0 || data2Len <= 0) {
break;
}
// Convert bytes right 16-bit sample
sample1 = get16BitSample(data1[0], data1[1]);
sample2 = get16BitSample(data2[0], data2[1]);
// Convert back to bytes in bigEndian model, and mix data to stereo
result[i * 2] = (byte) (sample1 >> 8);
result[i * 2 + 1] = (byte) (sample1 & 0xFF);
result[i * 2 + 2] = (byte) (sample2 >> 8);
result[i * 2 + 3] = (byte) (sample2 & 0xFF);
}
AudioFormat audioFormat = new AudioFormat(8000, 16, CHANNEL_STEREO, true, false);
AudioInputStream mixedStream = new AudioInputStream(new ByteArrayInputStream(result), audioFormat, result.length);
logger.info(String.format("mixedStream size: %s", mixedStream.available()));
// Write mixed audio data to output file
AudioSystem.write(mixedStream, AudioFileFormat.Type.WAVE, output);
logger.info(String.format("output file size: %s", output.length()));
mixedStream.close();
}
// Convert two bytes to a 16-bit signed sample in bigEndian model
public static short get16BitSample(byte lower, byte upper) {
return (short) ((lower << 8) | (upper & 0xFF));
}
3.3 上传录音文件到 S3
在完成从 KVS 提取音频之后,我们将生成的音频文件上传到 S3 中。文件名按照日期和 Contact ID 作为前缀。根据音频录制设置,最多有 audioFromCustomer
、audioToCustomer
、audioMixed
三个文件。
public static S3UploadInfo uploadAudio(Regions region, String bucketName, String keyPrefix, String initiationTimestamp, String audioFilePath,
String contactId, boolean publicReadAcl,
AWSCredentialsProvider awsCredentials) {
File wavFile = new File(audioFilePath);
S3UploadInfo uploadInfo = null;
try {
AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard()
.withRegion(region)
.withCredentials(awsCredentials)
.build();
ZonedDateTime zdt = parseTimestamp(initiationTimestamp);
// upload the raw audio file to the designated S3 location
String objectKey = keyPrefix + zdt.getYear() + '/' + zdt.getMonthValue() + '/' + zdt.getDayOfMonth() + '/' + wavFile.getName();
logger.info(String.format("Uploading Audio: to %s/%s from %s", bucketName, objectKey, wavFile));
PutObjectRequest request = new PutObjectRequest(bucketName, objectKey, wavFile);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentType("audio/wav");
metadata.addUserMetadata("contact-id", contactId);
request.setMetadata(metadata);
if (publicReadAcl) {
request.setCannedAcl(CannedAccessControlList.PublicRead);
}
PutObjectResult s3result = s3Client.putObject(request);
logger.info("putObject completed successfully " + s3result.getETag());
uploadInfo = new S3UploadInfo(bucketName, objectKey, region);
} catch (SdkClientException e) {
logger.error("Audio upload to S3 failed: ", e);
throw e;
}
return uploadInfo;
}
3.4 将录音文件地址写入 Connect 联系属性
在将录音文件上传到 S3 之后,为了方便坐席和客服经理查看,我们将文件地址写入到 Amazon Connect 联系属性里,这样就可以方便地在 Amazon Connect 管理平台查看。通过配置 Cloudfront,并以 S3 作为源,该文件地址可以直接在浏览器里访问和播放。
通过调用 Amazon Connect 的 updateContactAttributes
接口将录音文件地址写入到用户定义属性里,initialContactId
字段指定要写入的哪个联系记录。
调用 updateContactAttributes
接口更新联系属性之后,Connect 会将更新后的 CTR 重新发送到 KDS,因此 Lambda 在处理 CTR 时要能够避免重复处理同一条记录。我们在 3.1 章节的代码逻辑中已经进行了判断并跳过重复的 CTR 记录。
private void updateConnectContactAttributes(ContactTraceRecord traceRecord, ConnectAttributesData connectAttributes) {
ConnectClient connectClient = ConnectClient.builder()
.region(Region.of(REGION.getName()))
.build();
Map<String, String> attributes = new HashMap<>();
attributes.put("audioFromCustomer", connectAttributes.getAudioFromCustomer() == null ? "" : connectAttributes.getAudioFromCustomer());
attributes.put("audioToCustomer", connectAttributes.getAudioToCustomer() == null ? "" : connectAttributes.getAudioToCustomer());
attributes.put("audioMixed", connectAttributes.getAudioMixed() == null ? "" : connectAttributes.getAudioMixed());
String initialContactId = traceRecord.getInitialContactId() != null ? traceRecord.getInitialContactId() : traceRecord.getContactId();
String instanceId = traceRecord.getInstanceARN().split("/")[1];
logger.info(String.format("Instance ID: %s, Contact ID: %s, Initial Contact ID: %s", instanceId, traceRecord.getContactId(), traceRecord.getInitialContactId()));
UpdateContactAttributesRequest request = UpdateContactAttributesRequest.builder()
.attributes(attributes)
.initialContactId(initialContactId)
.instanceId(instanceId)
.build();
UpdateContactAttributesResponse response = connectClient.updateContactAttributes(request);
if (response.sdkHttpResponse().statusCode() >= 300) {
logger.error("Error updating contact attributes, status code: " + response.sdkHttpResponse().statusCode());
}
}
四、方案部署和测试
4.1 前置条件
- 一个 Amazon Connect 实例,申请好电话号码。
- 一个 S3 存储桶,用于存放通话录音文件。
- 一个 KDS,用于将 Connect CTR 发送到 Lambda。
- 一个 CloudFront 分配,源指向 S3 存储桶,用于公开访问录音文件。
4.2 配置Amazon Connect
- 开启 Amazon Connect 通话录音功能
-
- 在 Amazon Connect 里点击“Data Storage”,编辑
Live media streaming
选项。
- 开启
Live media streaming
。在 Amazon Connect 将录音写入 KDS 之后,我们很快就会在 Lambda 里进行消费,所以保留周期设置为 1 个小时足矣。
- 开启 CTR data streaming
-
- 配置将 CTR 记录发送到 Kinesis Stream,并选择预先创建好的 KDS。
- 导入或者编写 Flow
Github 库中提供了两个示例 Flows:
Demo External Transfer Recording
:展示了将来电转接到外部号码的通话录音场景。
Demo Partial Recording
:展示了通过控制开始和停止媒体流,录制系统提示音的场景。
您可以直接将示例 Flow 导入 Connect,或者根据示例 Flow 进行二次创作。下图为 Demo Partial Recording
示例 Flow。
导入示例 Flow 并发布之后,将 Flow 关联到电话号码上。
4.3 部署 Call Recording Lambda function
方案中的 Lambda 采用 Serverless Application Model(简称 SAM)开发和部署。SAM 是一个工具包,可改善在 AWS 上构建和运行无服务器应用程序的开发人员体验。AWS SAM 由两个主要部分组成:SAM 模板规范和 SAM CLI。
这里简述使用 SAM CLI 部署 Call Recording Lambda function 的步骤,详细步骤见 README。
1. 安装 SAM CLI
访问用户手册安装 SAM CLI。
2. 构建并部署 Lambda
使用如下命令构建和步骤 Lambda
sam build
sam deploy --guided
在使用 sam deploy --guided
命令部署 Lambda 时,需要填写如下参数:
- Stack Name:要部署到 CloudFormation 的堆栈的名称。 这对于您的帐户和区域应该是唯一的,并且最好与您的项目名称相匹配。
- AWS Region:您要部署应用程序的 AWS 区域。
- Parameter S3BucketName:存放录音文件的 S3 存储桶。
- Parameter S3BucketPrefix [recordings/]:录音文件的 S3 前缀,默认
recordings/
。
- Parameter CloudFrontDomain:CloudFront 分配的域名,通过此域名用户可以公开访问 S3 存储桶中的音频文件。
参数的配置参考下图。
4.4 添加 KDS 作为 Lambda 的触发器
前边我们已经配置 Amazon Connect 将 CTR 发送到 KDS,Lambda 想要消费 CTR 数据,需要添加 KDS 作为 Lambda 的触发器。在 Lambda 的控制台找到新创建的 Lambda 函数,按照下图添加并配置 KDS 触发器。Batch size 指定发送到 Lambda 的每批次 CTR 的数量,可以根据需要调整。
4.5 方案测试
在方案部署完之后,就可以拨打关联示例 Flow 的电话号码进行测试。在通话结束之后,进入到 Connect 后台的 Contact Search 页面,找到对应的记录,点击之后就可以看到通话录音 URL 地址。
注意:Lambda 处理 CTR 可能会有延迟,如果您看不到通话录音 URL,请等待一会再刷新。
五、总结
Amazon Connect 原生具有通话录音功能,但仅可录制坐席参与过程中的对话内容。在客户的实际场景中,存在很多无坐席参与时的通话录音需求。本方案就是针对这个需求进行设计,可以实现任何情况下对部分或者全部、单向或者双向通话内容进行录音。
当然,本方案只是一个参考或者起点,在实际的应用场景中,您可以根据实际情况继续优化。比如录音文件保存到 S3 中后 ,可以根据文件保留时间需求做生命周期管理以节省成本。本方案将录音文件的 URL 写入到 Connect 的用户定义联系属性里,但 Amazon Connect 的联系属性只能保存 24 个月,过期就会被删除。如果需要更长的保存时间,可以考虑将录音文件 URL 做持久化存储,存储到 DynamoDB 等数据库中。
另外,本方案通过 CloudFront 对 S3 的录音文件进行公开访问,在实际的场景中,建议根据需要对用户进行认证和授权。
附录
- 本方案 Lambda 开源代码:https://github.com/freewine/AmazonConnectCallRecording
- Amazon Connect 用户指南:https://docs.thinkwithwp.com/connect/latest/adminguide/amazon-connect-get-started.html
- Amazon Connect 通话录音行为:https://docs.thinkwithwp.com/connect/latest/adminguide/set-up-recordings.html
- SAM 用户指南:https://docs.thinkwithwp.com/serverless-application-model/latest/developerguide/serverless-getting-started.html
本篇作者