AWS Storage Blog
Build a managed transactional data lake with Amazon S3 Tables
UPDATE (12/19/2024): Added guidance for Amazon EMR setup.
Customers commonly use Apache Iceberg today to manage ever-growing volumes of data. Apache Iceberg’s relational database transaction capabilities (ACID transactions) help customers deal with frequent updates, deletions, and the need for transactional consistency across datasets. However, getting the most out of Apache Iceberg tables and running them efficiently at scale involves maintenance and management overhead. As their data lakes grow, customers cite three challenges that can impact their operations and efficiency. First, the accumulation of small files slows down query performance, impacting critical data analysis processes. Second, table maintenance becomes operational overhead, with customers needing to carefully consider when and how to run compaction, snapshot management, and unreferenced file cleanup to keep tables performant and cost-effective. Finally, customers need effective ways to establish access control boundaries at the table or namespace level.
To address these operational challenges, AWS introduced Amazon S3 Tables at re:Invent 2024, which is the first cloud object store with built-in Apache Iceberg support, and the easiest way to store tabular data at scale. You can use S3 Tables to store tabular data such as daily purchase transactions, streaming sensor data, or ad impressions as an Iceberg table in S3, and optimize performance and cost as your data evolves using automatic table maintenance.
In this post, we provide an overview of S3 Tables, and an example of how to build a transactional data lake with S3 Tables using Apache Spark on Amazon EMR. With this setup, you will be able to create S3 table buckets, load data into tables, and perform standard Apache Iceberg operations on these tables.
When to use S3 Tables
S3 Tables offer built-in support for Apache Iceberg, which simplifies managing data lakes at scale while improving query performance and reducing costs. Consider using S3 Tables for your data lake if you want to experience the following results.
- Simplified security: S3 Tables treat your tables as AWS resources, enabling fine-grained AWS Identity and Access Management (IAM) permissions at the table level. This simplifies data governance, enhances security, and makes access control more intuitive and manageable with your familiar AWS services.
- Automatic table maintenance: S3 Tables automate critical maintenance tasks including compaction, snapshot management, and unreferenced file removal. This continuous optimization ensures that tables remain performant and cost-effective without manual intervention, reducing operational overhead and allowing teams to focus on data insights.
- Enhanced performance: Amazon S3 Tables introduce a new bucket type, purpose-built for storing Apache Iceberg tables. Table buckets deliver up to 3x faster query performance and up to 10x higher transactions per second compared to storing Iceberg tables in general purpose S3 buckets. This performance enhancement supports high-frequency updates, real-time ingestion, and more demanding workloads, ensuring scalability and responsiveness as data volumes grow.
Before we dive deep into the solution, let’s introduce some new concepts related to S3 Tables.
Amazon S3 Tables: Concepts
- Table buckets: Table buckets are a new S3 bucket type that you can use to create and store tables.
- Namespaces: Namespaces provide a way to group your tables. For example, you can group all of your development tables under a ‘dev-tables’ namespace.
- Tables: Tables are resources of a table bucket. Each table has its own unique Amazon Resource Name (ARN), including the unique table ID, and a resource policy attached to it. (A short code sketch after this list shows how these resources fit together.)
- Amazon S3 Tables Catalog for Apache Iceberg: The Amazon S3 Tables Catalog for Apache Iceberg is an open source library that bridges control plane operations provided by S3 Tables to engines like Apache Spark when used with Apache Iceberg.
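To make these concepts concrete, here is a minimal sketch of creating a namespace and a table with the AWS SDK for Python (boto3). The bucket ARN is a placeholder, and the ‘s3tables’ client methods and parameter names shown reflect the SDK at the time of writing; verify them against the current boto3 documentation.
import boto3

# Placeholder ARN: substitute the ARN of your own table bucket
table_bucket_arn = "arn:aws:s3tables:us-east-1:123456789012:bucket/blog-s3tables-data"

s3tables = boto3.client("s3tables", region_name="us-east-1")

# A namespace groups tables within the table bucket
s3tables.create_namespace(tableBucketARN=table_bucket_arn, namespace=["dev-tables"])

# A table is a resource of the table bucket, with its own ARN and table ID
s3tables.create_table(
    tableBucketARN=table_bucket_arn,
    namespace="dev-tables",
    name="example_table",
    format="ICEBERG",
)

# Confirm that the namespace and table exist
print(s3tables.list_namespaces(tableBucketARN=table_bucket_arn))
print(s3tables.list_tables(tableBucketARN=table_bucket_arn, namespace="dev-tables"))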
Use case and solution overview
In this post, we cover a common use case in the retail industry, where sales data is dynamic due to product variability and customer purchase and return patterns. To boost revenue and conversion rates, businesses require real-time analytics for offering timely promotions and targeted discounts. This requires insertion and updating of sales data, and the ability to review historical transactions. Our solution demonstrates how to address these needs by loading a sample sales dataset into S3 Tables, enabling real-time analysis and time travel capabilities.
In this example, we use a Jupyter notebook with Amazon EMR to execute the steps; however, you can use any platform of your choice.
For this post, we use a publicly hosted TPC-DS dataset. We have divided this post into three parts:
Part 1: Setting up a table bucket, EMR cluster, and Jupyter notebook.
Part 2: Initialize Apache Spark and load data into S3 Tables.
Part 3: Perform Apache Iceberg queries on S3 Tables.
Part 1: Solution setup
The following steps walk you through how to create a table bucket, set up the EMR cluster, and open a Jupyter notebook.
Step 1: Create a table bucket.
First, we create an S3 table bucket named blog-s3tables-data using the AWS CLI.
aws s3tables create-table-bucket --region us-east-1 --name blog-s3tables-data
The API response returns the table bucket ARN, which we use later.
arn:aws:s3tables:us-east-1:123456789012:bucket/blog-s3tables-data
Step 2: Create an EMR cluster.
To provision an EMR cluster, you can download the CloudFormation (CFN) template. This template provisions an EMR 7.5.0 cluster with one r5.8xlarge master node and one r5.16xlarge core node, creates security groups and IAM roles, and attaches the required IAM policy to use S3 Tables. Note that the cluster created using the template will auto-terminate after 1 hour of idle time.
Note: For EMR to interact with S3 Tables, ensure that:
- The subnets have internet connectivity so that the cluster can reach Maven to download the required packages. Here, we download the “software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:x.x.x” package from Maven, as internet connectivity is enabled in our CFN template.
- The role associated with the EMR instances has the right permissions to read from and write to S3 Tables. In our CFN template, we create a new role and attach the managed policy AmazonS3TablesFullAccess, which provides full access to S3 Tables. For production implementations, we recommend fine-tuning these permissions to grant least privilege access (a sketch of what a scoped policy might look like follows this list).
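As a reference point for least privilege, the following sketch attaches an inline policy scoped to a single table bucket to the EMR instance profile role using boto3. The role name is hypothetical, and the s3tables action names and ARN formats are assumptions to verify against the S3 Tables permissions documentation.
import json
import boto3

# Placeholder values: the account ID, bucket name, and role name are illustrative only
table_bucket_arn = "arn:aws:s3tables:us-east-1:123456789012:bucket/blog-s3tables-data"

scoped_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ScopedS3TablesAccess",
            "Effect": "Allow",
            "Action": [
                "s3tables:CreateNamespace",
                "s3tables:CreateTable",
                "s3tables:GetTable",
                "s3tables:ListNamespaces",
                "s3tables:ListTables",
                "s3tables:GetTableData",
                "s3tables:PutTableData",
                "s3tables:GetTableMetadataLocation",
                "s3tables:UpdateTableMetadataLocation"
            ],
            "Resource": [table_bucket_arn, f"{table_bucket_arn}/table/*"]
        }
    ]
}

# Attach the policy inline to the EMR instance profile role (hypothetical role name)
iam = boto3.client("iam")
iam.put_role_policy(
    RoleName="EMR-S3Tables-InstanceProfileRole",
    PolicyName="S3TablesScopedAccess",
    PolicyDocument=json.dumps(scoped_policy),
)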
Step 3: Open Jupyter notebook.
Once the cluster from step 2 reaches the Waiting status, go to the Applications tab in the EMR console and select JupyterHub. Ensure that the security group attached to your cluster allows inbound traffic on port 9443 from the machine IP you provided when creating the CFN stack.
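If that inbound rule is not already present, this minimal boto3 sketch adds it; the security group ID and IP address are placeholders.
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Allow inbound JupyterHub traffic (port 9443) from a single workstation IP
ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",  # security group attached to the EMR primary node (placeholder)
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 9443,
            "ToPort": 9443,
            "IpRanges": [{"CidrIp": "203.0.113.10/32", "Description": "My workstation"}],
        }
    ],
)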
Now download the sample PySpark notebook and upload it to your Jupyter notebook environment. Refer to the documentation for instructions on logging in to the Jupyter notebook.
Part 2: Initialize Apache Spark and load data into S3 Tables
Now that our setup is ready, let’s create tables and load sample data.
Step 1: Create a Spark session.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Iceberg Table Demo") \
    .getOrCreate()
Step 2: Set the configuration parameters for the Spark Session, and use the S3 table bucket ARN for the bucket created in Part 1.
%%configure -f
{
    "conf": {
        "spark.jars.packages": "software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.demoblog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.demoblog.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
        "spark.sql.catalog.demoblog.warehouse": "<table bucket ARN for blog-s3tables-data>"
    }
}
Step 3: Load the public dataset into a Spark DataFrame.
src_df_store = spark.read.format("parquet").load("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/store/")
print(f"Number of records are {src_df_store.count()} ")
Step 4: Create a namespace blognamespace and define a table name.
spark.sql(f"""create namespace if not exists demoblog.blognamespace""")
spark.sql(f"""show namespaces in demoblog""").show()
table_store = "demoblog.blognamespace.blogstoresales"
Step 5: Next, we load data into the table.
src_df_store.writeTo(table_store).using("iceberg").createOrReplace()
Step 6: We query the data in the table to check if all the rows were inserted into the table.
spark.sql(f"select count(*) from {table_store}").show()
Part 3: Perform Apache Iceberg queries on S3 Tables
Now that we have loaded data into the table, we add a new column to it and use time travel to go back to a previous state of the table.
Step 1: Alter table. We alter the table by adding a new column, ‘c_flag’.
# Alter the table and add a new column
import pyspark.sql.utils

try:
    spark.sql(f"ALTER TABLE {table_store} ADD COLUMNS c_flag STRING")
except pyspark.sql.utils.AnalysisException:
    print("Column already exists")
Next, we set this column to ‘Y’ for every existing row.
from pyspark.sql import functions as sf

src_df_store.withColumn("c_flag", sf.lit("Y")) \
    .writeTo(table_store) \
    .using("iceberg") \
    .createOrReplace()
Step 2: We check the table's snapshot history to see the list of snapshots written to it.
spark.sql(f"SELECT * FROM {table_store}.history LIMIT 10").show()
Step 3: Time travel. We query the table as of a previous snapshot. This snapshot does not include the column we created when altering the table.
spark.sql(f"SELECT * FROM {table_store} for system_version as of <snapshot-id-before-alter> LIMIT 2").show()
Now, let's query the latest snapshot, where the newly added column ‘c_flag’ is available.
spark.sql(f"SELECT * FROM {table_store} for system_version as of <snapshot-id-after-alter> LIMIT 2").show()
Cleaning up
You can follow the steps below to remove all resources you’ve set up while following along with this post.
- Run the CLEANUP steps provided at the end of the notebook (a sketch of the table and namespace cleanup appears after this list).
- Delete the S3 table bucket after deleting all tables and namespaces within the table bucket. To do this, you can run the following command in the CLI.
aws s3tables delete-table-bucket --table-bucket-arn <table bucket ARN>
- Navigate to CloudFormation in the AWS Management Console, locate the stack you created for your setup, and choose Delete. This removes all of the resources except the security groups.
- To delete the security groups, go to the Amazon Elastic Compute Cloud (Amazon EC2) Console and find the Security Groups section under Network & Security in the left sidebar. For each security group associated with your deleted stack, remove all inbound rules. Once you’ve cleared the inbound rules, you can then delete the security groups.
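For reference, the table and namespace cleanup from the first step looks roughly like the following sketch, using the same Spark session and names from this post; the CLEANUP cells in the notebook remain the authoritative steps.
# Drop the table and namespace created in this post before deleting the table bucket
spark.sql(f"DROP TABLE IF EXISTS {table_store}")
spark.sql("DROP NAMESPACE IF EXISTS demoblog.blognamespace")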
Conclusion
In this post, you learned how to build a managed data lake using S3 Tables. With its managed Apache Iceberg capabilities, Amazon S3 Tables provide a cost-effective and performant solution for building your transactional data lake. Additionally, you can integrate your table buckets with the AWS Glue Data Catalog. This integration is in preview, and it allows you to query and visualize data using AWS analytics services such as Amazon Athena, Amazon Redshift, and Amazon QuickSight.
To learn more about S3 Tables, visit the S3 User Guide.
Also, check out the AWS News Blog.