AWS Big Data Blog

Introducing Amazon MWAA support for Apache Airflow version 2.9.2

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that significantly improves security and availability, and reduces infrastructure management overhead when setting up and operating end-to-end data pipelines in the cloud.

Today, we are announcing the availability of Apache Airflow version 2.9.2 environments on Amazon MWAA. Apache Airflow 2.9.2 introduces several notable enhancements, such as new API endpoints for improved dataset management, advanced scheduling options including conditional expressions for dataset dependencies, the combination of dataset and time-based schedules, and custom names in dynamic task mapping for better readability of your DAGs.

In this post, we walk you through some of these new features and capabilities, how you can use them, and how you can set up or upgrade your Amazon MWAA environments to Airflow 2.9.2.

With each new version release, the Apache Airflow community is innovating to make Airflow more data-aware, enabling you to build reactive, event-driven workflows that can accommodate changes in datasets, either between Airflow environments or in external systems. Let’s go through some of these new capabilities.

Logical operators and conditional expressions for DAG scheduling

Prior to the introduction of this capability, users faced significant limitations when working with complex scheduling scenarios involving multiple datasets. Airflow’s scheduling capabilities were restricted to logical AND combinations of datasets, meaning that a DAG run would only be created after all specified datasets were updated since the last run. This rigid approach posed challenges for workflows that required more nuanced triggering conditions, such as running a DAG when any one of several datasets was updated or when specific combinations of dataset updates occurred.

With the release of Airflow 2.9.2, you can now use logical operators (AND and OR) and conditional expressions to define intricate scheduling conditions based on dataset updates. This feature allows for granular control over workflow triggers, enabling DAGs to be scheduled whenever a specific dataset or combination of datasets is updated.

For example, in the financial services industry, a risk management process might need to run when trading data from any regional market is refreshed, but only once the latest regulatory updates are also available. The new scheduling capabilities available in Amazon MWAA allow you to express such complex logic using simple expressions. The following diagram illustrates the dependency we need to establish.

The following DAG code contains the logical operations to implement these dependencies:

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime

trading_data_asia = Dataset("s3://trading/asia/data.parquet")
trading_data_europe = Dataset("s3://trading/europe/data.parquet")
trading_data_americas = Dataset("s3://trading/americas/data.parquet")
regulatory_updates = Dataset("s3://regulators/updates.json")

@dag(
    dag_id='risk_management_trading_data',
    start_date=datetime(2023, 5, 1),
    schedule=((trading_data_asia | trading_data_europe | trading_data_americas) & regulatory_updates),
    catchup=False
)
def risk_management_pipeline():
    @task
    def risk_analysis():
        # Task for risk analysis
        ...

    @task
    def reporting():
        # Task for reporting
        ...

    @task
    def notifications():
        # Task for notifications
        ...

    analysis = risk_analysis()
    report = reporting()
    notify = notifications()

risk_management_pipeline()
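Note that a dataset-scheduled DAG only runs when upstream tasks actually emit updates to those datasets. The following minimal sketch shows a hypothetical producer DAG (not part of the original example) that declares the Asia trading dataset as a task outlet, so each successful run emits the corresponding dataset event:

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime

trading_data_asia = Dataset("s3://trading/asia/data.parquet")

@dag(
    dag_id='ingest_trading_data_asia',
    start_date=datetime(2023, 5, 1),
    schedule="@hourly",
    catchup=False
)
def ingest_trading_data_asia():
    # Declaring the dataset as an outlet emits a dataset event when the task succeeds
    @task(outlets=[trading_data_asia])
    def refresh_asia_trading_data():
        # Logic that refreshes s3://trading/asia/data.parquet goes here
        ...

    refresh_asia_trading_data()

ingest_trading_data_asia()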

To learn more about this feature, refer to Logical operators for datasets in the Airflow documentation.

Combining dataset and time-based schedules

With Airflow 2.9.2 environments, Amazon MWAA now has a more comprehensive scheduling mechanism that combines the flexibility of data-driven execution with the consistency of time-based schedules.

Consider a scenario where your team is responsible for managing a data pipeline that generates daily sales reports. This pipeline relies on data from multiple sources. Although it’s essential to generate these sales reports on a daily basis to provide timely insights to business stakeholders, you also need to make sure the reports are up to date and reflect important data changes as soon as possible. For instance, if there’s a significant influx of orders during a promotional campaign, or if inventory levels change unexpectedly, the report should incorporate these updates to maintain relevance.

Relying solely on time-based scheduling for this type of data pipeline can lead to issues such as stale reports and wasted infrastructure resources from runs that process no new data.

The DatasetOrTimeSchedule feature introduced in Airflow 2.9 adds the capability to combine conditional dataset expressions with time-based schedules. This means your workflow can be invoked both at predefined intervals and whenever the specified dataset condition is satisfied. The following diagram illustrates how you can use this capability to accommodate such scenarios.

See the following DAG code for an example implementation:

from airflow.decorators import dag, task
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from datetime import datetime

# Define datasets
orders_dataset = Dataset("s3://path/to/orders/data")
inventory_dataset = Dataset("s3://path/to/inventory/data")
customer_dataset = Dataset("s3://path/to/customer/data")

# Combine datasets using logical operators
combined_dataset = (orders_dataset & inventory_dataset) | customer_dataset

@dag(
    dag_id="dataset_time_scheduling",
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),  # Daily at midnight
        datasets=combined_dataset
    ),
    catchup=False,
)
def dataset_time_scheduling_pipeline():
    @task
    def process_orders():
        # Task logic for processing orders
        pass

    @task
    def update_inventory():
        # Task logic for updating inventory
        pass

    @task
    def update_customer_data():
        # Task logic for updating customer data
        pass

    orders_task = process_orders()
    inventory_task = update_inventory()
    customer_task = update_customer_data()

dataset_time_scheduling_pipeline()

In the example, the DAG will be run under two conditions:

  • When the time-based schedule is met (daily at midnight UTC)
  • When the combined dataset condition is met: either when both the orders and inventory datasets are updated, or when the customer dataset is updated, regardless of the others

This flexibility enables you to create sophisticated scheduling rules that cater to the unique requirements of your data pipelines, so they run when necessary and incorporate the latest data updates from multiple sources.

For more details on data-aware scheduling, refer to Data-aware scheduling in the Airflow documentation.

Dataset event REST API endpoints

Prior to the introduction of this feature, making your Airflow environment aware of changes to datasets in external systems was a challenge—there was no option to mark a dataset as externally updated. With the new dataset event endpoints feature, you can programmatically initiate dataset-related events. The REST API has endpoints to create, list, and delete dataset events.

This capability enables external systems and applications to seamlessly integrate with your Amazon MWAA environment, making it significantly easier to build data pipelines that react dynamically to data changes that happen outside of Airflow.

As an example, running the following code from an external system allows you to invoke a dataset event in the target Amazon MWAA environment. This event could then be handled by downstream processes or workflows, enabling greater connectivity and responsiveness in data-driven workflows that rely on timely data updates and interactions.

curl -X POST https://{web_server_host_name}/api/v1/datasets/events \
   -H 'Content-Type: application/json' \
   -d '{"dataset_uri": "s3://bucket_name/bucket_key", "extra": { }}'
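How an external system authenticates against the web server depends on your environment configuration. As a minimal sketch, assuming a hypothetical environment named my-mwaa-environment with a reachable web server, the following Python code obtains a web login token with boto3, exchanges it for a session cookie at the MWAA login endpoint, and then posts the dataset event:

import boto3
import requests

# Hypothetical environment name and Region; replace with your own values
ENV_NAME = "my-mwaa-environment"
REGION = "us-east-1"

# Request a short-lived web login token from Amazon MWAA
mwaa = boto3.client("mwaa", region_name=REGION)
token_response = mwaa.create_web_login_token(Name=ENV_NAME)
hostname = token_response["WebServerHostname"]
web_token = token_response["WebToken"]

# Exchange the token for an Airflow web server session cookie
login = requests.post(
    f"https://{hostname}/aws_mwaa/login",
    data={"token": web_token},
    timeout=10,
)
login.raise_for_status()
session_cookie = login.cookies["session"]

# Create a dataset event for the externally updated dataset
response = requests.post(
    f"https://{hostname}/api/v1/datasets/events",
    cookies={"session": session_cookie},
    json={"dataset_uri": "s3://bucket_name/bucket_key", "extra": {}},
    timeout=10,
)
print(response.status_code, response.json())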

The following diagram illustrates how the different components in the scenario interact with each other.

To get more details on how to use the Airflow REST API in Amazon MWAA, refer to Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling. To learn more about the dataset event REST API endpoints, refer to Dataset UI Enhancements in the Airflow documentation.

Airflow 2.9.2 also includes features to ease the operation and monitoring of your environments. Let’s explore some of these new capabilities.

DAG auto-pausing

Customers are using Amazon MWAA to build complex data pipelines with multiple interconnected tasks and dependencies. When one of these pipelines encounters an issue or failure, it can result in a cascade of unnecessary and redundant task runs, leading to wasted resources. This problem is particularly prevalent in scenarios where pipelines run at frequent intervals, such as hourly or daily. A common scenario is a critical pipeline that starts failing during the evening, and due to the failure, it continues to run and fails repeatedly until someone manually intervenes the next morning. This can result in dozens of unnecessary tasks, consuming valuable compute resources and potentially causing data corruption or inconsistencies.

The DAG auto-pausing feature aims to address this challenge by introducing two new configuration parameters:

  • max_consecutive_failed_dag_runs_per_dag – This is a global Airflow configuration setting. It allows you to specify the maximum number of consecutive failed DAG runs before a DAG is automatically paused (see the configuration example following this list).
  • max_consecutive_failed_dag_runs – This is a DAG-level argument. It overrides the previous global configuration, allowing you to set a custom threshold for each DAG.
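On Amazon MWAA, global Airflow settings are supplied as configuration overrides on the environment. The following AWS CLI sketch (assuming the option lives under the core configuration section and a hypothetical environment named my-mwaa-environment) sets the global threshold to three consecutive failures:

aws mwaa update-environment \
    --name my-mwaa-environment \
    --airflow-configuration-options '{"core.max_consecutive_failed_dag_runs_per_dag": "3"}'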

In the following code example, we define a DAG with a single task created with the @task decorator. The failing_task is designed to fail by raising a ValueError. The key configuration for DAG auto-pausing is the max_consecutive_failed_dag_runs parameter set in the DAG object. By setting max_consecutive_failed_dag_runs=3, we’re instructing Airflow to automatically pause the DAG after it fails three consecutive times.

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@task
def failing_task():
    raise ValueError("This task is designed to fail")

@dag(
    dag_id="auto_pause",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(minutes=1),  # Run every minute
    catchup=False,
    max_consecutive_failed_dag_runs=3,  # Set the maximum number of consecutive failed DAG runs
)
def example_dag_with_auto_pause():
    failing_task_instance = failing_task()

example_dag_with_auto_pause()

With this parameter, you can now configure your Airflow DAGs to automatically pause after a specified number of consecutive failures.

To learn more, refer to DAG Auto-pausing in the Airflow documentation.

CLI support for bulk pause and resume of DAGs

As the number of DAGs in your environment grows, managing them becomes increasingly challenging. Whether for upgrading or migrating environments, or other operational activities, you may need to pause or resume multiple DAGs. Doing this through the Airflow UI means pausing or resuming DAGs one at a time, a tedious and repetitive process that increases the risk of human error, which can lead to data inconsistencies or pipeline disruptions. The previous CLI commands for pausing and resuming DAGs could only handle one DAG at a time, making them equally inefficient at scale.

Airflow 2.9.2 improves these CLI commands by adding the capability to treat DAG IDs as regular expressions, allowing you to pause or resume multiple DAGs with a single command. This eliminates the need for repetitive manual intervention or individual DAG operations, significantly reducing the risk of human error and improving the reliability and consistency of your data pipelines.

As an example, to pause all DAGs generating daily liquidity reporting using Amazon Redshift as a data source, you can use the following CLI command with a regular expression:

airflow dags pause --treat-dag-id-as-regex -y "^(redshift|daily_liquidity_reporting)"
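Because Amazon MWAA doesn’t provide shell access to the environment, Airflow CLI commands are submitted through the environment’s CLI endpoint. The following sketch shows one way to run the command remotely (using a hypothetical environment named my-mwaa-environment, and assuming jq is available): it requests a CLI token and posts the same bulk pause command, and the response contains base64-encoded standard output and error streams.

# Request a CLI token and the web server hostname for the environment
CLI_JSON=$(aws mwaa create-cli-token --name my-mwaa-environment)
CLI_TOKEN=$(echo "$CLI_JSON" | jq -r '.CliToken')
WEB_SERVER_HOSTNAME=$(echo "$CLI_JSON" | jq -r '.WebServerHostname')

# Send the bulk pause command to the Airflow CLI endpoint and decode the output
curl -s -X POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
    -H "Authorization: Bearer $CLI_TOKEN" \
    -H "Content-Type: text/plain" \
    --data-raw 'dags pause --treat-dag-id-as-regex -y "^(redshift|daily_liquidity_reporting)"' \
    | jq -r '.stdout' | base64 --decode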

Custom names for Dynamic Task Mapping

Dynamic Task Mapping was added in Airflow 2.3. This powerful feature allows workflows to create tasks dynamically at runtime based on data. Instead of relying on the DAG author to predict the number of tasks needed in advance, the scheduler can generate the appropriate number of copies of a task based on the output of a previous task. Of course, with great power comes great responsibility. By default, dynamically mapped tasks are identified only by a numeric map index. In complex workflows involving large numbers of mapped tasks, it becomes increasingly challenging to pinpoint the specific tasks that require attention, leading to potential delays and inefficiencies in managing and maintaining your data workflows.

Airflow 2.9 introduces the map_index_template parameter, a highly requested feature that addresses the challenge of task identification in Dynamic Task Mapping. With this capability, you can now provide custom names for your dynamically mapped tasks, enhancing visibility and manageability within the Airflow UI.

See the following example:

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data(data):
    # Perform data processing logic here
    print(f"Processing data: {data}")

@dag(
    dag_id="custom_task_mapping_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)
def custom_task_mapping_example():
    mapped_processes = PythonOperator.partial(
        task_id="process_data_source",
        python_callable=process_data,
        map_index_template="Processing source={{ task.op_args[0] }}",
    ).expand(op_args=[["source_a"], ["source_b"], ["source_c"]])

custom_task_mapping_example()

The key aspect in the code is the map_index_template parameter specified in the PythonOperator.partial call. This Jinja template instructs Airflow to render each mapped task instance’s map index from its op_args value. In the Airflow UI, you will see three task instances with the indexes Processing source=source_a, Processing source=source_b, and Processing source=source_c, making it straightforward to identify and track the tasks associated with each data source. In case of failures, this capability improves monitoring and troubleshooting.

The map_index_template feature goes beyond simple template rendering, offering dynamic injection capabilities into the rendering context. This functionality unlocks greater levels of flexibility and customization when naming dynamically mapped tasks.
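For example, because the map index template is rendered after each mapped task instance finishes, a task can inject values into the rendering context at runtime. The following minimal sketch (the DAG ID, task name, and source values are illustrative) sets the map index from a value computed inside the task:

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from datetime import datetime

@dag(
    dag_id="dynamic_map_index_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)
def dynamic_map_index_example():
    # The template references a variable that only exists once the task
    # injects it into the rendering context at runtime
    @task(map_index_template="{{ source_label }}")
    def process_source(source: dict):
        context = get_current_context()
        context["source_label"] = f"{source['system']}:{source['table']}"
        print(f"Processing {source}")

    process_source.expand(
        source=[
            {"system": "crm", "table": "accounts"},
            {"system": "erp", "table": "orders"},
        ]
    )

dynamic_map_index_example()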

Refer to Named mapping in the Airflow documentation to learn more.

TaskFlow decorator for Bash commands

Writing complex Bash commands and scripts using the traditional Airflow BashOperator can bring challenges in areas such as code consistency, task dependency definition, and dynamic command generation. The new @task.bash decorator addresses these challenges, allowing you to define Bash statements using Python functions, making the code more readable and maintainable. It seamlessly integrates with Airflow’s TaskFlow API, enabling you to define dependencies between tasks and create complex workflows. You can also use Airflow’s scheduling and monitoring capabilities while maintaining a consistent coding style.

The following sample code showcases how the @task.bash decorator simplifies the integration of Bash commands into DAGs, while using the full capabilities of Python for dynamic command generation and data processing:

from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Sample customer data
customer_data = """
id,name,age,city
1,John Doe,35,New York
2,Jane Smith,42,Los Angeles
3,Michael Johnson,28,Chicago
4,Emily Williams,31,Houston
5,David Brown,47,Phoenix
"""

# Sample order data
order_data = """
order_id,customer_id,product,quantity,price
101,1,Product A,2,19.99
102,2,Product B,1,29.99
103,3,Product A,3,19.99
104,4,Product C,2,14.99
105,5,Product B,1,29.99
"""

@dag(
    dag_id='task-bash-customer_order_analysis',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
)
def customer_order_analysis_dag():
    # Run in a fixed working directory so downstream tasks can read the files
    @task.bash(cwd="/tmp")
    def clean_data():
        # Clean customer data: write the raw data to disk, then drop the header
        # row and switch the delimiter in one pipeline (writing back to the file
        # being read would truncate it)
        customer_cleaning_commands = """
            echo '{}' > raw_customers.csv
            sed 's/,/;/g' raw_customers.csv | awk 'NR > 1' > cleaned_customers.csv
        """.format(customer_data.strip())

        # Clean order data the same way
        order_cleaning_commands = """
            echo '{}' > raw_orders.csv
            sed 's/,/;/g' raw_orders.csv | awk 'NR > 1' > cleaned_orders.csv
        """.format(order_data.strip())

        return customer_cleaning_commands + "\n" + order_cleaning_commands

    @task.bash(cwd="/tmp")  # share the working directory with the other tasks
    def transform_data(cleaned_customers, cleaned_orders):
        # Transform customer data
        customer_transform_commands = """
            cat {cleaned_customers} | awk -F';' '{{printf "%s,%s,%s\\n", $1, $2, $3}}' > transformed_customers.csv
        """.format(cleaned_customers=cleaned_customers)

        # Transform order data
        order_transform_commands = """
            cat {cleaned_orders} | awk -F';' '{{printf "%s,%s,%s,%s,%s\\n", $1, $2, $3, $4, $5}}' > transformed_orders.csv
        """.format(cleaned_orders=cleaned_orders)

        return customer_transform_commands + "\n" + order_transform_commands

    @task.bash(cwd="/tmp")  # share the working directory with the other tasks
    def analyze_data(transformed_customers, transformed_orders):
        analysis_commands = """
            # Calculate total revenue
            total_revenue=$(awk -F',' '{{sum += $5 * $4}} END {{printf "%.2f", sum}}' {transformed_orders})
            echo "Total revenue: $total_revenue"

            # Find customers with multiple orders
            customers_with_multiple_orders=$(
                awk -F',' '{{orders[$2]++}} END {{for (c in orders) if (orders[c] > 1) printf "%s,", c}}' {transformed_orders}
            )
            echo "Customers with multiple orders: $customers_with_multiple_orders"

            # Find most popular product
            popular_product=$(
                awk -F',' '{{products[$3]++}} END {{max=0; for (p in products) if (products[p] > max) {{max=products[p]; popular=p}}}} END {{print popular}}' {transformed_orders}
            )
            echo "Most popular product: $popular_product"
        """.format(transformed_customers=transformed_customers, transformed_orders=transformed_orders)

        return analysis_commands

    # The tasks exchange data through files in the shared working directory;
    # pass the file names explicitly and declare the execution order
    cleaned_data = clean_data()
    transformed_data = transform_data("cleaned_customers.csv", "cleaned_orders.csv")
    analysis_results = analyze_data("transformed_customers.csv", "transformed_orders.csv")
    cleaned_data >> transformed_data >> analysis_results

customer_order_analysis_dag()

You can learn more about the @task.bash decorator in the Airflow documentation.

Set up a new Airflow 2.9.2 environment in Amazon MWAA

You can initiate the setup in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.
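For example, the following AWS CLI command sketches the creation of a new environment pinned to Apache Airflow 2.9.2; the bucket, role, subnet, and security group identifiers are placeholders for resources you have already provisioned:

aws mwaa create-environment \
    --name my-airflow-292-environment \
    --airflow-version 2.9.2 \
    --source-bucket-arn arn:aws:s3:::my-mwaa-dags-bucket \
    --dag-s3-path dags \
    --execution-role-arn arn:aws:iam::111122223333:role/my-mwaa-execution-role \
    --network-configuration '{"SubnetIds": ["subnet-aaaa1111", "subnet-bbbb2222"], "SecurityGroupIds": ["sg-cccc3333"]}'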

Upon successful creation of an Airflow 2.9 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.
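For example, a requirements.txt file might pin additional provider packages against the matching Airflow constraints file (the providers shown are illustrative, and you should confirm the Python version your environment uses in the Amazon MWAA documentation):

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.2/constraints-3.11.txt"
apache-airflow-providers-snowflake
apache-airflow-providers-mongo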

Upgrade from older versions of Airflow to version 2.9.2

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.9 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.
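For example, assuming an existing environment named my-existing-environment, you could start an in-place upgrade from the AWS CLI; the environment moves into the UPDATING state while Amazon MWAA performs the upgrade:

aws mwaa update-environment \
    --name my-existing-environment \
    --airflow-version 2.9.2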

Conclusion

In this post, we announced the availability of Apache Airflow 2.9 environments in Amazon MWAA. We discussed how some of the latest features added in the release enable you to design more reactive, event-driven workflows, such as DAG scheduling based on the result of logical operations, and the availability of endpoints in the REST API to programmatically create dataset events. We also provided some sample code to show the implementation in Amazon MWAA.

For the complete list of changes, refer to Airflow’s release notes. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the authors

Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working with enterprises in the Financial Services Industry. He specializes in application modernization and supports customers in the adoption of serverless technologies.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.