亚马逊AWS官方博客

基于 Amazon MWAA 和 Amazon Batch,实现载荷仿真任务弹性调度和计算

1. 背景说明

风电机组载荷计算是风电机组设计的第一步,得出的载荷结果用作机组结构部件强度分析的输入,因此载荷计算结果的可靠性对风电机组的设计至关重要。

载荷计算有前处理和后处理两个阶段,前处理阶段通过 Bladed 软件按照设置的输入值计算出中间结果,后处理则根据前处理中间计算得出最终报告。

由于没有生产数据,本篇文章会从实现思路上进行说明,但在总结章节会提供真实场景下的测试结果。

2. 方案和服务介绍

2.1 方案介绍

  1. Amazon Managed Apache Airflow(Amazon MWAA)调用 Amazon Batch 启动 Amazon EC2 实例并通过 UserData 注册到 Amazon Elastic Container Service Cluster(Amazon ECS),借助 Amazon ECS 调度和运行前处理任务。
  2. 前处理计算结果保存到 Amazon Simple Storage Service(Amazon S3)—>由于是模拟 Bladed 运行,并不会产生计算结果文件,因此不涉及这部分实现。
  3. Amazon MWAA 调用 Amazon Lambda 创建 Amazon FSx for Lustre(Amazon Lustre)、映射 Amazon S3 中的前处理结果目录并等待创建完成。
  4. Amazon MWAA 调用 Amazon Batch 启动 Amazon EC2 实例并通过 UserData 注册到 Amazon ECS、挂载 Amazon Lustre,借助 Amazon ECS 调度和运行后处理任务—>由于是模拟载荷仿真后处理任务,因此只会通过 Lasyloading 方式验证可以访问 Amazon S3 中的文件,并不会真正进行计算。
  5. Amazon MWAA 调用 Amazon Lambda 删除Amazon Lustre。

2.2 服务介绍

  • Amazon MWAA:Amazon MWAA 是 Apache Airflow 的一项托管服务,让您可以使用当前熟悉的 Apache Airflow 平台来编排您的工作流程。您可以获得更高的可扩展性、可用性和安全性,而无需承担管理底层基础设施的运营负担。
  • Amazon Batch:Amazon Batch 是一项完全托管的批处理计算服务,可在 Amazon ECS、Amazon EKS、Amazon Fargate 以及竞价型或按需型实例等所有 Amazon Web Services 计算产品中规划、安排和运行您的容器化批处理或 ML 工作负载。
  • Amazon ECS:Amazon ECS 是一项完全托管的容器编排服务,可帮助您更有效地部署、管理和扩展容器化的应用程序。它与 Amazon Web Services 环境深度集成,提供易于使用的解决方案,用于在云端和本地运行容器工作负载,并通过 Amazon ECS Anywhere 提供高级安全功能。
  • Amazon Lambda:Amazon Lambda 是一项计算服务,可以运行您的代码以响应事件并自动管理计算资源,这使其成为将想法转化为现代生产无服务器应用程序的最快方式。
  • Amazon FSx for Lustre:Amazon FSx for Lustre 提供完全托管式共享存储,兼具常用 Lustre 文件系统的可扩展性和性能。
  • Amazon S3:Amazon S3 是一种对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。各种规模和行业的客户可以为几乎任何使用案例存储和保护任意数量的数据,例如数据湖、云原生应用程序和移动应用程序。借助高成本效益的存储类和易于使用的管理功能,您可以优化成本、组织数据并配置精细调整过的访问控制,从而满足特定的业务、组织和合规性要求。
  • Amazon EC2:Amazon EC2 提供最广泛、最深入的计算平台,拥有超过 750 个实例,并可选择最新的处理器、存储、网络、操作系统和购买模型,以帮助您最好地满足工作负载的需求。

3. 环境搭建

3.1 创建 Amazon MWAA—>请参考 References 1

需要指定 DAGs folder 以及 Execution role,Execution role 需要具有调用 Amazon Lambda、Amazon Batch 和访问 Amazon S3 的权限。

Airflow 版本是 2.7.2,Providers 版本如下图所示:

3.2 制作 Custom AMI

通过 Amazon Batch 调度的前处理任务需要同 Bladed License Server 通信,因此需要 aksusbd 服务和 hasplm.ini 文件,其中 aksusbd 服务会监听 1947 端口。

但是 Amazon Batch 的 NetworkMode 是 Host 模式,若在第一个前处理任务中启动 aksusbd 服务,则会在 Host 主机上监听 1947 端口,而随后启动的前处理任务则由于端口占用而无法启动服务,只会借助第一个前处理任务启动的端口同 Bladed License Server 通信。

这时若第一个前处理任务结束终止,会导致其他前处理任务由于没有可用 1947 端口,而出现任务失败。

因此在 Host 上启动 aksusbd 服务和配置 hasplm.ini 是更好的选择,所以需要制作 Custom AMI。

  1. 创建一台 Amazon EC2,使用 amzn2-ami-ecs-hvm-2.0.20231219-x86_64-ebs 镜像
  2. 登录新创建的 Amazon EC2 并安装 aksusbd 服务和配置 ini
    1. aksusbd服务会监听1947端口并和Bladed Cloud License Server通信
    ~]$ wget https://blog-bucket-for-cloudformation.s3.ap-northeast-2.amazonaws.com/Sentinel_LDK_RedHat_and_SuSE_RPM_Run-time_Installer.tar.gz
    ~]$ cd Sentinel_LDK_RedHat_and_SuSE_RPM_Run-time_Installer
    ~]$ sudo yum install aksusbd-9.15-1.x86_64.rpm -y
    ~]$ sudo systemctl start aksusbd; sudo systemctl enable aksusbd
    
    2. hasplm.ini中修改Bladed Cloud License Server的地址
    ~]$ wget https://blog-bucket-for-cloudformation.s3.ap-northeast-2.amazonaws.com/hasplm.ini--->修改serveraddr
    
  3. 制作 Custom AMI—>请请参考 References 2—>记录 AMI ID     Amazon Batch 使用这个 AMI 启动 Amazon EC2 承载 Amazon ECS 调度任务。
  4. 删除 Amazon EC2

3.3 通过 Amazon Batch Wizard 模式创建前处理 Amazon Batch 环境—>请参考 References 3

这里有几点需要注意的选项:

  • Compute environment configuration—>Instance role—>S3 Read/Write Permission
  • Instance configuration—>Use EC2 Spot instances
    • 前处理任务运行时间都在 1 小时内且可通过 Airflow 设置 Retry,因此通过 Spot 实例节约成本
    • 同时,可以混合使用 Spot 实例和 On-demand 实例,让 On-demand 实例作为 Spot 实例的 Fallback—>请参考 References 6
  • Instance configuration—>Allowed instance types—>c6i.4xlarge
    • 前处理任务是计算密集型,因此选择 c6i
  • Additional configuration—>Allocation strategy—>SPOT_CAPACITY_OPTIMIZED
    • 可以有效降低中断概率—>请参考 References 7
  • Additional configuration—>EC2 configuration—>Custom AMI ID
  • Container configuration—>Image: public.ecr.aws/u1t4n1f6/ubuntu-bladed:latest
    • 优化镜像下载速度—>请参考 References 8
  • Container configuration—>Command: [“/bin/bash”,”-c”,”cd /usr/local/dtbladedlinux-4.13.0.8/DemoData/GL_Jacket_Onshore/; sleep 300; dtbladed-4.13.0.8″]

3.4 编写调度前处理任务的 Airflow Dag,并上传到 DAGs folder

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.batch import BatchOperator # 不同版本的apache-airflow-providers-amazon的import路径会不同
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# 设置 DAG 的默认参数
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 19),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

# 创建 DAG
dag = DAG(
    'china_bladed_process_dag', # DAG名字,唯一
    default_args=default_args,
)

start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

JOB_OVERRIDES = {} # 参考SDK说明--->https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch/client/,可以Override默认行为,比如containerOverrides中的command可以覆盖原有Job Definition中定义的命令

for i in range(30): # 30并发,可调整
    submit_batch_job = BatchOperator(
        task_id='submit_batch_job_'+str(i),
        job_name='batch_job_name_'+str(i),
        job_queue='airflow-bladed-getting-started-wizard-job-queue', # 第3步创建的batch queue
        job_definition='airflow-bladed-getting-started-wizard-job-definition:2', # 第3步创建的batch definition
        overrides=JOB_OVERRIDES,
        dag=dag,
    )

    start_task >> submit_batch_job >> end_task

3.5 通过 Amazon Batch Wizard 模式创建后处理 Amazon Batch 环境—>请参考References 3—>Managed compute env 会自动创建 Launch template—>点进 Launch template,查看 Template tags—>确定创建的 Launch template,记录名字

这里有几点需要注意的选项:

  • Instance configuration—>Use EC2 Spot instances
    • 后处理任务运行时间都是分钟级别且可通过 Airflow 设置 Retry,因此通过 Spot 实例节约成本
    • 同时,可以混合使用 Spot 实例和 On-demand 实例,让 On-demand 实例作为 Spot 实例的 Fallback—>请参考 References 6
  • Instance configuration—>Allowed instance types—>r6i.large
    • 后处理任务是内存密集型,因此选择 r6i.large
  • Additional configuration—>Allocation strategy—>SPOT_CAPACITY_OPTIMIZED
    • 可以有效降低中断概率—>请参考 References 7
  • Container configuration—>Image: default image
  • Container configuration—>Command: [“ls”,”/scratch”]

3.6 基于步骤 3.5 创建的 Launch template,创建新的 Launch template,修改 Compute env 使用这个 Launch template

1. 基于步骤 3.5 创建的 Launch template,创建新的 Launch template—>请参考 References 4

这里有几点需要注意的选项:

  • Launch template name and description—>Launch template name: 步骤 3.5 创建的 Launch template
  • Storage (volumes)—>EBS Volumes—>Volume 1—>Volume type: gp3

2. 修改 Compute env 使用这个 Launch template—>

  • Amazon Batch—>Compute environments—>上一步创建的 Compute environment—>Edit—>Instance configuration—>Additional configuration

3.7 编写 Amazon Lambda 2 函数,Execution role 需要具有删除 Amazon Lustre 的权限,并且修改 Timeout 时间为 1 分钟。同时,创建一个环境变量 filesystem_id: 1,这个值会在每次创建 Amazon Lustre 时动态修改

import boto3
import os

client = boto3.client('fsx', region_name="cn-norhtwest-1")

def lambda_handler(event, context):
    # 得到参数值
    filesystem_id = os.environ.get("filesystem_id")
    
    # 删除Amazon FSx for Lustre
    response = client.delete_file_system(FileSystemId=filesystem_id)

3.8 编写 Amazon Lambda 1 函数,Execution role 需要具有创建 Amazon Lustre 文件系统、修改 Launch template 和 Lambda Environment 的权限,并且修改 Timeout 时间为 1 分钟

import boto3
import time
import base64

# boto3初始化
client = boto3.client('fsx', region_name="cn-northwest-1")
client_lambda = boto3.client('lambda', region_name="cn-northwest-1")
client_ec2 = boto3.client('ec2', region_name="cn-northwest-1")
client_batch = boto3.client('batch', region_name="cn-northwest-1")

def lambda_handler(event, context):
    # 创建Amazon FSx for Lustre
    response = client.create_file_system(
        FileSystemType='LUSTRE',
        SecurityGroupIds=[
            'sg-08653e515b27628ac', # 需要修改,参考Amazon Batch服务compute environment中的Networking部分
        ],
        SubnetIds=[
            'subnet-099097bfc300e9550', # 需要修改,参考Amazon Batch服务compute environment中的Networking部分
        ],
        StorageCapacity=1200, # 按照需求修改
        LustreConfiguration={
            'DeploymentType': 'SCRATCH_2',
       # 1. 通过指定ImportPath实现Amazon Lustre和Amazon S3资源映射,通过Lazyloading方式进行访问;通过指定AutoImportPolicy实现Amazon Lustre和Amazon S3文件同步机制,如果未指定,则默认是NONE,即只在文件系统创建时进行同步--->请参考References 9
       # 2. 除了Lazyloading,Amazon Lustre还支持Preloading,提前加载Amazon S3中的资源到Lustre--->请参考 References 10
            'ImportPath': 's3://airflow-bladed/dags', # 模拟前处理任务结果存放目录
        }
    )
    
    filesystem_id = response['FileSystem']['FileSystemId']
    mount_name = response['FileSystem']['LustreConfiguration']['MountName']
    
    # 修改Amazon Lambda函数(delete_lustre)的filesystem_id环境变量
    response_lambda = client_lambda.update_function_configuration(
        FunctionName='delete_lustre', # 需要修改,删除Amazon Lustre文件系统的Amazon Lambda函数名字
        Environment={
            'Variables': {
                'filesystem_id': filesystem_id
            }
        }
    )
    
    # 更新后处理Launch template userData,先赋值若干参数
    # ----------从这里开始----------
    launch_template_name = 'postprocess-launch-template' # 步骤3.6创建的Launch template名字
    region='cn-northwest-1'
    fsx_directory='/scratch'
    file_system_id_01=filesystem_id
    fsx_mount_name=mount_name
    # ----------到这里结束----------
    # 编写UserData
    user_data_script = f"""MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==BOUNDARY==" 

--==BOUNDARY==
MIME-Version: 1.0
Content-Type: text/x-shellscript; charset="us-ascii"

#!/bin/bash
amazon-linux-extras install -y lustre2.10
mkdir -p {fsx_directory}
mount -t lustre -o noatime,flock {file_system_id_01}.fsx.{region}.amazonaws.com@tcp:/{fsx_mount_name} {fsx_directory}

--==BOUNDARY==--
    """
    # Base64-encoded UserData
    encoded_user_data = base64.b64encode(user_data_script.encode('utf-8')).decode('utf-8')
    
    # 创建新版本的Launch Template
    response_create_lt_version = client_ec2.create_launch_template_version(
        LaunchTemplateName=launch_template_name,
        SourceVersion="1", # 参考版本
        LaunchTemplateData={'UserData': encoded_user_data}
    )

    version_number = response_create_lt_version['LaunchTemplateVersion']['VersionNumber'] # 提取Launch template version number
    
    response_batch_update_compute_env = client_batch.update_compute_environment(
        computeEnvironment='postprocess-airflow-bladed-getting-started-wizard-compute-env', # 步骤3.5创建的Compute environment名字
        computeResources={
            'launchTemplate': {
                'launchTemplateName': launch_template_name,
                'version': str(version_number)
            }
        }
    )

3.9 完善 Airflow Dag,并上传到 DAGs folder—>在步骤 4 的 Dag 基础上添加如下代码,以自动创建 Amazon Lustre 和删除 Amazon Lustre

# 定义变量
lambda_function_name_create_lustre = 'create_lustre' # 步骤3.8创建的Amazon Lambda函数名字
lambda_function_name_delete_lustre = 'delete_lustre' # 步骤3.7创建的Amazon Lambda函数名字
payload = '{"key1": "value1", "key2": "value2"}'

# 定义Amazon Lambda Operator
lambda_task_create_lustre = LambdaInvokeFunctionOperator(
    task_id='invoke_lambda_task_create_lustre',
    function_name=lambda_function_name_create_lustre,
    invocation_type='RequestResponse',
    payload=payload,
    dag=dag,
)

lambda_task_delete_lustre = LambdaInvokeFunctionOperator(
    task_id='invoke_lambda_task_delete_lustre',
    function_name=lambda_function_name_delete_lustre,
    invocation_type='RequestResponse',
    payload=payload,
    dag=dag,
)

# 定义Bash Operator,等待Amazon Lustre创建完成
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='sleep 600',
    dag=dag,
)

for i in range(30):
    submit_batch_job = BatchOperator(
        task_id='submit_batch_job_'+str(i),
        job_name='batch_job_name_'+str(i),
        job_queue='airflow-bladed-getting-started-wizard-job-queue',
        job_definition='airflow-bladed-getting-started-wizard-job-definition:2',
        overrides=JOB_OVERRIDES,
        dag=dag,
    )

    start_task >> submit_batch_job >> lambda_task_create_lustre >> bash_task >> lambda_task_delete_lustre >> end_task

3.10 修改步骤 3.5 创建的 Amazon Batch job definition,使得任务可挂载 Amazon FSx Lustre—>请参考 References 5

3.11 完善 Airflow Dag,并上传到 DAGs folder—>在步骤 9 的 Dag 基础上添加如下代码,以运行后处理任务

# 定义后处理BatchOperator
submit_batch_job_postprocess = BatchOperator(
    task_id='submit_batch_job_postprocess',
    job_name='batch_job_name_postprocess',
    job_queue='postprocess-airflow-bladed-getting-started-wizard-job-queue', # 步骤3.5创建的Job queue
    job_definition='postprocess-airflow-bladed-getting-started-wizard-job-definition:2', # 步骤3.5创建的Job definition
    overrides=JOB_OVERRIDES,
    dag=dag,
)

for i in range(30):
    # 定义前处理BatchOperator
    submit_batch_job = BatchOperator(
        task_id='submit_batch_job_'+str(i),
        job_name='batch_job_name_'+str(i),
        job_queue='airflow-bladed-getting-started-wizard-job-queue',
        job_definition='airflow-bladed-getting-started-wizard-job-definition:2',
        overrides=JOB_OVERRIDES,
        dag=dag,
    )

start_task >> submit_batch_job >> lambda_task_create_lustre >> bash_task >> submit_batch_job_postprocess >> lambda_task_delete_lustre >> end_task

4. 结果验证

4.1 下载完整版 Airflow Dag

For-Blog/china_bladed_process.py at main · successzy/For-Blog (github.com)

4.2 前处理并发验证—>30 并发

4.3 前处理结果验证—>Bladed 计算结果

4.4 后处理结果验证—>ls /scratch

4.5 整体流程验证—>全部成功

5. 总结

本篇文章介绍了怎样基于 Amazon MWAA 和 Amazon Batch 实现载荷仿真任务弹性调度和计算。真实场景中,这种方式取得了很好的效果。

1. 前处理任务

本地生产集群是拥有 500 个物理核的 Kubernetes 集群,通过 KubernetesPodOperator 进行调度。本方案使用 c6i.4xlarge、Spot 机器,通过 BatchOperator 进行调度。

运行 1441 个前处理工况任务,按照 500 任务并发,每个任务 1 核 2G 执行。

  • 本地生产集群(物理核):1 小时 27 分钟
  • 本方案(超线程):1 小时 10 分钟

2. 后处理任务

本方案使用 r6i.4xlarge、Spot 机器,通过 BatchOperator 进行调度。

运行 246 个后处理任务,按照 500 任务并发,每个任务 1 核 10G 执行。

  • Amazon Lustre 从 Amazon S3 预热 92GB 数据需要 20 分钟,运行后处理任务需要 10 分钟。

6. 环境清理

依次在控制台删除创建的 Amazon MWAA、Custom AMI、前处理 Amazon Batch 环境、后处理 Amazon Batch 环境、Launch template、Amazon Lambda 1、Amazon Lambda 2、Amazon S3 桶。

7. 参考链接

本篇作者

郑毅

西云数据解决方案架构师,曾就职于外企、互联网企业和央企,擅长系统交付、运维和解决方案设计,对于传统 IT 技术以及云计算技术有深入了解和丰富的实战经验。

刘兵

亚马逊云科技高性能计算专业解决方案架构师,专注于协助客户在亚马逊云科技上更经济、可持续地构建高性能计算解决方案。在系统编程、故障排除和调试,以及可观察性方面拥有丰富的经验,使客户能够在全球范围内跨多种架构构建和迁移应用程序至云端。

王志达

亚马逊云科技解决方案架构师,主要负责基础架构如计算、存储的云端设计、改造和优化方案。有多年存储、容器平台和 DevOps 运维经验。

刘忠凯

运达能源集团-技术中心-数字化技术室-高级工程师,负责运达能源集团风电场评估设计系统的研发工作,数字化载荷仿真平台研发工作,风资源公共服务云平台研发工作。专注于风电领域微观、宏观选址,风机载荷仿真的软件系统研发设计工作,为构建准确、高效、稳定、高性能的风电领域设计软件提供架构实现方案。