亚马逊AWS官方博客

Amazon Glue集成Delta Lake构建事务型数据湖上的流式处理

数据湖的挑战

伴随着云服务的流行,数据湖技术渐渐兴起,越来越多的企业开始搭建自己的数据湖。但传统数据湖基于文件,一般都是以追加的形式修改数据。当有数据需要改变时,经常需要读取全部内容重新写回到平台。基于HDFS、S3等传统数据湖方案,只能增加文件不方便修改文件中的内容。想要实现某条记录的UPSERT(UPDATE 和 INSERT 的混合)变更,工程师需要构建复杂的Pipeline来读取整个分区或表,需要读取对应的文件并进行重写,这种Pipeline效率很低,而且难以维护。另外,数据工程师经常遇到这样的问题:不安全地写入数据湖,这会导致如果读取数据过程中同时又有数据写入,那么将会产生垃圾数据。那么必须构建其他额外的方法,来保证数据同时读写情况的数据一致性。

而在日常数据运维过程当中,往往又会遇到过多小文件的问题,特别是在流数据入湖的场景里,原始数据注入通常都会以kilobytes甚至是bytes级别大小的文件保存,这在后续当业务需要去query这些历史数据时,会极大的影响query的性能。所以运维数据湖平台时需要面对的一个最常见的场景就是需要在不影响业务的情况下,去清理合并那些小文件来提高query数据湖的性能。另外,如果具体业务场景对时延要求没有非常高,比如下游报表可以允许分钟甚至小时级别数据展示,那么就没有必要持续运行流式作业,针对这种成本敏感型的需求,应该有一种方式允许空闲计算资源自动停机。而针对这些场景,我们也会在这篇博客中给出一个合理的解决方法。

本篇博客会使用开源Delta Lake结合Amazon Glue, 简化构建数据湖的方式去解决上述问题。我们会带您一步一步地在Amazon上部署近实时的流式数据入湖的方案。包括从 Amazon MSK 当中读取流式数据,核对数据schema,使用Amazon Glue实现无服务器流式ETL作业,并且实现像关系型数据库一样的增删查改,保证ACID。与此同时,我们还会使用MWAA (Managed Workflow for Apache Airflow)来实现数据湖上运维自动化,包括性能的优化。最终在数据湖上实现海量数据UPSERT以及事务的管理能力,即事务型的数据湖。

我们会使用一些模拟的业务库发过来的订单和会员信息作为流式数据的来源,使用MSK对接,然后通过Glue Job读取并向下游拥有UPSERT能力的数据湖(Delta Lake)进行同步并将这种有变更状态的数据存储于Amazon S3,并通过Amazon Athena来提供实时数仓查询并通过QuickSight进行Dashboard展示。整套方案大部分是基于Amazon的托管服务,具有开箱即用,自动扩展,内置高可用性和按使用付费的计费模式,可提高敏捷性,优化成本同时减少数据运维的工作。

开源Delta Lake简介

开源Delta Lake是基于Apache Spark的下一代数据湖存储引擎,也是目前市面上主流的数据湖存储引擎之一。支持MERGE命令,可以高效的完成UPSERT语义。同时Delta Lake将ACID事务带入数据湖,提供了可序列化性,最强的隔离级别。Delta Lake就是既能解决更新效率低下,又能进行并发读写控制,还支持时间回溯,查看历史任意时间的数据快照。也正是这些优越的特性,所以我们决定以开源Delta Lake为核心搭建数据湖。

以下是Dela Lake的基本能力的一个总结:

  1. 支持UPSERT/DELETE:Delta Lake 将支持MERGE、UPDATE和DELETE的DML命令。可以轻松地在数据湖中插入和删除记录,并简化他们的变更数据捕获和满足GDPR。
  2. ACID事务性:Delta Lake 在多个写操作之间提供 ACID 事务性。每一次写操作都是一个事务操作,事务日志(Transaction Log)中记录的写操作都有一个顺序序列。
  3. Schema管理:Delta Lake 会自动验证正在写入的 Spark DataFrame 的 Schema 是否与表的Schema 兼容。
  4. 数据版本控制和时间旅行:Delta Lake 允许用户读取表或目录的历史版本快照。当文件在写入过程中被修改时,Delta Lake 会创建文件的新的版本并保留旧版本。当用户想要读取表或目录的较旧版本时,他们可以向 Apache Spark 的 read API 提供时间戳或版本号。
  5. 可伸缩的元数据(Metadata)处理:Delta Lake 将表或目录的元数据信息存储在事务日志(Transaction Log)中,而不是元数据 Metastore 中。这使得 Delta Lake 够在固定时间内列出大目录中的文件,并且在读取数据时效率更高。

解决方案架构

由于篇幅有限,本篇博客主要是在验证使用开源Delta Lake在Amazon上构建事务型数据湖的可行性,而对于数据湖上跟事务不直接相关的特征,包括数据血缘,数据权限等,会暂时忽略。而本篇博客的实验环境会基于下图的架构搭建。其中,数据源方面,我们会使用Python的Faker库产生一些dummy data,并将这些dummy data注入到Amazon MSK (托管型的Kafka集群),然后会使用带有delta jar包的Glue ETL streaming job来将数据从Kafka集群中取出,并做去重处理,存储在核心数据湖存储(S3)中,而在这里核心验证的概念就是当新的数据是去更新已有的数据列时,Delta Lake会保证已经存储在数据湖中的数据得到相应的更新而不是简单的append only。这种持续更新的数据最终我们也会通过Athena(托管的presto服务)query来做一个展示,以证明数据持续的更新能够可靠的以近实时的效率在BI报表中展示。

在这里还需要说明的是目前这个架构虽然是实时流式处理。但glue streaming本身就是基于spark structured streaming 的引擎,其对于流的处理实际上是以微批的形式,那么这也就意味着截取微批的时间窗口会对数据的延迟性产生较大的影响,并且我们在第三个Job做UPSERT处理时,如果在同一个微批内出现多条反复更改同一条数据的事务,那么将只有那个微批内的最新的更改会出现在processed的表内,也意味着每个微批实际上只会记录下最新的更改。这也是为什么我们会主张在数据湖上分多个层级来收纳流式数据入湖。

架构上Job1负责从MSK读取原始数据并且把数据以Delta的格式写入原始数据层(raw layer),同时我们会按processing time以年、月、天、小时来分区。数据在进入这一层前没有做数据清洗和处理,并允许重复订单和会员数据直接以append only的方式全量的录入数据湖保证Single source of truth。我们为了模拟一个更加接近真实的业务景,Job2从原始数据层读取数据,做了一些业务处理的逻辑和数据清洗,例如我们实现了Stream-Stream Join(流关联)和字段的处理。最后通过Job3基于Append Only产生的Delta table做MERGE处理,实现UPSERT逻辑然后写入处理层(processed layer)的ODS表供下游消费端查询,这一层类似ODS层, 可以做进一步数据清洗和去重等业务逻辑。Job4主要是用Delta API来生成Manifest文件便于Athena可以查询到Delta table。

环境搭建

本博客使用东京区域进行搭建,但读者可自由选择区域,架构中所用到的组件已经在Amazon的大多数区域上线。

前提条件

  1. 具有管理员权限的Amazon账号
  2. 本地安装有Amazon CLI,以及terraform
  3. 配置好Amazon profile的access key, secret key (详细可参看link)

本博客假定读者都具备有基础的Amazon知识,并且熟悉Amazon服务的基本概念,有使用Amazon的经验。整个架构的实现代码可以参考link

步骤 1: 网络环境

由于架构中使用了Amazon MSK (Managed Streaming for Apache Kafka),我们需要搭建一个VPC环境。而这里的VPC的网络选择可以根据自己的需要来,我们在这个实验环境里用到的Kafka由于不需要接收来自于VPC外部的流量,所以我们的VPC并不需要任何互联网访问的途径。整个VPC搭建比较简单,在这里也不再过多的赘述。为方便大家搭建起整个实验环境,我们也已将整个环境写成了terraform code来实现一键化部署。

首先将代码下载至本地:

git clone git@github.com:wei-zhong90/deltalake-aws-poc-demo.git
cd deltalake-aws-poc-demo/infrastructure

然后修改相应的参数:

其中这四个参数分别代表:

  • region: 环境部署的区域
  • public_key: 之后会用来登录测试实例用的密钥对的公钥
  • kafka_test_topic, kafka_test_topic_2分别代表着Kafka集群用来接收数据的两个topic的名字,可自定义这两个topic的名字

在修改好相应参数后,便可执行以下代码进行部署:

terraform init
terraform apply

这一部分的terraform代码会直接部署好一个vpc,一个msk集群,一个测试用linux实例,以及三个后面会详细介绍的glue streaming job。整个过程可能会需要花35分钟,主要是MSK的部署会需要消耗大量的时间。

步骤 2: 启动订单和会员数据生成程序

在整个部署完成以后,可以进入到ec2的控制台界面查看新生成的名为“msk-dev-workspace”的实例,如下图。

使用之前部署时配置的密匙对ssh进入该实例,在HOME路径下会看见一个kafka的dir,这个实际上是Kafka的官方安装包,我们之后也会用这个安装包里面的工具来查看MSK集群里的数据。

接下来,我们需要启动订单数据生成程序,执行以下命令:

cd deltalake-aws-poc-demo
python3 -m virtualenv .venv
source .venv/bin/activate
python3 -m pip install -r requirements.txt
vi .env

这个时候会进入到以下界面:

在这里我们需要将这几个变量换成我们自己设定的几个值。首先,BOOTSTRAP_SERVERS的值来自于之前部署好的MSK集群,我们可以进入到MSK的控制台进行查看,如下。

我们会在MSK的控制台下看到一个新的MSK集群,名为app;然后点进这个集群后就可以在右上角点击‘View client information’,就可以看到几种节点的连接链接,在这里我们需要复制TLS的链接,然后填到BOOTSTRAP_SERVERS的变量下面。而后面的几个变量,TOPIC, SECOND_TOPIC则是我们之前在部署terraform代码时更改的那两个topic的变量名称,请务必注意保持一致。最后两个变量是用来控制订单数据生成的量和频率的,在这里可以不做更改。

在完成更改以后,保存更改,即可执行一下指令,开始自动生成订单数据,并发送至Kafka集群。

python3 generator.py

在执行完这条指令后,一个简单的python程序就会开始向定义的那两个topic里面发送两种数据,一种是订单数据,一种则是会员等级变更数据,我们在之后的glue中就会对这两种数据进行收集和处理。

在这里需要说明的是,这里生成的订单数据实际上只有六种订单,在下图中可以看到这六种订单实际上就是分属于6个不同的order_id及order_owner,而不断生成的订单数据则是不断的更新这六种订单的order_value和timestamp。另外一种会员等级变更数据则是针对这六个order_owner不断的更改他们的membership的等级。在实际场景中,订单数据并不会如此频繁的更改,会员等级更不会频繁变动,但在这个实验环境下,我们为了展示Delta lake的Upsert特性,就模拟了这种极端的情况。

步骤 3: 配置Glue作业

Amazon Glue是一项完全托管,无服务器架构的ETL服务。客户无需预置基础设置,仅需由Glue负责预置、扩展Spark运行环境,客户只需要专注开发ETL代码,并且使用Amazon Glue时,只需为ETL作业运行时间付费。另外,Glue Streaming 基于开源Spark Structured Streaming 构建,面向微批处理,继承了 Spark的所有特性可以完全集成Delta Lake。

本步骤阐述了用Amazon Glue来搭建三个流式ETL作业的业务逻辑,具体Glue任务和代码都会通过Terraform来自动部署, 我们主要重点介绍Job的功能实现而不是Job的具体配置,Terraform会创建如下三个Job:

  • raw_process
  • join_process
  • upsert_process

3.1 第一个Job负责从MSK读取原始数据并且把数据以Delta的格式写入原始数据层(raw layer),同时我们会按processing time以年、月、天、小时来分区。数据在进入这一层前没有做数据清洗和处理,并允许重复订单和会员数据,在实际生产中这一层也可以根据业务命名为Staging Layer/Ingestion Layer/Landing Area。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.context import SparkConf
from awsglue.job import Job
# Import the packages
from delta import *
from pyspark.sql.session import SparkSession
from datetime import datetime

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, from_json, lit

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'bucket_name', 'bootstrap_servers', 'topic1', 'topic2'])
data_bucket = args['bucket_name']
bootstrap_servers = args['bootstrap_servers']
topic1 = args['topic1']
topic2 = args['topic2']
checkpoint_bucket1 = f"s3://{data_bucket}/checkpoint/"
checkpoint_bucket2 = f"s3://{data_bucket}/membership_checkpoint/"
  
schema1 = StructType([ \
 StructField("order_id", IntegerType(), True), \
 StructField("order_owner", StringType(), True), \
 StructField("order_value", IntegerType(), True), \
 StructField("timestamp", TimestampType(), True), ])

schema2 = StructType([ \
 StructField("order_owner", StringType(), True), \
 StructField("membership", StringType(), True), \
 StructField("timestamp", TimestampType(), True), ])

def insertToDelta1(microBatch, batchId): 
 date = datetime.today()
 year = date.strftime("%y")
 month = date.strftime("%m")
 day = date.strftime("%d")
 hour = date.strftime("%H")
 if microBatch.count() > 0:
   df = microBatch.withColumn("year", lit(year)).withColumn("month", lit(month)).withColumn("day", lit(day)).withColumn("hour", lit(hour))
   df.write.partitionBy("year", "month", "day", "hour").mode("append").format("delta").save(f"s3://{data_bucket}/raw/order/")

def insertToDelta2(microBatch, batchId):
 if microBatch.count() > 0:
   microBatch.write.partitionBy("order_owner").mode("append").format("delta").save(f"s3://{data_bucket}/raw/member/")

class JobBase(object):
 fair_scheduler_config_file= "fairscheduler.xml"
  def get_options(self, bootstrap_servers, topic):
   return {
     "kafka.bootstrap.servers": bootstrap_servers,
     "subscribe": topic,
     "kafka.security.protocol": "SASL_SSL",
     "kafka.sasl.mechanism": "AWS_MSK_IAM",
     "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
     "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
     "startingOffsets": "earliest",
     "maxOffsetsPerTrigger": 1000,
     "failOnDataLoss": "false"
   }
  def __start_spark_glue_context(self):
   conf = SparkConf().setAppName("python_thread").set('spark.scheduler.mode', 'FAIR').set("spark.scheduler.allocation.file", self.fair_scheduler_config_file)
   self.sc = SparkContext(conf=conf)
   self.glue_context = GlueContext(self.sc)
   self.spark = self.glue_context.spark_session
  def execute(self):
   self.__start_spark_glue_context()
   self.logger = self.glue_context.get_logger()
   self.logger.info("Starting Glue Threading job ")
   import concurrent.futures
   executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
   executor.submit(self.workflow, topic1, schema1, insertToDelta1, checkpoint_bucket1, 1)
   executor.submit(self.workflow, topic2, schema2, insertToDelta2, checkpoint_bucket2, 2)
   self.logger.info("Completed Threading job")
  
  
 def workflow(self, topic, schema, insertToDelta, checkpoint_bucket, poolname):
  
   self.sc.setLocalProperty("spark.scheduler.pool", str(poolname))
   # Read Source
   df = self.spark \
     .readStream \
     .format("kafka") \
     .options(**self.get_options(bootstrap_servers, topic)) \
     .load().select(col("value").cast("STRING"))

   df2 = df.select(from_json("value", schema).alias("data")).select("data.*")


   # Write data as a DELTA TABLE
   df3 = df2.writeStream \
     .foreachBatch(insertToDelta) \
     .option("checkpointLocation", checkpoint_bucket) \
     .trigger(processingTime="60 seconds") \
     .start()

   df3.awaitTermination()

def main():
 job = JobBase()
 job.execute()


if __name__ == '__main__':
 main()

3.1.1 在代码里,由于订单及会员数据被分别发送至kafka stream里面的两个topic,所以第一个用于收集原始订单和会员数据的glue job需要能够同时从两个kafka topic里面读取数据,然后并发分别写入两张Delta表,我们使用了Python多线程模块concurrent.futures和ThreadPoolExecutor类来并发调用两次workflow function,在workflow function内首先读取Kafka的Topic并通过IAM认证方式读取流式加密的数据并转化数据为STRING格式:

 def workflow(self, topic, schema, insertToDelta, checkpoint_bucket, poolname):
  
   self.sc.setLocalProperty("spark.scheduler.pool", str(poolname))
   # Read Source
   df = self.spark \
     .readStream \
     .format("kafka") \
     .options(**self.get_options(bootstrap_servers, topic)) \
     .load().select(col("value").cast("STRING"))

3.1.2 通过from_json把JSON string转化为事先定义好的DataFrame StructType:

df2 = df.select(from_json("value", schema).alias("data")).select("data.*")

3.1.3 通过Spark资源调度Fair Scheduler的self.sc.setLocalProperty(“spark.scheduler.pool”, str(poolname)),我们可以提交Job到指定的pool,从而实现在一个Glue Job里资源的平等分配, 我们用到foreachbatch API 去触发一个传入的insertToDelta function参数,具体逻辑通过insertToDelta1和insertToDelta2函数里按照每60秒的窗口把数据按照类似Hive分区方式,以Processing time的年、月、日、小时来写入到S3上,例如InsertToDelta1:

def insertToDelta1(microBatch, batchId):  
  date = datetime.today()
  year = date.strftime("%y")
  month = date.strftime("%m")
  day = date.strftime("%d")
  hour = date.strftime("%H")
  if microBatch.count() > 0:
    df = microBatch.withColumn("year", lit(year)).withColumn("month", lit(month)).withColumn("day", lit(day)).withColumn("hour", lit(hour))
    df.write.partitionBy("year", "month", "day", "hour").mode("append").format("delta").save(f"s3://{data_bucket}/raw/order/")

3.2 第二个作业,主要是用来实现一个订单和对应会员数据的流关联,我们假设想实时分析一个会员状态和下单之间的关系,就需要将两个流按用户名字进行Join。流和流的关联目前不支持update模式, 所以我们还是以追加的形式把关联后的数据写入curated层。

import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.{GlueArgParser, Job, JsonOptions}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import scala.collection.JavaConverters._
import io.delta.tables.DeltaTable
import org.apache.spark.sql.expressions.Window

import scala.collection.JavaConverters._


object GlueApp {
 def main(sysArgs: Array[String]): Unit = {
  
   val spark: SparkContext = new SparkContext()
   val glueContext: GlueContext = new GlueContext(spark)
   val sparkSession: SparkSession = glueContext.getSparkSession
   import sparkSession.implicits._
   // @params: [JOB_NAME]
   val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "bucket_name").toArray)
   Job.init(args("JOB_NAME"), glueContext, args.asJava)

   val CheckpointDir = s"s3://${args("bucket_name")}/checkpoint2"

   val raworder = sparkSession.readStream.format("delta").load(s"s3://${args("bucket_name")}/raw/order/").withWatermark("timestamp", "3 hours")
   val rawmember= sparkSession.readStream.format("delta").load(s"s3://${args("bucket_name")}/raw/member/").withWatermark("timestamp", "2 hours")

  val joinedorder = raworder.alias("order").join(
        rawmember.alias("member"),
        expr("""
          order.order_owner = member.order_owner AND
          order.timestamp >= member.timestamp AND
          order.timestamp <= member.timestamp + interval 1 hour
        """)
  ).select($"order.order_id", $"order.order_owner", $"order.order_value", $"order.timestamp", $"member.membership", $"order.year", $"order.month", $"order.day", $"order.hour")


   val query = joinedorder
     .writeStream
     .format("delta")
     .option("checkpointLocation", CheckpointDir)
     .trigger(Trigger.ProcessingTime("60 seconds"))
     .outputMode("append")
     .partitionBy("year", "month", "day", "hour")
     .start(s"s3://${args("bucket_name")}/curated/")

   query.awaitTermination()  
  
   Job.commit()

 }

}

3.2.1 流之间关联和静态数据之间的关联有一个不同,对流来说任何时候关联双方的数据都是没有边界的,也叫无界流,当前流上的任何一行数据都有可能和另一条流上未来的一行数据关联上,所以我们需要制定一个时间范围。首先我们分别给订单和会员流设定Watermark(水位线),用来抛弃超过约定时间到达的输入数据,订单最多允许迟到三小时,会员最多允许迟到两小时。

val raworder = sparkSession.readStream.format("delta").load(s"s3://${args("bucket_name")}/raw/order/").withWatermark("timestamp", "3 hours")
 val rawmember = sparkSession.readStream.format("delta").load(s"s3://${args("bucket_name")}/raw/member/").withWatermark("timestamp", "2 hours")
3.2.2 另外,我们定义跨两个流的 event time约束,以便可以确定何时不需要一个输入流与另一个输入流匹配。我们用时间范围关联条件(join condition)定义此约束。比如说,在我们场景里假设会员的状态变化可以转化为订单的生成,所以订单可以在相应的会员状态变化后 0 秒到 1 小时的时间范围内发生。如下是约束的逻辑:
val joinedorder = raworder.alias("order").join(
        rawmember.alias("member"),
        expr("""
          order.order_owner = member.order_owner AND
          order.timestamp >= member.timestamp AND
          order.timestamp <= member.timestamp + interval 1 hour
        """)

3.3 第三个作业从原始数据层读取数据,然后进行数据清洗,实现UPSERT逻辑然后写入处理层(processed layer),这一层类似ODS层,  可以做数据清洗和去从等业务逻辑。同时,我们使用 Spark原生的trigger.once mode实现一次性的流式处理,这样的配置能够允许glue streaming job在kafka stream里面的新增数据处理完之后自动停止,以避免了24*7运行成本。另外,本篇博客以主要以介绍实现UPSERT场景为主,在实际生产中,后面还会有类似EDW、DM或DWD、DWS、ADS等数据湖上的分层和一些Delta Lake高级功能例如窗口聚合, 流批一体等。

import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.{GlueArgParser, Job, JsonOptions}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import scala.collection.JavaConverters._
import io.delta.tables.DeltaTable
import org.apache.spark.sql.expressions.Window

import scala.collection.JavaConverters._


object GlueApp {
 def main(sysArgs: Array[String]): Unit = {
  
   val spark: SparkContext = new SparkContext()
   val glueContext: GlueContext = new GlueContext(spark)
   val sparkSession: SparkSession = glueContext.getSparkSession
   import sparkSession.implicits._
   // @params: [JOB_NAME]
   val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "bucket_name").toArray)
   Job.init(args("JOB_NAME"), glueContext, args.asJava)
  
   //read base
   val BasePath = s"s3://${args("bucket_name")}/processed"
   val Basetable = DeltaTable.forPath(sparkSession, BasePath)
   val CheckpointDir = s"s3://${args("bucket_name")}/checkpoint3"
   //read from upstream joined stream
   val raw = sparkSession.readStream.format("delta").load(s"s3://${args("bucket_name")}/curated/")
  
   def upsertIntoDeltaTable(updatedDf: DataFrame, batchId: Long): Unit = {
    
       val w = Window.partitionBy($"order_id").orderBy($"timestamp".desc)
       val Resultdf = updatedDf.withColumn("rownum", row_number.over(w)).where($"rownum" === 1).drop("rownum")
      
       // Merge from base with source
       Basetable.alias("b").merge(
           Resultdf.alias("s"),
           "s.order_id = b.order_id")
           .whenMatched.updateAll()
           .whenNotMatched.insertAll()
           .execute()
   }


   val query = raw
     .writeStream
     .format("delta")
     .foreachBatch(upsertIntoDeltaTable _)
     .option("checkpointLocation", CheckpointDir)
     .trigger(Trigger.Once())
     .outputMode("update")
     .start(s"s3://${args("bucket_name")}/processed/")

   query.awaitTermination()  
  
  
   Job.commit()

 }

}

3.3.1  在代码里,作业会消费从上次工作完成后的新增订单和会员关联后的数据,Trigger.Once()会把所有数据处理完后自动停止Glue作业,非常经济高效。另外,这里由于我们订单和会员是源源不断的写入到raw层,会产生非常多的重复订单号, 所以我们需要在每个foreachbatch 里通过一个机制去拿到Streaming DataFrame,即传给upsertIntoDeltaTable函数每一批updatedDF DataFrame里最新的那个关联后的记录。首先,我们是通过SQL PARTITION BY 配合OVER 的方式,通过给order_id分组并给timestamp排序拿到最新的时间戳, 也就是rownum = 1的那条订单记录。然后,通过Delta Merge API来实现Resultdf和Basetable的UPSERT操作,注意Basetable是从curated层读取的Delta表,第一次需要手动建一个空表,后面3.3会提到。最后通过trigger(Trigger.Once())的方式通过delta API写入到S3的processed层更新Delta表。

   def upsertIntoDeltaTable(updatedDf: DataFrame, batchId: Long): Unit = {
        val w = Window.partitionBy($"order_id").orderBy($"timestamp".desc)
        val Resultdf = updatedDf.withColumn("rownum", row_number.over(w)).where($"rownum" === 1).drop("rownum")
        // Merge from base with source
        Basetable.alias("b").merge(
            Resultdf.alias("s"), 
            "s.order_id = b.order_id")
            .whenMatched.updateAll()
            .whenNotMatched.insertAll()
            .execute()
    }

3.3.2 最后我们还需要在processed层创建一个的初始表,里面没有数据。因为第一次跑作业时必须有一个Basetable来和新增数据做MERGE操作。

#Create processed table first time as base table
schema = StructType([ \
  StructField("order_id", IntegerType(), True), \
  StructField("order_owner", StringType(), True), \
  StructField("order_value", IntegerType(), True), \
  StructField("timestamp", TimestampType(), True), \
  StructField("membership", StringType(), True), \
  StructField("year", StringType(), True), \
  StructField("month", StringType(), True), \
  StructField("day", StringType(), True), \
  StructField("hour", StringType(), True) \
  ])

rdd = spark.sparkContext.emptyRDD()

df = spark.createDataFrame(rdd,schema)

df.write.partitionBy("year", "month", "day", "hour").format("delta").mode("overwrite").save("s3://xxx-lake-streaming-demo/processed/")

3.4 我们可以发送数据给到Kafka order topic, 然后手动开启第一个作业,大概60秒后可以看到以数据已经成功写到了如下路径:s3://xxx-lake-streaming-demo/raw/year=22/month=04/day=04/。并且格式是Delta format。我们可以看到_delta_log目录下面的json和parquet文件,这是Delta Lake实现ACID的事务日志目录。在创建新表时,Delta 将数据保存在一系列的 Parquet 文件中,并会在表的根目录创建 _delta_log 文件夹,其中包含 Delta Lake 的事务日志,ACID 事务日志里面记录了对应表的每次更改。具体原理可以参考:https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html

在s3://xxx-lake-streaming-demo/raw/order/year=22/month=04/day=04/,可以看到以parquet格式的数据文件。

3.5 紧接着启动第二个任务,这里的数据仍然是按照第一个raw_process job接收数据的时间做分区,看到关联数据被写入到 s3://xxx-lake-streaming-demo/curated/year=22/month=04/day=04/。

当然实际场景我们可以通过even time或这个job本身的processing time根据业务需求来做分区。

3.6 最后启动第三个任务,过了一会我们最新的订单会员数据被成功写到了如下目录:s3://delta-lake-streaming-demo/processed/year=22/month=04/day=04/。这里的数据是经过MERGE后的数据。现在我们停止给Kafka topic发送数据,过一会这个作业处理完所有订单后会自动停止,这也是Spark原生trigger.once mode特性实现一次性的流式处理,然后停止集群,避免了24*7运行成本。

步骤 4: Athena的配置

4.1 Athena支持以外部表的方式读取Delta表,我们需要生成manifest文件。具体步骤请参考:https://docs.delta.io/latest/presto-integration.html, 运行以下代码:

deltaTable = DeltaTable.forPath(spark, "s3://delta-lake-streaming-demo/processed/")
deltaTable.generate("symlink_format_manifest")

spark.conf.set("delta.compatibility.symlinkFormatManifest.enabled", "true")

可以看到生成了_symlink_format_manifest文件目录

4.2 创建Athena表

CREATE EXTERNAL TABLE `processed`(
 `order_id` int,
 `order_owner` string,
 `order_value` int,
 `timestamp` timestamp,
 `membership` string)
PARTITIONED BY (
 `year` string,
 `month` string,
 `day` string,
 `hour` string)
ROW FORMAT SERDE
 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT
 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
 's3://delta-lake-streaming-demo/processed/_symlink_format_manifest'
TBLPROPERTIES (

4.3 查询processed这张Delta表, 成功返回UPSERT后的实时订单会员数据。

步骤 5: 数据湖上的编排和运维自动化

Amazon Managed Workflows of Apache Airflow (MWAA) 是一项托管编排服务Apache Airflow。借助 Amazon MWAA,您可以使用 Airflow 和 Python 创建工作流程,而无需管理底层基础设施以实现可扩展性、可用性和安全性。亚马逊 MWAA 自动扩展其工作流程执行能力以满足您的需求,并与Amazon安全服务,帮助您快速安全地访问您的数据。

5.1 MWAA的部署可以参考具体文link

5.2 通过MWAA编排主要实现已下自动化功能

  • 每小时触发一次job-stage3流式作业;
  • 每小时从新生成manifest文件作业;
  • 每小时在Athena运行 MSCK REPAIR TABLE 作业以便从 Athena 查询新分区中的数据。应为在添加物理分区时,目录中的元数据将变得与文件系统中的数据布局不一致,需要将有关新分区的信息添加到目录中,要更新元数据;

具体代码请参照link

可以看到通过部署以上代码到MWAA,通过Airflow Web UI我们的Data Pipeline会定时按小时运行以上三步作业:

5.3 开源Delta Lake不提供Optimize功能,我们需要定时清理合并数据湖上小文件来提高query数据湖的性能。比如以下脚本我们会把小文件合并成10个最终文件,我们我们也可以灵活的选择优化整个表或优化某个分区。但由于Compaction对计算资源和下游操作有一定影响,建议用airflow来定时每月或周执行一次。

#### Compact the small files ####
path = "s3://xxx-poc-glue-wei/raw/"
numFiles = 10

(spark.read
 .format("delta")
 .load(path)
 .repartition(numFiles)
 .write
 .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .save(path))


#### Compact a partition ####
path = "s3://xxx-poc-glue-wei/raw/"
partition = "year = '22'"
numFilesPerPartition = 10

(spark.read
 .format("delta")
 .load(path)
 .where(partition)
 .repartition(numFilesPerPartition)
 .write
 .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .option("replaceWhere", partition)
 .save(path))

5.4 我们还可以运行 Vacuum() 命令来删除旧的数据文件,因此您无需付费存储未压缩的数据。

from delta.tables import *

Basetable = DeltaTable.forPath(sparkSession, BasePath)  # path-based tables,

Basetable.vacuum()       # vacuum files not required by versions older than the default retention period

Basetable.vacuum(100)    # vacuum files not required by versions more than 100 hours old

步骤 6: QuickSight展示

Amazon QuickSight 是一项云规模的商业智能 (BI) 服务, 客户可以连接到各种数据源、设计或修改数据集和设计可视化分析、向同事发送通知以组合分析,以及发布报告和仪表板。在我们的Demo里,我们可以建立一个到Athena上的processed table的Dataset,然后可以快速的生成一些用户订单和会员的信息。

总结

我们在这篇博客里谈及了关于利用Delta Lake和Glue Streaming Job来构建流式数据入湖的很多方面,其中包括如何通过Terraform集成Delta Lake和Glue job, 利用Spark的fair scheduler让Glue Job同时处理两个DAG, 应用Stream-Stream Join的功能关联流式数据,使用Delta Lake的MERGE 命令允许我们实现 UPSERT语义,使用Spark原生trigger.once mode特性实现一次性的流式处。与此同时,也系统的介绍了利用MWAA实现多个glue job的编排和定时触发,以及利用Amazon Athena和Quicksight对delta table里面的数据做交互式查询和可视化处理。

由于篇幅的原因有很多细节我们也无法进行更进一步的探讨,但我们验证了在不直接依赖Hadoop集群或者Databricks的情况下,glue的环境是能够定制化的实现delta lake以及原生spark里面的诸多功能,并且与此同时由于glue的serverless的特性,也使得这个方案天然具备了低管理运维成本的优势。一个完整的事务型数据湖需要具备的功能远不止于这篇博客里面介绍的内容,不过我们确实希望通过本文的介绍,能够让更多的人了解 Glue Streaming Job的功能特性以及对Delta Lake的友好集成,并基于此完善出更多有意思的数据湖架构。

参考资料

https://spark.apache.org/docs/latest/job-scheduling.html

https://www.sigmoid.com/blogs/spark-streaming-internals/

https://thinkwithwp.com/blogs/big-data/crafting-serverless-streaming-etl-jobs-with-aws-glue/

https://www.teradata.com/Blogs/Streaming-Data-Into-Teradata-Vantage-Using-Amazon-Managed-Kafka-MSK-Data-Streams-and-AWS-Glue-Stre

https://docs.databricks.com/delta/delta-intro.html

https://docs.delta.io/latest/presto-integration.html

本篇作者

胡晓度

AWS解决方案架构师,负责跨国企业级客户基于AWS的技术架构设计、咨询和设计优化工作。在加入AWS之前曾就职于电商Farfetch,海外政府IT部门和咨询相关企业,积累了丰富的大数据开发和数据库管理的实践经验。目前主要专注于大数据技术领域研究和AWS云服务在国内和全球的应用和推广。

钟威

AWS解决方案架构师,负责跨国企业级客户基于AWS的技术架构设计、咨询和设计优化工作。在加入AWS之前曾就职于Continental AG和Vitesco Technologies汽车企业,积累了丰富的基础设施搭建和CICD pipeline的实践经验。