亚马逊AWS官方博客

Autel 基于 AWS Security Hub 的 DNS Firewall 威胁事件集中管理解决方案 | Autel’s Centralized Management Solution for DNS Firewall Threat Events Based on AWS Security Hub

中文版本 | English Version


关于 Autel

成立于 2004 年,深圳市道通科技股份有限公司(AUTEL Intelligent Technology Corp., Ltd.)专注于汽车先进诊断、检测和测试系统,智能充电系统及电子元件的研发、生产、销售与服务。道通的产品与服务覆盖美国、欧洲、日本、澳大利亚等 70 多个国家和地区。道通是全球领先的电动汽车智能充电解决方案、汽车诊断和测试产品、胎压监测系统(TPMS)产品、高级驾驶辅助系统(ADAS)产品及相关软件云解决方案提供商之一。道通的核心技术现已应用于电动汽车诊断设备、电动汽车充电设备、充电运营商云平台、太阳能和储能解决方案,致力于高效连接太阳能、储能、充电器和云。道通将继续推动综合能源生态系统的建设,快速扩展电动汽车行业。

背景介绍

道通高度重视用户隐私和安全。为了履行我们对客户安全的承诺,我们已成功获得一系列网络安全和数据隐私认证,例如 SOC 2、CSA-STAR、ISO 27001、ISO 27701、ISO 27018 和 NIST CSF。我们与多个安全机构合作,设计了全面的隐私和安全解决方案。过去,道通通过集成内容安全策略(CSP)和 HTTP 严格传输安全(HSTS)功能来提升客户端安全性,防止注入风险,消除潜在的低层协议,升级安全协议至 TLS1.3,为 SaaS 服务建立强大的前端和高质量的通信安全。此外,我们利用 AWS 的云安全功能构建了一道安全“墙”,包括通过 AWS WAF 和 AWS DNS 防火墙的主动拦截,使用 Amazon GuardDuty 的深入防御,以及利用 AWS Security Hub 进行全面的安全操作。

本次我们将详细分享,我们是如何在 AWS DNS Firewall 威胁事件数据时间将数据流转到标准的 AWS Security Hub。类似教程在市面上并不常见。希望在遇到类似场景时能够节约您的时间。

AWS Security Hub 是一款云安全态势管理服务,它为用户提供了云上安全最佳实践检查的能力, 基于 CIS、PCI DSS 以及 AWS 基础安全最佳实践等多个安全标准自动持续评估用户的 AWS 账户和资源安全状态。此外,Security Hub 也为用户提供了一个集中化的安全告警展示面板,用户可以按照标准的格式将来自多个安全产品、区域及 AWS 账户的安全告警统一汇聚到该服务中并在面板上进行查看和筛选。同时用户也可以在该面板上选中告警并触发后续的自动化响应流程。

AWS Security Hub 已与 AWS 的原生安全服务及常见的第三方安全服务进行了集成,当用户需要将这些安全服务产生的告警收集到 Security Hub 中时只需进行相应的配置即可,无需进行额外的开发工作。但同时,Security Hub 也提供了标准的告警格式以及告警插入、更新 API 来帮助用户将尚未与 Security Hub 集成的安全工具产生的告警插入到其中以进行统一的告警管理和处置。本文以 Amazon Route 53 Resolver DNS Firewall 产生的域名拦截告警和 AWS WAF V2 产生的 Amazon CloudWatch Metrics SQL 数据库攻击告警为例,说明了如何将安全工具产生的告警插入到 Security Hub 之中。

方案介绍

Security Hub 提供了两个 API 来允许安全工具向其插入告警或更新告警:

BatchImportFindings

用户可通过这个 API 向 Security Hub 中插入新的告警、或更新(当告警 ID 已经存在时)一个已有的告警。

BatchUpdateFindings

用于把用户在产生告警的工具中对告警的处置相关信息同步到 Security Hub 中,例如调整告警的严重等级、添加备注、更新工作流状态等。这个 API 不会更改告警的 UpdatedAt 字段。

在调用这两个 API 时,用户需要将告警处理为 Security Hub 的标准告警格式,ASFF(AWS Security Finding Format)。ASFF 规定了一个告警的必选字段以及可选字段,并描述了告警的字段名称及各字段的取值类型和取值范围。

示例 1. 下图是将告警插入到 Security Hub 中的方案架构示意图,其中以 DNS Firewall 服务产生的告警为例。该方案的工作流程如下:

  1. 使用 EventBridge 捕获 DNS Firewall 产生的告警事件。
  2. EventBridge 将捕获的事件发送到 SQS 队列中。
  3. SQS 队列缓存事件直到超过指定的事件数目上限或等待时长,之后 SQS 触发 Lambda 函数批处理缓存的事件。
  4. Lambda 处理从 SQS 发送来的事件列表,将其转化为 ASFF 格式,之后调用 BatchImportFindings API 将格式化后的告警事件插入到 Security Hub 中。

示例 2. 下图是 Amazon CloudWatch 告警插入到 Security Hub 中的方案架构示意图,其中以 AWS WAF V2 产生的 Amazon CloudWatch Metrics 的告警为例。该方案工作流程如下:

  1. 创建 Amazon CloudWatch AWS WAF V2 告警。
  2. 指定告警目标的 Lambda 函数。
  3. Lambda 处理从 Amazon CloudWatch 发送来的告警事件,将其转化为 ASFF 格式,之后调用 BatchImportFindings API 将格式化后的告警事件插入到 Security Hub 中。

前置条件

  • 具有 IAM,Lambda,SQS,EventBridge,CloudWatch Logs,Security Hub 服务管理权限的 IAM 账号用于执行配置操作。
  • AWS 账户的当前区域已开启 Security Hub 服务并配置了 DNS Firewall。

示例 1. 配置步骤

创建 SQS 队列用于缓存告警事件

SQS 控制台页面,创建一个“标准”类型的 SQS 队列用于接收安全工具产生的告警事件,参数如下:

(可选)您可以创建另一个队列并将其指定为前述告警事件队列的死信队列,这样后续无法被正常处理的告警事件将进入死信队列,您可以通过添加对死信队列的监控来及时发现处理失败的告警并进行人工的调查和更正您的告警处置代码中的 Bug。

创建 EventBridge 规则用于捕获 DNS Firewall 告警事件

EventBridge 的规则页面创建一个基于“default”事件总线的“具有事件模式的规则”类型的规则。

在“事件模式”中,录入下列内容用于捕获全部的 DNS Firewall 告警事件。您也可以修改或添加更多过滤条件用于捕获符合您需求的或更加细粒度的告警,例如您可以将 detail-type 字段中的内容改为“DNS Firewall Block“来捕获 DNS Firewall 的拦截事件。

{
  "source": ["aws.route53resolver"],
  "detail-type": ["DNS Firewall Alert"]
}

在选择目标页面,选择“SQS 队列”作为目标类型,并在“队列”下拉框中选择之前创建的 SQS 队列。

创建 Lambda 函数用于格式化告警并插入 Security Hub

Lambda 控制台页面,创建一个新的函数,运行时使用“Python 3.12”。

新建一个 IAM Role 作为 Lambda 的执行角色,该 Role 应具备如下权限(请注意替换其中的账户 ID、区域、Lambda 名称和 SQS 名称为您的实际情况)。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": "arn:aws:logs:<region>:<account_id>:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<region>:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:<region>:<account_id>:<SQS_queue_name>"
        },
        {
            "Effect": "Allow",
            "Action": [
                "securityhub:BatchUpdateFindings",
                "securityhub:BatchImportFindings"
            ],
            "Resource": "*"
        }
    ]
}

录入下面的代码。

import json
import boto3
import os
import datetime

DEFAULT_SEVERITY = "HIGH"
DEFAULT_TYPES = ["TTPs/Defense Evasion"]
DEFAULT_TITLE = "DNS Firewall generated an alert"
DEFAULT_PRODUCT_NAME = "DNS Firewall"
DEFAULT_COMPANY_NAME = "Personal"
DEFAULT_DESC = DEFAULT_TITLE
PRODUCT_FIELDS_KEYS = [
    "transport",
    "query-name",
    "query-type",
    "query-class",
    "firewall-rule-action",
    "firewall-rule-group-id",
    "firewall-domain-list-id",
    "src-addr",
    "src-port",
    "vpc-id",
]

# Initialize AWS clients
securityhub = boto3.client("securityhub")
sts = boto3.client("sts")
AWS_REGION = os.environ["AWS_REGION"]
iam_identity = sts.get_caller_identity()
ACCOUNT_ID = iam_identity["Account"]
AWS_PARTITION = iam_identity["Arn"].split(":")[1]
PRODUCT_ARN = (
    "arn:"
    + AWS_PARTITION
    + ":securityhub:"
    + AWS_REGION
    + ":"
    + ACCOUNT_ID
    + ":product/"
    + ACCOUNT_ID
    + "/default"
)

def flatten_dict(dd, separator ='.', prefix =''):
    return { prefix + separator + k if prefix else k : v
             for kk, vv in dd.items()
             for k, v in flatten_dict(vv, separator, kk).items()
             } if isinstance(dd, dict) else { prefix : dd }

# function find_by_key to find a key and its value from a nested dict with a key_name as parameter
def find_by_key(key_name, dict_to_search):
    for key, value in dict_to_search.items():
        if key == key_name:
            return value
        elif isinstance(value, dict):
            result = find_by_key(key_name, value)
            if result is not None:
                return result

def get_resource_id(dict_to_search):
    if isinstance(dict_to_search, dict):
        resouce_id = find_by_key("id", dict_to_search)
        if resouce_id is not None and isinstance(resouce_id, str):
            return resouce_id
    return "dummy-resource-id-" + str(id(dict_to_search))

def lambda_handler(event, context):
    findings = []
    for record in event["Records"]:
        msg = json.loads(record["body"])
        detail = msg["detail"]
        findings.append(
            {
                # required top-level attributes
                "SchemaVersion": "2018-10-08",
                "AwsAccountId": msg["account"],
                "CreatedAt": msg["time"],
                "UpdatedAt": msg["time"],
                "Severity": {"Label": DEFAULT_SEVERITY},
                "Title": DEFAULT_TITLE,
                "Description": DEFAULT_DESC,
                "Types": DEFAULT_TYPES,
                "ProductArn": PRODUCT_ARN,
                "GeneratorId": msg["source"],
                "Id": detail['id'] if 'id' in detail.keys() else msg["id"],
                "Resources": [
                    {
                        "Type": "Other",
                        "Id": get_resource_id(x),
                        "Details": {"Other": flatten_dict(x)}
                    } for x in detail["resources"]
                ],
                # optional top-level attributes
                "CompanyName": DEFAULT_COMPANY_NAME,
                "ProductName": DEFAULT_PRODUCT_NAME,
                "LastObservedAt": datetime.datetime.fromtimestamp(
                    int(detail["last-observed-at"])
                ).strftime("%Y-%m-%dT%H:%M:%SZ"),
                "Region": msg["region"],
                "Action": {
                    "ActionType": "DNS_REQUEST",
                    "DnsRequestAction": {
                        "Blocked": (
                            False if detail["firewall-rule-action"] == "ALERT" else True
                        ),
                        "Domain": detail["query-name"],
                        "Protocol": detail["transport"],
                    },
                },
                "ProductFields": {
                    k: detail[k] for k in PRODUCT_FIELDS_KEYS if k in detail.keys()
                },
            }
        )
    response = securityhub.batch_import_findings(Findings=findings)
    print(response)
    if response["FailedCount"] != 0:
        raise Exception(json.dumps(response))

为 Lambda 函数添加触发器,选择之前创建的 SQS 队列并将批处理大小设置为 100。

测试&验证

在您的 DNS Firewall 规则组中添加一个测试用规则,并在其中指定一些测试用的DNS记录。若您没有自己的测试域名,您可以添加由 AWS 提供的测试域名 controldomain1.aggregatelist.firewall.route53resolver.us-east-1.amazonaws.com,该域名将被解析到 1.2.3.4。

在关联上述 DNS Firewall 规则组的 VPC 内,发起域名解析请求,例如在一台 Linux EC2 上执行 dig controldomain1.aggregatelist.firewall.route53resolver.us-east-1.amazonaws.com。

等待几分钟后,在 Security Hub 的告警页面,您应能看到来自 DNS Firewall 的告警,并且勾选该告警后可以在详情页面看到具体的域名及执行该 DNS 查询的资源的名称。

示例 2. 配置步骤

创建 Lambda 函数用于格式化告警并插入 Security Hub

Lambda 控制台页面,创建一个新的函数,运行时使用“Python 3.12”。并在 lambda_function.py 中录入以下代码。

import json
import boto3
import os
import datetime

# Defaults for alarm integration
DEFAULT_SEVERITY = "HIGH"
DEFAULT_TYPES = ["Software and Configuration Checks/Industry and Regulatory Standards"]
DEFAULT_TITLE = "CloudWatch Alarm triggered"
DEFAULT_PRODUCT_NAME = "AWS CloudWatch"
DEFAULT_COMPANY_NAME = "Personal"
DEFAULT_DESC = "CloudWatch Alarm has triggered an alert due to threshold breach."

# Initialize AWS clients
securityhub = boto3.client("securityhub")
sts = boto3.client("sts")

# Constants for AWS
AWS_REGION = os.environ["AWS_REGION"]
iam_identity = sts.get_caller_identity()
ACCOUNT_ID = iam_identity["Account"]
AWS_PARTITION = iam_identity["Arn"].split(":")[1]
PRODUCT_ARN = (
    "arn:"
    + AWS_PARTITION
    + ":securityhub:"
    + AWS_REGION
    + ":"
    + ACCOUNT_ID
    + ":product/"
    + ACCOUNT_ID
    + "/default"
)

# Helper function to convert JSON string to dict
def json_loads_safe(json_string):
    try:
        return json.loads(json_string)
    except ValueError:
        return {}

# Function to handle the lambda event
def lambda_handler(event, context):
    # TODO implement    
    alarm_data = event.get('alarmData', {})
    alarm_state = alarm_data.get('state', {})
    previous_state = alarm_data.get('previousState', {})
    alarm_config = alarm_data.get('configuration', {})
    finding = {
        "SchemaVersion": "2018-10-08",
        "AwsAccountId": event.get("accountId", ""),
        "CreatedAt": event.get("time"),
        "UpdatedAt": event.get("time"),
        "Severity": {"Label": DEFAULT_SEVERITY},
        "Title": DEFAULT_TITLE,
        "Description": DEFAULT_DESC,
        "Types": DEFAULT_TYPES,
        "ProductArn": PRODUCT_ARN,
        "GeneratorId": event.get("alarmArn"),
        "Id": alarm_data.get('alarmName', '') + event.get("time"),
        "Resources": [
            {
                "Type": "AwsCloudWatchAlarm",
                "Id": event.get("alarmArn"),
                "Details": {
                    "Other": {
                        "AlarmName": alarm_data.get('alarmName', ''),
                        "State": alarm_state.get('value', ''),
                        "Reason": alarm_state.get('reason', ''),
                        "ReasonData": json.dumps(json_loads_safe(alarm_state.get('reasonData', '{}'))),
                        "Configuration": json.dumps(alarm_data.get('configuration', {}))  # Serialized configuration
                        # Note that the value of "Configuration" should NOT be longer than 1024 characters
                    }
                }
            }
        ],
        "Region": event.get("region"),
        "CompanyName": DEFAULT_COMPANY_NAME,
        "ProductName": DEFAULT_PRODUCT_NAME,
        "LastObservedAt": datetime.datetime.strptime(alarm_state.get('timestamp', ''), '%Y-%m-%dT%H:%M:%S.%f%z').strftime('%Y-%m-%dT%H:%M:%SZ')
    }
    
    response = securityhub.batch_import_findings(Findings=[finding])
    print(response)
    if response["FailedCount"] > 0:
        raise Exception(json.dumps(response))
        
    return {
        'statusCode': 200,
        'body': json.dumps('Finished.')
    }

配置 > 权限中找到 Lambda 的执行角色 IAM Role,该 Role 应具备如下权限(请注意替换其中的账户 ID、区域、Lambda 名称为您的实际情况)。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": "arn:aws:logs:<region>:<account_id>:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<region>:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "securityhub:BatchImportFindings"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

配置 > 权限中找到 Lambda 基于资源的政策声明,增加 CloudWatch alarm 触发 lambda function 的权限。其中,Statement ID: 自定义如「cw-lambda-access」, principal: lambda.alarms.cloudwatch.amazonaws.com, Action: lambda: InvokeFunction(请注意替换其中的账户 ID、区域、Lambda 名称为您的实际情况)。

{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [
    {
      "Sid": "cw-lambda-access",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.alarms.cloudwatch.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:<region>:<account_id>:function:<lambda_function>"
    }
  ]
}

创建测试样例 Web ACL

WAF & Shield 的控制台页面,点击导航栏 Web ACLs > Create web ACL 创建样例 。

自定义名称「test-web-acl」,设置为 Regional ACL(勾选Regional resources),并选择 region。Global 的 Web ACL 会被部署到亚马逊云科技全球 400 多个 PoP 节点,使用一致的 WAF 防护策略与 CloudFront 结合来保护用户的源站。Region 的 Web ACL 在每个 Region 配置不同的 Web ACL,用于保护同 Region 里的 Web 服务器。

Add rules and rule groups > Add rules > Add managed rule groups,选择添加 AWS managed rule groups > SQL database 规则测试 SQL injection。添加结果如下。其余配置默认即可。

创建一个测试用的 Web 服务器和 ALB

直接用 WAF 保护生产环境里的 Web 服务器需要特别注意出现误杀的情况。因此这里我们先创建一个简单的测试用网页服务器。这里我们选用了一个 echo-server 的 Docker 镜像,创建一个 httpecho 的容器让它能够把我们通过 http 访问服务器的请求头都提取出来作为 http 响应给我们的浏览器。

首先启用一个 t2.micro 的 EC2,使用 Amazon Linux2 AMI 启动。在此 EC2 上安装 Docker,参考 https://docs.thinkwithwp.com/zh_cn/AmazonECS/latest/developerguide/docker-basics.html

具体 Linux 命令如下:

安装 docker

sudo yum update -y
sudo amazon-linux-extras install docker

赋予 ec2-user 用户 docker 启动的 linux 权限

sudo usermod -a -G docker ec2-user

退出当前 ssh session 以使权限生效

logout

使用 ec2-user 再次 login 检查 docker 信息

sudo service docker start
docker info | grep Ver

输出如下即为 docker 安装成功

Server Version: 20.10.7
Cgroup Version: 1
Kernel Version: 4.14.47-64.38.amzn2.x86_64

运行 echo-server docker image,监听 TCP 1028 端口

docker run -d --name httpecho  -p 1028:8080 jmalloc/echo-server httpecho

curl 检查本地服务

curl 127.0.0.1:1028

输出如下即表示服务正常。该输出表示 echo-server 服务器收到 curl 送来的 http 请求,内容非常简单

Request served by 16e51706efbe

HTTP/1.1 GET /

Host: 127.0.0.1:1028
User-Agent: curl/7.79.1
Accept: /

创建 ALB 「httpechoalb」监听 TCP 80 端口,后端转发至 EC2 的 TCP 1028 端口。使用浏览器可以正常访问 alb 域名得到 http 访问服务器的请求头信息。

把创建的 Web ACL – test-web-acl 与的 ALB 关联起来。点击创建的样例 web acl 「test-web-acl」> Associated AWS resources,添加创建的 ALB 资源「httpechoalb」。

创建测试 WAF alarm 触发 Lambda 函数

在 CloudWatch 控制台界面,全部指标 >> WAFV2 > ManagedRuleGroup, Region, WebACL > 选择 ManagedRuleGroup > AWSManagedRulesSQLiRuleSet > 创建告警。

指定指标和条件 >> 指标 – 指标名称:「BlockedRequests」, WebACL: test-web-acl, Statistic: 总计, Period: 10 秒。条件 – 阈值类型:静态, 每当 BlockedRequests 为:>= 1。

Lambda 操作: 告警中 > 从登录账户中选择 Lambda 函数 > 创建的 Lambda 函数「cw-lambda-access」

测试与验证

运行 curl —header 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36 Edg/97.0.1072.69' -X POST "xxxx.us-east-1.elb.amazonaws.com" -F "user='AND 1=1;",其中"xxxx.<region>.amazonaws.com"为创建的「httpechoalb」ALB 域名。注意,这里展示的 web ACL、CloudWatch alarm 等配置均为测试样例,您可以根据自身需求调整。

等待几分钟后,在 Security Hub 的告警页面,您应能看到来自 CloudWatch 的告警,并且勾选该告警后可以在详情页面看到具体信息。

总结

虽然 Security Hub 已经与许多 AWS 的安全服务和第三方安全服务进行了集成,以方便地收集并汇聚这些安全服务产生的告警事件,但当用户使用的安全服务尚未与 Security Hub 集成时,用户仍需自行编写代码完成相关的集成工作。本文介绍了一种典型的集成架构,并用 DNS Firewall 产生的告警为例,展示了如何将安全告警插入到 Security Hub 之中进行集中展示。

Autel 与 AWS 携手,通过 AWS Security Hub 构建了 DNS Firewall 威胁事件的集中管理方案,及多种安全措施,每天帮助 Autel SaaS 云客户自动拦截超过 10,000 次各种类型的攻击和骚扰,这一方案展示了 Autel 在云安全技术中的创新能力,实现了为客户隐私和安全保驾护航的能力。

参考链接

https://thinkwithwp.com/cn/blogs/china/aws-waf-deployment-guide-1-waf-principle-default-deployment-and-log-storage/

本篇作者

任恺蒂

Autel 网络安全高级工程师,曾任 2024 年亚马逊云科技中国峰会演讲者,广泛涉猎于 IoT 安全/云安全等综合领域。

王旭东

亚马逊云科技安全产品解决方案架构师,负责帮助客户进行安全解决方案的架构设计。在加入亚马逊云科技之前,曾在互联网 SaaS 企业负责公司基础架构安全建设及治理。

Cathy Huang

亚马逊云科技助理解决方案架构师,主要负责 Auto 行业客户解决方案的设计和实现,专注于云端 AI 技术的应用与实践。

郭强

亚马逊云科技资深解决方案架构师。曾就职 IBM,有两次 ToB 创业经历,在企业级应用架构和 B2B 移动互联网解决方案方面积累了丰富实战经验。专注于企业数字化转型,擅长运用云计算、AI 和 IoT 技术解决复杂业务挑战。目前致力于 AIoT 和先进 AI 应用在汽车行业的研究与实践,推动创新技术在汽车及相关领域的广泛应用。