AWS Official Blog

Autel’s Centralized Management Solution for DNS Firewall Threat Events Based on AWS Security Hub



About Autel

Founded in 2004, AUTEL Intelligent Technology Corp., Ltd. specializes in the R&D, production, sales, and service of automotive advanced diagnostic, detection, and testing systems, smart charging systems, and electronic components. Autel products and services reach more than 70 countries and regions, including the United States, Europe, Japan, and Australia. Autel is one of the world’s leading providers of electric vehicle intelligent charging solutions, automotive diagnostics and testing products, TPMS products, ADAS products, as well as related software cloud solutions. Autel’s core technologies are now being implemented in electric vehicle diagnostic devices, electric vehicle charging units, charge point operator cloud platform, solar energy, and energy storage solutions, aiming to efficiently link solar, storage, charger, and cloud. Autel will continue to pursue an integrated energy ecosystem to rapidly expand the electric vehicle industry.

Background

Autel places great emphasis on user privacy and security. To fulfill our commitment to customer security, we have achieved a series of cybersecurity and data privacy certifications, such as SOC 2, CSA-STAR, ISO 27001, ISO 27701, ISO 27018, and NIST CSF. We have partnered with multiple security organizations to design a comprehensive privacy and security solution, and we are pleased to share some of the security practices Autel has built together with AWS. Autel has previously hardened client-side security by adopting CSP and HSTS to guard against injection attacks, disabled legacy lower-version protocols, upgraded to TLS 1.3, and established robust front-end and communication security for its SaaS services. Furthermore, we have built a security “wall” on AWS cloud security features, including proactive interception through AWS WAF and Amazon Route 53 Resolver DNS Firewall, defense in depth with Amazon GuardDuty, and comprehensive security operations using AWS Security Hub.

This post explains how we route DNS Firewall threat event data into AWS Security Hub. Tutorials for this specific scenario are scarce, and we hope this guide saves you time in similar situations.

AWS Security Hub is a cloud security posture management service that provides users with the capability to perform security best practice checks on the cloud. It continuously evaluates the security state of AWS accounts and resources based on multiple security standards, including CIS, PCI DSS, and AWS foundational security best practices. Security Hub also provides a centralized security alert dashboard, allowing users to collect security alerts from multiple security products, regions, and AWS accounts into a unified display format. Users can view and filter these alerts on the dashboard and trigger automated response processes as needed.

AWS Security Hub is integrated with AWS’s native security services and common third-party security services. When users need to collect alerts generated by these services into Security Hub, they can configure the integration with ease and without any additional development effort. Security Hub also offers a standard alert format and APIs for inserting and updating alerts, so users can import alerts from security tools that are not yet integrated and manage them in one place. This article shows how to insert such alerts into AWS Security Hub, using two examples: domain name interception alerts from Amazon Route 53 Resolver DNS Firewall, and SQL database attack alarms based on Amazon CloudWatch metrics for AWS WAF V2.

Solution Description

Security Hub provides two APIs for inserting or updating alerts:

BatchImportFindings

This API enables users to insert new alerts into Security Hub or update existing alerts when the alert ID already exists.

BatchUpdateFindings

This API is used to synchronize alert handling information, such as severity adjustments, adding comments, or updating workflow statuses, from the alerting tool to Security Hub. This API does not change the alert’s UpdatedAt field.
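As a rough illustration of the second API, the sketch below builds the keyword arguments for a boto3 `batch_update_findings` call that sets a workflow status and attaches a note. The helper name `build_update_request` and all sample values are hypothetical and only meant to show the shape of the request.

```python
# Workflow statuses accepted by Security Hub for a finding.
WORKFLOW_STATUSES = {"NEW", "NOTIFIED", "RESOLVED", "SUPPRESSED"}


def build_update_request(finding_id, product_arn, workflow_status, note_text, updated_by):
    """Build kwargs for securityhub.batch_update_findings (hypothetical helper)."""
    if workflow_status not in WORKFLOW_STATUSES:
        raise ValueError(f"unsupported workflow status: {workflow_status}")
    return {
        # A finding is identified by its Id together with its ProductArn.
        "FindingIdentifiers": [{"Id": finding_id, "ProductArn": product_arn}],
        "Workflow": {"Status": workflow_status},
        "Note": {"Text": note_text, "UpdatedBy": updated_by},
    }


request = build_update_request(
    finding_id="example-finding-id",  # placeholder value
    product_arn="arn:aws:securityhub:us-east-1:111122223333:product/111122223333/default",
    workflow_status="RESOLVED",
    note_text="Confirmed as a test domain lookup.",
    updated_by="security-team",
)

# To send it (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("securityhub").batch_update_findings(**request)
```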

When calling these APIs, users need to transform alerts into the Security Hub standard alert format, the AWS Security Finding Format (ASFF). ASFF defines which attributes are required and which are optional, along with each attribute's name, data type, and allowed values.
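To make the required/optional split concrete, here is a minimal sketch that checks a candidate finding for the top-level attributes that the Lambda code in this post marks as required. The helper `missing_required` and the sample finding are hypothetical; the ASFF reference remains the authoritative list.

```python
# Top-level attributes treated as required, mirroring the "required top-level
# attributes" section of the Lambda code in this post.
REQUIRED_ASFF_KEYS = (
    "SchemaVersion", "Id", "ProductArn", "GeneratorId", "AwsAccountId",
    "Types", "CreatedAt", "UpdatedAt", "Severity", "Title", "Description",
    "Resources",
)


def missing_required(finding):
    """Return the required ASFF keys that are absent from `finding`."""
    return [key for key in REQUIRED_ASFF_KEYS if key not in finding]


# Sample finding with placeholder values only.
example_finding = {
    "SchemaVersion": "2018-10-08",
    "Id": "example-finding-id",
    "ProductArn": "arn:aws:securityhub:us-east-1:111122223333:product/111122223333/default",
    "GeneratorId": "aws.route53resolver",
    "AwsAccountId": "111122223333",
    "Types": ["TTPs/Defense Evasion"],
    "CreatedAt": "2024-01-01T00:00:00Z",
    "UpdatedAt": "2024-01-01T00:00:00Z",
    "Severity": {"Label": "HIGH"},
    "Title": "DNS Firewall generated an alert",
    "Description": "DNS Firewall generated an alert",
    "Resources": [{"Type": "Other", "Id": "example-resource-id"}],
}

incomplete = {k: v for k, v in example_finding.items() if k != "Severity"}
print(missing_required(example_finding))
print(missing_required(incomplete))
```

Running a check like this before calling BatchImportFindings can surface a malformed finding locally instead of through a failed API response.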

Example 1: Architecture for Inserting Alerts into Security Hub
The diagram below shows the architecture for inserting alerts into Security Hub using DNS Firewall alerts as an example. The workflow is as follows:

  1. Use EventBridge to capture DNS Firewall alert events.
  2. EventBridge sends captured events to an SQS queue.
  3. The SQS queue buffers events until it reaches a specified threshold or timeout, then triggers a Lambda function to batch-process buffered events.
  4. The Lambda function processes the events from SQS, transforms them into ASFF format, and calls the BatchImportFindings API to insert the formatted alerts into Security Hub.
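BatchImportFindings accepts at most 100 findings per call, so step 4 only works in a single call when the buffered batch stays within that limit. The sketch below shows one way to split a larger list of findings into compliant chunks; the helper name `chunk_findings` is hypothetical and is not part of the Lambda code in this post.

```python
MAX_FINDINGS_PER_CALL = 100  # BatchImportFindings limit per request


def chunk_findings(findings, size=MAX_FINDINGS_PER_CALL):
    """Yield consecutive slices of `findings`, each with at most `size` items."""
    for start in range(0, len(findings), size):
        yield findings[start:start + size]


# Placeholder findings, only to demonstrate the slicing.
findings = [{"Id": f"finding-{n}"} for n in range(250)]
batch_sizes = [len(batch) for batch in chunk_findings(findings)]
print(batch_sizes)

# Each batch could then be sent separately (requires boto3 and credentials):
#   for batch in chunk_findings(findings):
#       securityhub.batch_import_findings(Findings=batch)
```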

Example 2: Architecture for Inserting CloudWatch Alarms into Security Hub
This diagram illustrates the process of inserting CloudWatch Metrics alerts generated by AWS WAF V2 into Security Hub. The workflow is as follows:

  1. Create an Amazon CloudWatch AWS WAF V2 alarm.
  2. Specify a Lambda function as the alarm target.
  3. The Lambda function processes CloudWatch alert events, transforms them into ASFF format, and calls the BatchImportFindings API to insert the formatted alerts into Security Hub.

Prerequisites

An IAM user or role with permissions to manage IAM, Lambda, SQS, EventBridge, CloudWatch Logs, and Security Hub for the configuration steps.

Security Hub must be enabled and DNS Firewall must be configured in the current AWS account and Region.

Example 1 Configuration Steps

Create an SQS Queue for Buffering Alert Events

In the SQS console, create a “Standard” type SQS queue to receive alert events generated by security tools with the following parameters:

(Optional) Create another queue and designate it as the dead-letter queue for the alert event queue above. Alerts that cannot be processed are then redirected to the dead-letter queue. You can configure an alarm on the dead-letter queue so that failed alerts are noticed promptly, then review them manually and fix any bugs in the alert handling code.
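For readers who script the queue setup instead of using the console, the following sketch builds the `RedrivePolicy` attribute that links a queue to its dead-letter queue. The helper name, the ARN, and the `max_receive_count` of 5 are assumptions chosen for illustration.

```python
import json


def build_redrive_attributes(dlq_arn, max_receive_count=5):
    """Return SQS queue attributes that route repeatedly failing messages to a DLQ."""
    redrive_policy = {
        "deadLetterTargetArn": dlq_arn,
        # Number of receives before a message is moved to the dead-letter queue.
        "maxReceiveCount": str(max_receive_count),
    }
    # SQS expects the policy as a JSON string inside the Attributes map.
    return {"RedrivePolicy": json.dumps(redrive_policy)}


attributes = build_redrive_attributes(
    "arn:aws:sqs:us-east-1:111122223333:dns-firewall-alerts-dlq"  # placeholder ARN
)
print(attributes)

# To apply it when creating the main queue (requires boto3 and credentials):
#   import boto3
#   boto3.client("sqs").create_queue(QueueName="dns-firewall-alerts", Attributes=attributes)
```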

Creating an EventBridge Rule to Capture DNS Firewall Alert Events

In the EventBridge Rule console, create a new rule based on the “default” event bus using the “Rule with an event pattern” option.

In the “Event pattern” field, enter the following content to capture all DNS Firewall alert events. You can modify or add more filtering criteria to capture alerts that match your specific needs or for more granular alerts. For example, you can change the detail-type field content to “DNS Firewall Block” to capture DNS Firewall blocking events.

{
  "source": ["aws.route53resolver"],
  "detail-type": ["DNS Firewall Alert"]
}
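To see how a pattern like this selects events, here is a deliberately simplified matcher. It handles only top-level fields with exact-value lists, which is all the pattern above uses; real EventBridge matching supports far more (nested fields, prefixes, numeric ranges). The function `matches` and the sample events are hypothetical.

```python
# Variant of the pattern that captures both alert and block events.
DNS_FIREWALL_PATTERN = {
    "source": ["aws.route53resolver"],
    "detail-type": ["DNS Firewall Alert", "DNS Firewall Block"],
}


def matches(pattern, event):
    """Simplified check: every pattern field must equal one of its listed values."""
    return all(event.get(field) in allowed for field, allowed in pattern.items())


block_event = {"source": "aws.route53resolver", "detail-type": "DNS Firewall Block"}
other_event = {"source": "aws.ec2", "detail-type": "DNS Firewall Alert"}

print(matches(DNS_FIREWALL_PATTERN, block_event))
print(matches(DNS_FIREWALL_PATTERN, other_event))
```

Listing several values for one field acts as an OR, while separate fields are combined with AND.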

On the “Select Targets” page, choose “SQS Queue” as the target type and select the previously created SQS queue from the dropdown menu.

Creating a Lambda Function to Format Alerts and Insert Them into Security Hub

In the Lambda console, create a new function with the runtime set to “Python 3.12”.

Create a new IAM Role for the Lambda function’s execution role, which should have the following permissions (be sure to replace the account ID, region, Lambda function name, and SQS name with your actual values).

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": "arn:aws:logs:<region>:<account_id>:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<region>:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:<region>:<account_id>:<SQS_queue_name>"
        },
        {
            "Effect": "Allow",
            "Action": [
                "securityhub:BatchUpdateFindings",
                "securityhub:BatchImportFindings"
            ],
            "Resource": "*"
        }
    ]
}

Enter the following code in the Lambda function.

import json
import boto3
import os
import datetime

DEFAULT_SEVERITY = "HIGH"
DEFAULT_TYPES = ["TTPs/Defense Evasion"]
DEFAULT_TITLE = "DNS Firewall generated an alert"
DEFAULT_PRODUCT_NAME = "DNS Firewall"
DEFAULT_COMPANY_NAME = "Personal"
DEFAULT_DESC = DEFAULT_TITLE
PRODUCT_FIELDS_KEYS = [
    "transport",
    "query-name",
    "query-type",
    "query-class",
    "firewall-rule-action",
    "firewall-rule-group-id",
    "firewall-domain-list-id",
    "src-addr",
    "src-port",
    "vpc-id",
]

# Initialize AWS clients
securityhub = boto3.client("securityhub")
sts = boto3.client("sts")
AWS_REGION = os.environ["AWS_REGION"]
iam_identity = sts.get_caller_identity()
ACCOUNT_ID = iam_identity["Account"]
AWS_PARTITION = iam_identity["Arn"].split(":")[1]
PRODUCT_ARN = (
    "arn:"
    + AWS_PARTITION
    + ":securityhub:"
    + AWS_REGION
    + ":"
    + ACCOUNT_ID
    + ":product/"
    + ACCOUNT_ID
    + "/default"
)

def flatten_dict(dd, separator ='.', prefix =''):
    return { prefix + separator + k if prefix else k : v
             for kk, vv in dd.items()
             for k, v in flatten_dict(vv, separator, kk).items()
             } if isinstance(dd, dict) else { prefix : dd }

# function find_by_key to find a key and its value from a nested dict with a key_name as parameter
def find_by_key(key_name, dict_to_search):
    for key, value in dict_to_search.items():
        if key == key_name:
            return value
        elif isinstance(value, dict):
            result = find_by_key(key_name, value)
            if result is not None:
                return result

# Return the first string "id" found in a resource dict, or a placeholder ID if none exists
def get_resource_id(dict_to_search):
    if isinstance(dict_to_search, dict):
        resource_id = find_by_key("id", dict_to_search)
        if resource_id is not None and isinstance(resource_id, str):
            return resource_id
    return "dummy-resource-id-" + str(id(dict_to_search))

def lambda_handler(event, context):
    findings = []
    for record in event["Records"]:
        msg = json.loads(record["body"])
        detail = msg["detail"]
        findings.append(
            {
                # required top-level attributes
                "SchemaVersion": "2018-10-08",
                "AwsAccountId": msg["account"],
                "CreatedAt": msg["time"],
                "UpdatedAt": msg["time"],
                "Severity": {"Label": DEFAULT_SEVERITY},
                "Title": DEFAULT_TITLE,
                "Description": DEFAULT_DESC,
                "Types": DEFAULT_TYPES,
                "ProductArn": PRODUCT_ARN,
                "GeneratorId": msg["source"],
                "Id": detail['id'] if 'id' in detail.keys() else msg["id"],
                "Resources": [
                    {
                        "Type": "Other",
                        "Id": get_resource_id(x),
                        "Details": {"Other": flatten_dict(x)}
                    } for x in detail["resources"]
                ],
                # optional top-level attributes
                "CompanyName": DEFAULT_COMPANY_NAME,
                "ProductName": DEFAULT_PRODUCT_NAME,
                # Convert explicitly in UTC so the trailing "Z" is always accurate
                "LastObservedAt": datetime.datetime.fromtimestamp(
                    int(detail["last-observed-at"]), tz=datetime.timezone.utc
                ).strftime("%Y-%m-%dT%H:%M:%SZ"),
                "Region": msg["region"],
                "Action": {
                    "ActionType": "DNS_REQUEST",
                    "DnsRequestAction": {
                        "Blocked": (
                            False if detail["firewall-rule-action"] == "ALERT" else True
                        ),
                        "Domain": detail["query-name"],
                        "Protocol": detail["transport"],
                    },
                },
                "ProductFields": {
                    k: detail[k] for k in PRODUCT_FIELDS_KEYS if k in detail.keys()
                },
            }
        )
    response = securityhub.batch_import_findings(Findings=findings)
    print(response)
    if response["FailedCount"] != 0:
        raise Exception(json.dumps(response))
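The `Details.Other` map in ASFF holds string values, so nested or list-valued fields in a resource need to be serialized after flattening. The sketch below restates the flattening idea in a more explicit form and adds a string conversion step. The names `flatten` and `to_string_map` and the sample resource are hypothetical, not part of the function above.

```python
import json


def flatten(value, prefix="", sep="."):
    """Flatten nested dicts into one level, joining keys with `sep`."""
    if not isinstance(value, dict):
        return {prefix: value}
    flat = {}
    for key, child in value.items():
        child_prefix = f"{prefix}{sep}{key}" if prefix else key
        flat.update(flatten(child, child_prefix, sep))
    return flat


def to_string_map(flat):
    """Keep strings as-is and JSON-encode everything else (lists, numbers, ...)."""
    return {k: v if isinstance(v, str) else json.dumps(v) for k, v in flat.items()}


# Sample resource with made-up values, shaped like a nested event resource.
sample_resource = {
    "resource-type": "instance",
    "instance-details": {"id": "i-0123", "tags": [{"key": "Name", "value": "web"}]},
}

other = to_string_map(flatten(sample_resource))
print(other)
```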

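Add a trigger for the Lambda function by selecting the previously created SQS queue and setting the batch size to 100. A batch size above 10 requires a batch window of at least 1 second on the trigger, and 100 is also the maximum number of findings that BatchImportFindings accepts in a single call.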
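The handler above raises an exception when any finding fails, which makes SQS retry the whole batch. As an optional alternative, Lambda's SQS trigger can report partial batch failures so that only the failed messages are retried. The sketch below assumes that "Report batch item failures" is enabled on the trigger and that the handler keeps a mapping from finding ID to SQS message ID; the helper name and sample data are hypothetical.

```python
def build_partial_batch_response(import_response, finding_to_message):
    """Map failed findings back to SQS message IDs for a partial batch response."""
    failures = []
    for failed in import_response.get("FailedFindings", []):
        message_id = finding_to_message.get(failed["Id"])
        if message_id is not None:
            failures.append({"itemIdentifier": message_id})
    return {"batchItemFailures": failures}


# Made-up response in the shape returned by batch_import_findings.
sample_response = {
    "FailedCount": 1,
    "SuccessCount": 1,
    "FailedFindings": [
        {"Id": "finding-b", "ErrorCode": "InvalidInput", "ErrorMessage": "example"}
    ],
}
finding_to_message = {"finding-a": "msg-1", "finding-b": "msg-2"}

result = build_partial_batch_response(sample_response, finding_to_message)
print(result)
```

Returning this structure from the handler, instead of raising, leaves successfully imported messages out of the retry.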

Testing & Verification

Add a test rule in your DNS Firewall rule group and specify some test DNS records. If you do not have your own test domain, you can use the AWS-provided test domain controldomain1.aggregatelist.firewall.route53resolver.us-east-1.amazonaws.com, which will resolve to 1.2.3.4.

In a VPC associated with the above DNS Firewall rule group, issue a domain resolution request, such as executing the following command on a Linux EC2 instance: dig controldomain1.aggregatelist.firewall.route53resolver.us-east-1.amazonaws.com

Wait a few minutes. On the Security Hub Findings page, you should see alerts from DNS Firewall. Open a finding to see the queried domain name and the resource that issued the DNS query on the details page.

Example 2 Configuration Steps

Creating a Lambda Function to Format Alerts and Insert Them into Security Hub

In the Lambda console, create a new function with the runtime set to “Python 3.12”. Enter the following code in lambda_function.py:

import json
import boto3
import os
import datetime

# Defaults for alarm integration
DEFAULT_SEVERITY = "HIGH"
DEFAULT_TYPES = ["Software and Configuration Checks/Industry and Regulatory Standards"]
DEFAULT_TITLE = "CloudWatch Alarm triggered"
DEFAULT_PRODUCT_NAME = "AWS CloudWatch"
DEFAULT_COMPANY_NAME = "Personal"
DEFAULT_DESC = "CloudWatch Alarm has triggered an alert due to threshold breach."

# Initialize AWS clients
securityhub = boto3.client("securityhub")
sts = boto3.client("sts")

# Constants for AWS
AWS_REGION = os.environ["AWS_REGION"]
iam_identity = sts.get_caller_identity()
ACCOUNT_ID = iam_identity["Account"]
AWS_PARTITION = iam_identity["Arn"].split(":")[1]
PRODUCT_ARN = (
    "arn:"
    + AWS_PARTITION
    + ":securityhub:"
    + AWS_REGION
    + ":"
    + ACCOUNT_ID
    + ":product/"
    + ACCOUNT_ID
    + "/default"
)

# Helper function to convert JSON string to dict
def json_loads_safe(json_string):
    try:
        return json.loads(json_string)
    except ValueError:
        return {}

# Lambda entry point: convert one CloudWatch alarm event into an ASFF finding
def lambda_handler(event, context):
    alarm_data = event.get('alarmData', {})
    alarm_state = alarm_data.get('state', {})
    finding = {
        "SchemaVersion": "2018-10-08",
        "AwsAccountId": event.get("accountId", ""),
        "CreatedAt": event.get("time"),
        "UpdatedAt": event.get("time"),
        "Severity": {"Label": DEFAULT_SEVERITY},
        "Title": DEFAULT_TITLE,
        "Description": DEFAULT_DESC,
        "Types": DEFAULT_TYPES,
        "ProductArn": PRODUCT_ARN,
        "GeneratorId": event.get("alarmArn"),
        "Id": alarm_data.get('alarmName', '') + event.get("time"),
        "Resources": [
            {
                "Type": "AwsCloudWatchAlarm",
                "Id": event.get("alarmArn"),
                "Details": {
                    "Other": {
                        "AlarmName": alarm_data.get('alarmName', ''),
                        "State": alarm_state.get('value', ''),
                        "Reason": alarm_state.get('reason', ''),
                        "ReasonData": json.dumps(json_loads_safe(alarm_state.get('reasonData', '{}'))),
                        "Configuration": json.dumps(alarm_data.get('configuration', {}))  # Serialized configuration
                        # Note that the value of "Configuration" should NOT be longer than 1024 characters
                    }
                }
            }
        ],
        "Region": event.get("region"),
        "CompanyName": DEFAULT_COMPANY_NAME,
        "ProductName": DEFAULT_PRODUCT_NAME,
        "LastObservedAt": datetime.datetime.strptime(alarm_state.get('timestamp', ''), '%Y-%m-%dT%H:%M:%S.%f%z').strftime('%Y-%m-%dT%H:%M:%SZ')
    }
   
    response = securityhub.batch_import_findings(Findings=[finding])
    print(response)
    if response["FailedCount"] > 0:
        raise Exception(json.dumps(response))
       
    return {
        'statusCode': 200,
        'body': json.dumps('Finished.')
    }
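The comment in the code notes that the serialized "Configuration" value must not exceed 1024 characters. One way to guard against that is to truncate long values before building the finding, as in this sketch. The helper `fit_other_value` and the truncation marker are hypothetical, and truncating is only one option; dropping or summarizing fields would work as well.

```python
ASFF_OTHER_VALUE_LIMIT = 1024  # limit referenced in the code comment above


def fit_other_value(value, limit=ASFF_OTHER_VALUE_LIMIT, marker="...[truncated]"):
    """Return `value` unchanged if it fits, otherwise cut it and append a marker."""
    if len(value) <= limit:
        return value
    return value[: limit - len(marker)] + marker


short_value = fit_other_value('{"metric": "BlockedRequests"}')
long_value = fit_other_value("x" * 5000)
print(len(short_value), len(long_value))
```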

Go to “Configuration” > “Permissions” in the Lambda console, and find the execution IAM role for the Lambda function. This role should have the following permissions (replace the account ID, region, Lambda function name with your actual values):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup"
            ],
            "Resource": "arn:aws:logs:<region>:<account_id>:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:<region>:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "securityhub:BatchImportFindings"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

In “Configuration” > “Permissions” in the Lambda console, add a resource-based policy statement to allow CloudWatch alarms to trigger the Lambda function. Specify Statement ID: cw-lambda-access, Principal: lambda.alarms.cloudwatch.amazonaws.com, and Action: lambda:InvokeFunction (replace the account ID, region, Lambda function name with your actual values).

{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [
    {
      "Sid": "cw-lambda-access",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.alarms.cloudwatch.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:<region>:<account_id>:function:<lambda_function>"
    }
  ]
}
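The same resource-based permission can be granted programmatically. The sketch below builds the arguments for the Lambda `add_permission` API. Restricting the grant with `SourceArn` and `SourceAccount` is an optional tightening that is not part of the policy shown above, and the helper name, function name, and ARN values are placeholders.

```python
def build_alarm_invoke_permission(function_name, alarm_arn, account_id,
                                  statement_id="cw-lambda-access"):
    """Build kwargs for lambda add_permission so CloudWatch alarms can invoke the function."""
    return {
        "FunctionName": function_name,
        "StatementId": statement_id,
        "Action": "lambda:InvokeFunction",
        "Principal": "lambda.alarms.cloudwatch.amazonaws.com",
        # Optional: limit the grant to one alarm in one account.
        "SourceAccount": account_id,
        "SourceArn": alarm_arn,
    }


permission = build_alarm_invoke_permission(
    function_name="waf-alarm-to-securityhub",  # placeholder function name
    alarm_arn="arn:aws:cloudwatch:us-east-1:111122223333:alarm:waf-sqli-blocked",
    account_id="111122223333",
)
print(permission)

# To apply it (requires boto3 and credentials):
#   import boto3
#   boto3.client("lambda").add_permission(**permission)
```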

Creating a Test Sample Web ACL

In the WAF & Shield console, click on “Web ACLs” > “Create Web ACL”.

Set a custom name such as “test-web-acl”, choose “Regional ACL” (check “Regional resources”), and select a region. Global Web ACLs are deployed across AWS’s global network of over 400 points of presence (PoP) and work with CloudFront to protect user origin servers with consistent WAF rules. Regional Web ACLs are configured per region and protect web servers within that specific region.

Under “Add rules and rule groups”, choose Add rules > Add managed rule groups, then under AWS managed rule groups add the “SQL database” rule group to test SQL injection. The remaining settings can be left at their defaults.

Create a Test Web Server and an ALB

Directly using WAF to protect Web servers in the production environment requires special attention to avoid false positives. Therefore, we will first create a simple test web server. Here, we use a Docker image of an echo-server to create an httpecho container that extracts and returns the request headers sent to the server via HTTP as an HTTP response to our browser.

First, launch a t2.micro EC2 instance using Amazon Linux 2 AMI. Install Docker on this EC2 instance by referring to the AWS documentation.

Specific Linux commands are as follows:

Install Docker

sudo yum update -y
sudo amazon-linux-extras install docker

Grant the ec2-user Linux permissions to start Docker

sudo usermod -a -G docker ec2-user

Log out of the current SSH session for the permissions to take effect

logout

Log in again as ec2-user to check Docker information

sudo service docker start
docker info | grep Ver

If the output is as shown below, Docker is successfully installed

Server Version: 20.10.7
Cgroup Version: 1
Kernel Version: 4.14.47-64.38.amzn2.x86_64

Run the echo-server Docker image, listening on TCP port 1028

docker run -d --name httpecho  -p 1028:8080 jmalloc/echo-server httpecho

Use curl to check the local service

curl 127.0.0.1:1028

If the output is as shown below, the service is running correctly. This output indicates that the echo-server received an HTTP request from curl with very simple content:

Request served by 16e51706efbe

HTTP/1.1 GET /

Host: 127.0.0.1:1028
User-Agent: curl/7.79.1
Accept: */*

Create an ALB named httpechoalb listening on TCP port 80, with the backend forwarding traffic to TCP port 1028 on the EC2 instance. You can access the ALB domain through a browser to obtain HTTP request header information sent to the server.

Associate the created Web ACL – test-web-acl with the ALB. Click on the sample web ACL test-web-acl > Associated AWS resources and add the created ALB resource httpechoalb.

Create a Test WAF Alarm to Trigger a Lambda Function

In the CloudWatch console, navigate to All metrics >> WAFV2 > ManagedRuleGroup, Region, WebACL > Select ManagedRuleGroup > AWSManagedRulesSQLiRuleSet > Create alarm.

Specify the metric and conditions >> Metric – Metric name: “BlockedRequests”, WebACL: test-web-acl, Statistic: Sum, Period: 10 seconds. Conditions – Threshold type: Static, Whenever BlockedRequests is: >= 1.
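For reference, the console settings above map roughly onto the arguments of the CloudWatch `put_metric_alarm` API. In this sketch the dimension names are taken from the console path described in this section, `EvaluationPeriods` of 1 is an assumption because the text does not state it, and the helper name and ARNs are placeholders.

```python
def build_waf_alarm(alarm_name, web_acl, region, rule_group, lambda_arn):
    """Build kwargs for cloudwatch put_metric_alarm mirroring the console settings."""
    return {
        "AlarmName": alarm_name,
        "Namespace": "AWS/WAFV2",
        "MetricName": "BlockedRequests",
        "Dimensions": [
            {"Name": "WebACL", "Value": web_acl},
            {"Name": "Region", "Value": region},
            {"Name": "ManagedRuleGroup", "Value": rule_group},
        ],
        "Statistic": "Sum",
        "Period": 10,  # seconds, as chosen in the console step
        "EvaluationPeriods": 1,  # assumption: not specified in the text
        "Threshold": 1,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        # A Lambda function ARN can be used directly as an alarm action.
        "AlarmActions": [lambda_arn],
    }


alarm = build_waf_alarm(
    alarm_name="waf-sqli-blocked",
    web_acl="test-web-acl",
    region="us-east-1",
    rule_group="AWSManagedRulesSQLiRuleSet",
    lambda_arn="arn:aws:lambda:us-east-1:111122223333:function:waf-alarm-to-securityhub",
)
print(alarm["Dimensions"])

# To create the alarm (requires boto3 and credentials):
#   import boto3
#   boto3.client("cloudwatch").put_metric_alarm(**alarm)
```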

Lambda action: In alarm > Select Lambda Function from the signed in account > Choose the Lambda function created earlier (the one with the “cw-lambda-access” resource-based policy statement).

Testing and Verification

Run the following curl command:

curl --header 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36 Edg/97.0.1072.69' -X POST "xxxx.us-east-1.elb.amazonaws.com" -F "user='AND 1=1;"

Note: Replace "xxxx.us-east-1.elb.amazonaws.com" with the domain name of the “httpechoalb” ALB you created. The Web ACL, CloudWatch alarm configuration, and other settings shown here are for testing purposes and can be adjusted to your needs.

After a few minutes, you should see the alert in the Security Hub Findings page coming from CloudWatch, and you can view detailed information by checking the finding.

Summary

Security Hub integrates with many AWS and third-party security services, which makes it easy to collect and aggregate their alerts. For security services that are not yet integrated, however, users still need to write their own integration code. This article introduces a typical integration architecture and uses alerts from DNS Firewall and from CloudWatch alarms on AWS WAF V2 metrics as examples to show how to insert security alerts into Security Hub for centralized display.

Autel, in collaboration with AWS, has built a centralized management solution for DNS Firewall threat events through AWS Security Hub, along with various security measures, automatically blocking over 10,000 different types of attacks and intrusions for Autel SaaS cloud customers daily. This solution showcases Autel’s innovation in cloud security technology and its capability to safeguard customer privacy and security.

Reference

https://thinkwithwp.com/cn/blogs/china/aws-waf-deployment-guide-1-waf-principle-default-deployment-and-log-storage/

About the Authors

Kaidi Ren

Senior Cybersecurity Engineer at Autel and a featured speaker at the 2024 AWS China Summit. Kaidi is deeply engaged in research and practical applications of IoT security, cloud security, and related interdisciplinary fields, with a strong focus on advancing innovative approaches to cybersecurity challenges in emerging technologies.

Xudong Wang

Security Specialist Solutions Architect at AWS, responsible for designing customer solution architectures on security. Prior to joining AWS, Xudong led infrastructure security implementation and management at a SaaS company.

Cathy Huang

Associate Solutions Architect at AWS, specializing in the design and implementation of cloud solutions for the automotive industry. Focused on leveraging cutting-edge AI technologies to drive digital transformation and empower clients with scalable, efficient, and intelligent systems.

Qiang Guo

Senior Solutions Architect at AWS. He previously worked at IBM and has been part of two B2B startup ventures. He has extensive practical experience in enterprise application architecture and B2B mobile internet solutions. Specializing in enterprise digital transformation, Guo excels in applying cloud, AI, and IoT technologies to solve complex business challenges. He currently focuses on research and implementation of AIoT and advanced AI applications in the automotive industry, promoting innovative technologies across the automotive and related sectors.