AWS Big Data Blog

What’s new with Amazon MWAA support for Apache Airflow version 2.4.3

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it simple to set up and operate end-to-end data pipelines in the cloud at scale. Amazon MWAA supports multiple versions of Apache Airflow (v1.10.12, v2.0.2, and v2.2.2). Earlier in 2023, we added support for Apache Airflow v2.4.3 so you can enjoy the same scalability, availability, security, and ease of management with Airflow’s most recent improvements. Additionally, with Apache Airflow v2.4.3 support, Amazon MWAA has upgraded to Python v3.10.8, which supports newer Python libraries like OpenSSL 1.1.1 as well as major new features and improvements.

In this post, we provide an overview of the features and capabilities of Apache Airflow v2.4.3 and how you can set up or upgrade your Amazon MWAA environment to accommodate Apache Airflow v2.4.3 as you orchestrate using workflows in the cloud at scale.

New feature: Data-aware scheduling using datasets

With the release of Apache Airflow v2.4.0, Airflow introduced datasets. An Airflow dataset is a stand-in for a logical grouping of data that can trigger a Directed Acyclic Graph (DAG) in addition to regular DAG triggering mechanisms such as cron expressions, timedelta objects, and Airflow timetables. The following are some of the attributes of a dataset:

  • Datasets may be updated by upstream producer tasks, and updates to such datasets contribute to scheduling downstream consumer DAGs.
  • You can create smaller, more self-contained DAGs, which chain together into a larger data-based workflow using datasets.
  • You have an additional option now to create inter-DAG dependencies using datasets besides ExternalTaskSensor or TriggerDagRunOperator. You should consider using this dependency if you have two DAGs related via an irregular dataset update. This type of dependency also provides you with increased observability into the dependencies between your DAGs and datasets in the Airflow UI.

How data-aware scheduling works

You need to define three things:

  • A dataset, or multiple datasets
  • The tasks that will update the dataset
  • The DAG that will be scheduled when one or more datasets are updated

The following diagram illustrates the workflow.

The producer DAG has a task that creates or updates the dataset defined by a Uniform Resource Identifier (URI). Airflow schedules the consumer DAG after the dataset has been updated. A dataset will be marked as updated only if the producer task completes successfully—if the task fails or if it’s skipped, no update occurs, and the consumer DAG will not be scheduled. If your updates to a dataset triggers multiple subsequent DAGs, then you can use the Airflow metric max_active_tasks_per_dag to control the parallelism of the consumer DAG and reduce the chance of overloading the system.

Let’s demonstrate this with a code example.

Prerequisites to build a data-aware scheduled DAG

You must have the following prerequisites:

  • An Amazon Simple Storage Service (Amazon S3) bucket to upload datasets in. This can be a separate prefix in your existing S3 bucket configured for your Amazon MWAA environment, or it can be a completely different S3 bucket that you identify to store your data in.
  • An Amazon MWAA environment configured with Apache Airflow v2.4.3. The Amazon MWAA execution role should have access to read and write to the S3 bucket configured to upload datasets. The latter is only needed if it’s a different bucket than the Amazon MWAA bucket.

The following diagram illustrates the solution architecture.

The workflow steps are as follows:

  1. The producer DAG makes an API call to a publicly hosted API to retrieve data.
  2. After the data has been retrieved, it’s stored in the S3 bucket.
  3. The update to this dataset subsequently triggers the consumer DAG.

You can access the producer and consumer code in the GitHub repo.

Test the feature

To test this feature, run the producer DAG. After it’s complete, verify that a file named test.csv is generated in the specified S3 folder. Verify in the Airflow UI that the consumer DAG has been triggered by updates to the dataset and that it runs to completion.

There are two restrictions on the dataset URI:

  • It must be a valid URI, which means it must be composed of only ASCII characters
  • The URI scheme can’t be an Airflow scheme (this is reserved for future use)

Other notable changes in Apache Airflow v2.4.3:

Apache Airflow v2.4.3 has the following additional changes:

  1. Deprecation of schedule_interval and timetable arguments. Airflow v2.4.0 added a new DAG argument schedule that can accept a cron expression, timedelta object, timetable object, or list of dataset objects.
  2. Removal of experimental Smart Sensors. Smart Sensors were added in v2.0 and were deprecated in favor of deferrable operators in v2.2, and have now been removed. Deferrable operators are not yet supported on Amazon MWAA, but will be offered in a future release.
  3. Implementation of ExternalPythonOperator that can help you run some of your tasks with a different set of Python libraries than other tasks (and other than the main Airflow environment).

For detailed release documentation with sample code, visit the Apache Airflow v2.4.0 Release Notes.

New feature: Dynamic task mapping

Dynamic task mapping was a new feature introduced in Apache Airflow v2.3, which has also been extended in v2.4. Dynamic task mapping lets DAG authors create tasks dynamically based on current data. Previously, DAG authors needed to know how many tasks were needed in advance.

This is similar to defining your tasks in a loop, but instead of having the DAG file fetch the data and do that itself, the scheduler can do this based on the output of a previous task. Right before a mapped task is run, the scheduler will create n copies of the task, one for each input. The following diagram illustrates this workflow.

It’s also possible to have a task operate on the collected output of a mapped task, commonly known as map and reduce. This feature is particularly useful if you want to externally process various files, evaluate multiple machine learning models, or extraneously process a varied amount of data based on a SQL request.

How dynamic task mapping works

Let’s see an example using the reference code available in the Airflow documentation.

The following code results in a DAG with n+1 tasks, with n mapped invocations of count_lines, each called to process line counts, and a total that is the sum of each of the count_lines. Here n represents the number of input files uploaded to the S3 bucket.

With n=4 files uploaded, the resulting DAG would look like the following figure.

Prerequisites to build a dynamic task mapped DAG

You need the following prerequisites:

  • An S3 bucket to upload files in. This can be a separate prefix in your existing S3 bucket configured for your Amazon MWAA environment, or it can be a completely different bucket that you identify to store your data in.
  • An Amazon MWAA environment configured with Apache Airflow v2.4.3. The Amazon MWAA execution role should have access to read to the S3 bucket configured to upload files. The latter is only needed if it’s a different bucket than the Amazon MWAA bucket.

You can access the code in the GitHub repo.

Test the feature

Upload the four sample text files from the local data folder to an S3 bucket data folder. Run the dynamic_task_mapping DAG. When it’s complete, verify from the Airflow logs that the final sum is equal to the sum of the count lines of the individual files.

There are two limits that Airflow allows you to place on a task:

  • The number of mapped task instances that can be created as the result of expansion
  • The number of mapped tasks that can run at once

For detailed documentation with sample code, visit the Apache Airflow v2.3.0 Release Notes.

New feature: Upgraded Python version

With Apache Airflow v2.4.3 support, Amazon MWAA has upgraded to Python v3.10.8, providing support for newer Python libraries, features, and improvements. Python v3.10 has slots for data classes, match statements, clearer and better Union typing, parenthesized context managers, and structural pattern matching. Upgrading to Python v3.10 should also help you align with security standards by mitigating the risk of older versions of Python such as 3.7, which is fast approaching its end of security support.

With structural pattern matching in Python v3.10, you can now use switch-case statements instead of using if-else statements and dictionaries to simplify the code. Prior to Python v3.10, you might have used if statements, isinstance calls, exceptions and membership tests against objects, dictionaries, lists, tuples, and sets to verify that the structure of the data matches one or more patterns. The following code shows what an ad hoc pattern matching engine might have looked like prior to Python v3.10:

def http_error(status):
        if status == 200:
           return 'OK'
        elif status == 400:
            return 'Bad request'
 	    elif status == 401:
      	    return 'Not allowed'
	    elif status == 403:
      	    return 'Not allowed'
 	    elif status == 404:
      	    return 'Not allowed'
 	    else:
	        return 'Something is wrong'

With structural pattern matching in Python v3.10, the code is as follows:

def http_error(status):
    match status:
        case 200:
            return 'OK'
        case 400:
            return 'Bad request'
        case 401 | 403 | 404:
            return 'Not allowed'
        case _:
            return 'Something is wrong'

Python v3.10 also carries forward the performance improvements introduced in Python v3.9 using the vectorcall protocol. vectorcall makes many common function calls faster by minimizing or eliminating temporary objects created for the call. In Python 3.9, several Python built-ins—range, tuple, set, frozenset, list, dict—use vectorcall internally to speed up runs. The second big performance enhancer is more efficient in the parsing of Python source code using the new parser for the CPython runtime.

For a full list of Python v3.10 release highlights, refer to What’s New In Python 3.10.

The code is available in the GitHub repo.

Set up a new Apache Airflow v2.4.3 environment

You can set up a new Apache Airflow v2.4.3 environment in your account and preferred Region using either the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using either AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform.

When you have successfully created an Apache Airflow v2.4.3 environment in Amazon MWAA, the following packages are automatically installed on the scheduler and worker nodes along with other provider packages:

  • apache-airflow-providers-amazon==6.0.0
  • python==3.10.8

For a complete list of provider packages installed, refer to Apache Airflow provider packages installed on Amazon MWAA environments. Note that some imports and operator names have changed in the new provider package in order to standardize the naming convention across the provider package. For a complete list of provider package changes, refer to the package changelog.

Upgrade from Apache Airflow v2.0.2 or v2.2.2 to Apache Airflow v2.4.3

Currently, Amazon MWAA doesn’t support in-place upgrades of existing environments for older Apache Airflow versions. In this section, we show how you can transfer your data from your existing Apache Airflow v2.0.2 or v2.2.2 environment to Apache Airflow v2.4.3:

  1. Create a new Apache Airflow v2.4.3 environment.
  2. Copy your DAGs, custom plugins, and requirements.txt resources from your existing v2.0.2 or v2.2.2 S3 bucket to the new environment’s S3 bucket.
    • If you use requirements.txt in your environment, you need to update the --constraint to v2.4.3 constraints and verify that the current libraries and packages are compatible with Apache Airflow v2.4.3
    • With Apache Airflow v2.4.3, the list of provider packages Amazon MWAA installs by default for your environment has changed. Note that some imports and operator names have changed in the new provider package in order to standardize the naming convention across the provider package. Compare the list of provider packages installed by default in Apache Airflow v2.2.2 or v2.0.2, and configure any additional packages you might need for your new v2.4.3 environment. It’s advised to use the aws-mwaa-local-runner utility to test out your new DAGs, requirements, plugins, and dependencies locally before deploying to Amazon MWAA.
  3. Test your DAGs using the new Apache Airflow v2.4.3 environment.
  4. After you have confirmed that your tasks completed successfully, delete the v2.0.2 or v2.2.2 environment.

Conclusion

In this post, we talked about the new features of Apache Airflow v2.4.3 and how you can get started using it in Amazon MWAA. Try out these new features like data-aware scheduling, dynamic task mapping, and other enhancements along with Python v.3.10.


About the authors

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.