亚马逊AWS官方博客

使用 Lambda 订阅Amazon DynamoDB 变更数据,并传输到Amazon OpenSearch,实现全文检索

­­­­场景介绍

2012年亚马逊云科技推出了全托管NoSQL数据库服务 – Amazon DynamoDB。 十年间,Amazon DynamoDB在各个行业得到广泛采纳,例如游戏、广告、物联网和互联网软件等等。Amazon DynamoDB是为用户打造的稳定的完全托管式、无服务器的NoSQL­键值数据库, 兼备极致性能与扩展性。通过使用Amazon DynamoDB做为业务后端数据库,满足应用大规模高并发的OLTP需求。然而,在一些情况下,譬如需要对DDB中的数据执行复杂查询时,Amazon  OpenSearch 可以作为DynamoDB很好的补充。我们可以通过DynamoDB Streams 将他们连接在一起,实现强强联手,即使用 DynamoDB 作为持久存储,使用OpenSearch 扩展搜索功能。例如,电商用户可以使用DynamoDB 存储商品信息,使用OpenSearch 对商品信息做聚合分析查询;游戏用户可以使用DynamoDB 存储玩家在游戏中的行为日志数据,使用OpenSearch 基于行为事件、时间等维度做检索,满足游戏运营需求。著名吉他制造商Fender在他们的 Fender Digital 应用中就同时使用了DynamoDB和OpenSearch,当Fender Play 管理员创建课程内容时,数据写入DynamoDB后触发带有新项目的流事件,再通过Lambda 写入Amazon Opensearch,允许用户查询最新的课程内容。

基于这样的场景需求,我们构建了这个动手实验,设计一家销售电影的网店, 使用Amazon DynamoDB存储电影的销售信息, 包括: 电影介绍、演员、导演、时长、评分和销量等, 并使用Amazon OpenSearch 提供方便的基于文本的查询。下面先就涉及到的关键功能组件做简单介绍。

Amazon DynamoDB Streams介绍

Amazon DynamoDB是亚马逊云科技推出的一款全托管的无服务器类型的键值存储服务,具有极致的弹性伸缩能力,海量数据集下依然可以保持毫秒甚至微秒级的响应时间。

Amazon DynamoDB Streams是Amazon DynamoDB的一项功能,可以观察到表中数据的变化,也叫变化数据捕获(change data capture, CDC)。启用后,每当对Amazon DynamoDB表进行写(如put、update或delete)操作时,哪条记录被更改以及更改的内容等包含这些信息的事件将会以近乎实时的方式保存到Amazon DynamoDB Streams中,这些所有事件都会触发它所关联的Amazon Lambda函数进行处理。

我们可以通过Amazon Lambda将数据推送到Amazon OpenSearch, 来实现我们在Amazon DynamoDB中不支持的更高层次的索引功能, 例如全文索引, 空间索引,或者需要进行复杂的交叉索引查询, 或者通过Amazon Lambda的将变化流导入Amazon Kinesis Firehose, 以parquet文件的形式加载到Amazon S3, 那么就可以通过Amazon Athena进行历史审计和跟踪查询。

Amazon DynamoDB Streams是由多个Shards组成。每个Shard是一组数据的记录,其中每个记录对应于与流相关的表中的一个数据修改,Shard是由亚马逊云科技自动创建和删除的,Shard也有可能被分成多个Shard,这都是Amazon DynamoDB 自动完成的。在创建流的时候,我们可以选择哪些数据会放在事件的上下文中被推送到Amazon DynamoDB Streams中,有以下配置选项

a. OLD_IMAGE – 流记录将包含修改前的记录

b. NEW_IMAGE – 流记录将包含修改后的记录

c. NEW_AND_OLD_IMAGES – 流记录同时包含修改前和修改后的记录

d. KEYS_ONLY – 只包含主键

Amazon DynamoDB Streams跟Amazon Lambda交互的特点是: 保证至少一次, 按事件发生顺序调用Amazon Lambda处理函数。  每个Amazon Lambda函数在一个隔离的环境里执行,Amazon DynamoDB Streams的数据处理受Amazon Lambda函数可以执行的处理量的限制, 建议一些重量级的处理函数, 例如要得到确认才能返回,最好使用Amazon SQS 事件队列减少风险。

Amazon DynamoDB Streams 在变化流上保证至少一次的事件执行,由此,Amazon DynamoDB不仅仅是一个数据库,客户可以围绕这些数据管道构建一个完整的数据处理引擎,设计各种不同的工作负载。

Amazon OpenSearch 介绍

Amazon OpenSearch 是一个开源的分布式搜索和分析引擎,衍生自Elasticsearch,用于日志分析、实时应用程序监控、点击流分析和文本搜索等使用情况。

Amazon OpenSearch 是一项托管服务,为集群提供所需资源并保证高可用, Amazon OpenSearch 会自动检测并替换故障的节点,减少管理基础设施的开销。使用Amazon控制台就可以在几分钟内部署一个Amazon OpenSearch 集群。建立Amazon OpenSearch 集群没有前期费用,只需为使用的服务资源付费。

Amazon OpenSearch 还提供与 Logstash和Kibana 等开源工具的集成,用于数据摄取和可视化,也可以跟其他AWS服务无缝集成,如Amazon VPC,Amazon KMS、Amazon Kinesis Data Stream、Amazon Lambda、Amazon IAM、Amazon Cognito和Amazon CloudWatch 等,可以快速、安全地从数据中获得可操作的洞察力。

动手实验:

场景回顾:

有一家销售电影的网店, 使用Amazon DynamoDB存储电影的销售信息, 包括: 电影介绍、演员、导演、时长、评分和销量等, 并使用Amazon OpenSearch提供方便的基于文本的查询。

准备:Amazon 账号

部署(Amazon CloudFormation 模版):

https://github.com/MinDengDeng/dynamodbstreaming

  1. 登录到亚马逊云科技控制台 https://console.thinkwithwp.com/
  2. 进入Amazon CloudFormation控制台
  3. 选择创建堆栈
  4. 在出现的窗口中,使用模板文件: https://s3.amazonaws.com/ee-assets-prod-us-east-1/modules/ffdfaf7928464a518672c03d5d6490d3/v1/cloudformation_public.yaml
  5. 输入堆栈名称, 模板参数可以保持默认值,也可以自定义参数
  6. 点击下一步
  7. 保留默认值。点击下一步
  8. 勾选 “I acknowledge that AWS CloudFormation might create IAM resources with custom names”
  9. 点击创建

部署大约需要15-20分钟

模版部署成功后,会创建以下资源:

  1. Amazon DynamoDB表, 表开启Amazon DynamoDB Streams 功能,并关联Amazon Lambda 函数
  2. 创建Amazon Cognito,为Amazon OpenSearch kibana登陆提供身份认证
  3. 提供Amazon APIGateway接口, 通过Amazon Lambda可以修改指定的电影数据, 触发Amazon DynamoDB Streams逻辑
  4. 提供Amazon Lamda 入口, 修改Amazon DynamoDB中的随机200条项目,触发Amazon DynamoDB Streams逻辑
  5. 自动触发Amazon Lambda函数,读取Amazon S3中的电影数据到Amazon DynamoDB中, 触发Amazon DynamoDB Streams逻辑

详细说明:

DynamoDB数据

以下是DynamoDB中电影样本数据存储的信息

  1. Id – 电影的唯一ID
  2. Actors – 演员
  3. Clicks – 点击数量
  4. Directors – 导演
  5. Genres – 电影类型
  6. Image_url – 电影宣传照片
  7. Plot – 介绍
  8. Price – 价格
  9. Purchases – 购买人数
  10. Rank – 排名
  11. Rating – 评分
  12. Running_time_secs – 播放时长
  13. Title – 电影名字
  14. Year – 上映年份

创建Amazon Cognito User接入Amazon OpenSearch Kibana

Amazon CloudFormation 不支持模版中创建 Amazon Cognito User,通过Amazon Lambda 函数会创建一个Amazon Cognito 用户 (用户名:kibana , 密码:Abcd1234!)。

使用Amazon OpenSearch Kibana进行搜索

通过Amazon OpenSearch Kibana URL链接登陆控制台(用户名:kibana , 密码:Abcd1234!),并修改密码。

简单搜索 – 查询title中有star wars的项目

GET movies/_search

GET movies/_search
{
    "query": {
        "match": {
            "title": "star wars"
        }
    },
    "_source": "title"
}

多个字段合并搜索

GET movies/_search
{
    "query": {
        "bool": {
            "must": [
                {
                    "term": {
                        "actors.keyword": {
                            "value": "Mark Hamill"
                        }
                    }
                },
                {
                    "range": {
                        "running_time_secs": {
                            "gte": "6000"
                        }
                    }
                },
                {
                    "range": {
                        "release_date": {
                            "gte": "1970-01-01",
                            "lte": "1980-01-01"
                        }
                    }
                }
            ],
            "should": [
                {
                    "range": {
                        "rating": {
                            "gte": 8.0
                        }
                    }
                }
            ]
        }
    }
}

聚合查询

GET movies/_search
{
    "query": {
        "match_all": {}
    },
    "aggs": {
        "actor_count": {
            "terms": {
                "field": "actors.keyword",
                "size": 10
            },
            "aggs": {
                "average_rating": {
                    "avg": {
                        "field": "rating"
                    }
                }
            }
        }
    },
    "size": 0
}

架构介绍:

方案部署了四个Amazon Lambda函数。 第一个是wirng lambda, 在Amazon Cloudformation 模版部署的时候会自动调用; 第二个是ddb random update, 是对Amazon DynamoDB表中的随机数据进行修改; 第三个是ddb update item 函数, 是对Amazon DynamoDB表中的某条指定id的数据进行修改, 用户把请求发送到Amazon API Gateway,Amazon API Gateway会调用此函数对数据进行操作; 第四个是ddb streams lambda函数, Amazon DynamoDB Streams的流处理函数。

Amzon CognitoUserPool  和Amazon CognitoIdentityPool 为Amazon OpenSearch 中的Kibana的认证提供基础。Amzon CognitoUserPool  控制用户的登录,通过Amazon Cognito 认证的用户都会得到角色AuthUserRole的权限授权。

模版指定了一个叫做 “id “的哈希值作为Amazon DynamoDB的主键,并且开启了StreamSpecification功能,将所有的NEW_AND_OLD_IMAGES引导到CDC流, Amazon DynamoDB Streams关联了一个Amazon Lambda 函数,把相应的信息发送给Amazon OpenSearch。

代码介绍:

Amazon CloudFormation 创建了四个Lambda函数来执行对应的任务。

  1. Wiring function 在创建堆栈时被调用。它有以下三个功能
    • 向Amazon OpenSearch发送一个字段映射,为电影数据的字段设置类型。
    • 下载并发送电影数据到Amazon Dynamo DB。
    • 执行了Amazon CloudFormation中不支持的功能–创建Cognito用户。

核心代码:

def send_mapping():
    # Send mapping to the domain
    url = 'https://{}/{}'.format(os.environ['AES_ENDPOINT'], index_name)
    send_signed('delete', url, region=os.environ['REGION'])
    body = ' '.join(constants.MAPPING.split())
    send_signed('put', url, region=os.environ['REGION'], body=body)

def send_data_to_ddb(file_contents):
    imdb_data = json.loads(file_contents)
    for rec in imdb_data:
        fields = inject_price_clicks_and_purchases(rec['fields'])
        put_item(rec['id'], rec['fields']['year'], **inject_types(fields))
  1. Update item(s)
    • DynamoDB update item function

可以使用DynamoDB update item函数,这个函数修改指定id的Amazon Dynamo DB表中的记录,来产生变化事件,由Amazon Dynamo DB流捕获并转发到Amazon OpenSearch。

    • DynamoDB Random update function
      可以使用DynamoDB Random update函数,这个函数随机挑选Amazon Dynamo DB表中的数据进行修改,来产生变化事件,由Amazon Dynamo DB流捕获并转发到Amazon OpenSearch。

核心代码:

def add_int_value_to_item(item_id, attr_name, val):
    # handle request and update corresponding item
    ExpressionAttributeNames={'#{}'.format(attr_name): attr_name}
    ExpressionAttributeValues={':incr' : {'N' : str(val)}}   
    ddb_client.update_item(
        TableName=os.environ['DDB_TABLE_NAME'],
        Key={'id': {'S': item_id}},
        UpdateExpression='SET #{} = #{} + :incr'.format(attr_name, attr_name),
        ExpressionAttributeNames={'#{}'.format(attr_name): attr_name},
        ExpressionAttributeValues={':incr' : {'N' : str(val)}},
    )
  1. Amazon DynamoDB Streaming function
    DDBStreams 函数会处理来自Amazon DynamoDB Streams的插入、修改和删除事件事件。通过事件流,数据被引导到Amazon OpenSearch域。

核心代码:

def handler(event, context):
    # handing DynamoDB streams
        if record['eventName'] == 'INSERT':
            movie_buffer.add_record(record['eventName'].lower(),
                                    item_to_dict(record['dynamodb']['NewImage']))
        elif record['eventName'] == 'MODIFY':
            new_image = item_to_dict(record['dynamodb']['NewImage'])
            old_image = item_to_dict(record['dynamodb']['OldImage'])
            movie_buffer.add_record(record['eventName'].lower(), new_image)
            update_buffer.add_record('insert',
                                     create_monitoring_record(new_image, old_image),
                                     has_id=False)
    return True

总结(最佳实践):

  1. 最终一致性。Amazon DynamoDB流事件是接近实时的,但不是实时的,在事件发生的时间和事件交付的时间之间会有小的延迟。
  2. 约束条件。流中的事件保留24小时,且一次最多只能有两个进程从一个流分片(shard)中消费信息。
  3. 权限控制。每个Amazon DynamoDB Streams对应一个Amazon Lambda函数, 有助于保持最小的IAM权限和尽可能简单的代码。
  4. 失败处理。将处理逻辑包裹在一个try/catch子句中,将失败的事件存储在Amazon SQS 的 DLQ(死信队列)中重试。

结束语

通过Amazon DynamoDB Streams,将Amazon DynamoDB 与亚马逊云科技平台上的其他服务高效的整合在一起,实现了对Amazon DynamoDB 中的数据做聚合分析和全文索引的能力。这样的方案,在亚马逊云科技的用户中得到实践,例如游戏场景中,使用Amazon DynamoDB存储游戏玩家数据,包括游戏状态信息、玩家会话历史数据等,通过DynamoDB Streams将数据写入Amazon OpenSearch,通过Amazon OpenSearch 对游戏用户信息作检索,满足游戏运营的需求。

完整参考架构方案如下图:

本篇作者

谢燕敏

亚马逊云科技APN解决方案架构师,负责合作伙伴架构咨询和设计,同时致力于亚马逊云科技在国内和全球企业客户的应用和推广。拥有多年分布式应用开发和云平台运维开发经验。

李君

亚马逊云科技数据库解决方案技术专家,负责基于亚马逊云计算数据库产品的技术咨询与解决方案工作,特别专注于从SQL到NoSQL 数据库的设计、测试、迁移、运维及优化等工作。