AWS Big Data Blog
Accelerate HiveQL with Oozie to Spark SQL migration on Amazon EMR
Many customers run big data workloads such as extract, transform, and load (ETL) on Apache Hive to create a data warehouse on Hadoop. Apache Hive has served these workloads well for many years. However, with advancements in infrastructure such as cloud computing and multicore machines with large RAM, Apache Spark has gained prominence by outperforming Apache Hive for many of these workloads.
Customers now want to migrate their Apache Hive workloads to Apache Spark in the cloud to benefit from an optimized runtime, cost reduction through transient clusters, better scalability by decoupling storage and compute, and greater flexibility. However, migrating from Apache Hive to Apache Spark requires significant manual effort to write migration scripts and maintain different Spark job configurations.
In this post, we walk you through a solution that automates the migration from HiveQL to Spark SQL. The solution was used to migrate Hive with Oozie workloads to Spark SQL and run them on Amazon EMR for a large gaming client. You can also use this solution to develop new jobs with Spark SQL and process them on Amazon EMR. This post assumes that you have a basic understanding of Apache Spark, Hive, and Amazon EMR.
Solution overview
In our example, we use Apache Oozie, which schedules Apache Hive jobs as actions to collect and process data on a daily basis.
We migrate these Oozie workflows with Hive actions by extracting the HQL files along with their dynamic and static parameters, and converting them to be Spark compliant. Manual conversion is both time consuming and error prone: you would have to sort through the existing HQL files, replace the parameters, and change the syntax in each of them.
Instead, we can use automation to speed up the migration and reduce the heavy lifting, cost, and risk involved.
We split the solution into two primary components: generating Spark job metadata and running the SQL on Amazon EMR. The first component (metadata setup) consumes existing Hive job configurations and generates metadata such as number of parameters, number of actions (steps), and file formats. The second component consumes the generated metadata from the first component and prepares the run order of Spark SQL within a Spark session. With this solution, we support basic orchestration and scheduling with the help of AWS services like Amazon DynamoDB and Amazon Simple Storage Service (Amazon S3). We can validate the solution by running queries in Amazon Athena.
In the following sections, we walk through these components and how to use these automations in detail.
Generate Spark SQL metadata
Our batch job consists of Hive steps scheduled to run sequentially. For each step, we run HQL scripts that extract, transform, and aggregate input data into one final Hive table, which stores data in HDFS. We use the following Oozie workflow parser script, which takes an existing Hive job as input and generates the configuration artifacts needed to run the SQL using PySpark.
Oozie workflow XML parser
We create a Python script to automatically parse the Oozie jobs, including workflow.xml, coordinator.xml, job properties, and HQL files. The script can handle multiple Hive actions in a workflow by organizing the metadata at the step level into separate folders. Each step includes the list of SQLs, their paths, and their static parameters, which are the input for the solution in the next stage.
The process consists of two steps:
- The Python parser script takes input of the existing Oozie Hive job and its configuration files.
- The script generates a metadata JSON file for each step.
The following diagram outlines these steps and shows sample output.
Prerequisites
You need the following prerequisites:
- Python 3.8
- Python packages:
- sqlparse==0.4.2
- jproperties==2.1.1
- defusedxml==0.7.1
Setup
Complete the following steps:
- Install Python 3.8.
- Create a virtual environment (see the sample commands after this list):
- Activate the newly created virtual environment:
- Git clone the project:
- Install dependent packages:
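The following commands are a minimal sketch of the virtual environment, clone, and install steps. The repository URL and directory names are placeholders, because the actual location depends on where you host the parser project.

```bash
# Create and activate a virtual environment with Python 3.8
python3.8 -m venv oozie-parser-env
source oozie-parser-env/bin/activate

# Clone the project (placeholder URL) and install the dependent packages
git clone <repository-url> oozie-to-spark-parser
cd oozie-to-spark-parser
pip install sqlparse==0.4.2 jproperties==2.1.1 defusedxml==0.7.1
```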
Sample command
We can use the following sample command:
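The parser's exact command-line interface is defined in the repo; as a purely hypothetical example, assuming the script is named oozie_parser.py and accepts the Oozie job directory and an output directory, the invocation might look like this:

```bash
# Hypothetical invocation; the real script name and flags come from the repo
python oozie_parser.py \
  --oozie-job-dir ./sample_oozie_job_name \
  --output-dir ./parser_output
```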
The output is a set of step-level folders, each containing the extracted SQL files and a metadata JSON file for that step.
Limitations
This method has the following limitations:
- The Python script parses only HiveQL actions from the Oozie workflow.xml.
- The Python script generates one file for each SQL statement and uses the sequence ID for file names. It doesn’t name the SQL based on the functionality of the SQL.
Run Spark SQL on Amazon EMR
After we create the SQL metadata files, we use another automation script to run them with Spark SQL on Amazon EMR. This automation script supports custom UDFs by adding JAR files to the spark-submit command. The solution uses DynamoDB to log the run details of each SQL for support and maintenance.
Architecture overview
The following diagram illustrates the solution architecture.
Prerequisites
You need the following prerequisites:
- Spark 3.x
- Python 3.8
- Amazon EMR 6.1
Setup
Complete the following steps:
- Install the AWS Command Line Interface (AWS CLI) on your workspace by following the instructions in Installing or updating the latest version of the AWS CLI. To configure AWS CLI interaction with AWS, refer to Quick setup.
- Create two tables in DynamoDB: one to store metadata about jobs and steps, and another to log job runs.
- Use the following AWS CLI command to create the metadata table in DynamoDB:
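The full command is available in the repo; a representative version, assuming on-demand capacity and the key schema shown in the attributes table that follows, looks like this:

```bash
# Metadata table: id is the partition key, step_id is the sort key
aws dynamodb create-table \
  --table-name dw-etl-metadata \
  --attribute-definitions \
      AttributeName=id,AttributeType=S \
      AttributeName=step_id,AttributeType=S \
  --key-schema \
      AttributeName=id,KeyType=HASH \
      AttributeName=step_id,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST
```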
You can check on the DynamoDB console that the table `dw-etl-metadata` was created successfully.
The metadata table has the following attributes.
| Attributes | Type | Comments |
| --- | --- | --- |
| `id` | String | partition_key |
| `step_id` | String | sort_key |
| `step_name` | String | Step description |
| `sql_base_path` | String | Base path |
| `sql_info` | List | List of SQLs in ETL pipeline |
| . | `sql_path` | SQL file name |
| . | `sql_active_flag` | y/n |
| . | `sql_load_order` | Order of SQL |
| . | `sql_parameters` | Parameters in SQL and values |
| `spark_config` | Map | Spark configs |
- Use the following AWS CLI command to create the log table in DynamoDB:
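A representative version of the log table command, again assuming on-demand capacity; because the table stores an expire attribute, you can optionally enable Time to Live so old log entries are removed automatically:

```bash
# Log table: job_run_id is the partition key, id is the sort key
aws dynamodb create-table \
  --table-name dw-etl-pipelinelog \
  --attribute-definitions \
      AttributeName=job_run_id,AttributeType=S \
      AttributeName=id,AttributeType=S \
  --key-schema \
      AttributeName=job_run_id,KeyType=HASH \
      AttributeName=id,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST

# Optional: expire old log records using the expire attribute
aws dynamodb update-time-to-live \
  --table-name dw-etl-pipelinelog \
  --time-to-live-specification "Enabled=true,AttributeName=expire"
```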
You can check on the DynamoDB console that the table `dw-etl-pipelinelog` was created successfully.
The log table has the following attributes.
| Attributes | Type | Comments |
| --- | --- | --- |
| `job_run_id` | String | partition_key |
| `id` | String | sort_key (UUID) |
| `end_time` | String | End time |
| `error_description` | String | Error in case of failure |
| `expire` | Number | Time to Live |
| `sql_seq` | Number | SQL sequence number |
| `start_time` | String | Start time |
| `Status` | String | Status of job |
| `step_id` | String | Job ID the SQL belongs to |
The log table can grow quickly if there are many jobs or if they run frequently. You can archive old records to Amazon S3 if they are no longer needed, or use the DynamoDB Time to Live feature to clean them up.
- Set the bucket variable if you have an existing bucket that can be reused. If not, create an S3 bucket to store the Spark SQL code, which Amazon EMR will run (sample commands for this and the next step follow).
- Enable secure transfer on the bucket:
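A sketch of these two steps; the bucket name is the placeholder used throughout this post, and the bucket policy shown here is one common way to enforce encryption in transit:

```bash
# Reuse an existing bucket, or create a new one for the Spark SQL code
export CODE_BUCKET="unique-code-bucket-name-#####"   # placeholder bucket name
aws s3 mb "s3://${CODE_BUCKET}"

# Deny requests that do not use TLS (secure transport)
aws s3api put-bucket-policy --bucket "${CODE_BUCKET}" --policy '{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "DenyInsecureTransport",
    "Effect": "Deny",
    "Principal": "*",
    "Action": "s3:*",
    "Resource": [
      "arn:aws:s3:::unique-code-bucket-name-#####",
      "arn:aws:s3:::unique-code-bucket-name-#####/*"
    ],
    "Condition": {"Bool": {"aws:SecureTransport": "false"}}
  }]
}'
```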
- Clone the project to your workspace (sample commands for this and the next three steps follow):
- Create a ZIP file and upload it to the code bucket created earlier:
- Upload the ETL driver code to the S3 bucket:
- Upload sample job SQLs to Amazon S3:
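The following commands sketch the preceding four steps. The repository URL, local paths, and the DW prefix for the sample SQLs are assumptions based on the paths used in the EMR step configuration later in this post.

```bash
# Clone the project (placeholder URL)
git clone <repository-url> dw-etl-framework
cd dw-etl-framework

# Package the framework code and upload it along with the ETL driver
zip -r code.zip framework/
aws s3 cp code.zip "s3://${CODE_BUCKET}/framework/code.zip"
aws s3 cp etl_driver.py "s3://${CODE_BUCKET}/framework/etl_driver.py"

# Upload the sample job SQLs under the prefix passed later as code_bucket
aws s3 cp sample_oozie_job_name/ "s3://${CODE_BUCKET}/DW/sample_oozie_job_name/" --recursive
```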
- Add a sample step (`./sample_oozie_job_name/step1/step1.json`) to DynamoDB (for more information, refer to Write data to a table using the console or AWS CLI):
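A representative command, assuming the sample file is already formatted as a DynamoDB JSON item:

```bash
aws dynamodb put-item \
  --table-name dw-etl-metadata \
  --item file://./sample_oozie_job_name/step1/step1.json
```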
- In the Athena query editor, create the database `base` by running `CREATE DATABASE base;`.
- Copy the sample data files `us_current.csv` and `states_current.csv` from the repo to Amazon S3, as shown in the following sample commands:
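A sketch of the copy commands; the destination prefixes are assumptions, and they should match the S3 locations you reference in the DDL files in the next step:

```bash
aws s3 cp ./sample_data/us_current.csv "s3://${CODE_BUCKET}/sample_data/us_current/us_current.csv"
aws s3 cp ./sample_data/states_current.csv "s3://${CODE_BUCKET}/sample_data/states_current/states_current.csv"
```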
- To create the source tables in the `base` database, run the DDLs present in the repo in the Athena query editor:
- Run the `./sample_data/ddl/states_current.q` file by modifying the S3 path to the bucket you created.
- Run the `./sample_data/ddl/us_current.q` file by modifying the S3 path to the bucket you created.
The ETL driver file implements the Spark driver logic. It can be invoked locally or on an EMR instance.
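For example, on the primary node of an EMR cluster you could invoke the driver directly with spark-submit. The bucket names are the placeholders used throughout this post, and the arguments are explained in the table later in this section:

```bash
spark-submit \
  --deploy-mode client \
  --py-files "s3://unique-code-bucket-name-#####/framework/code.zip" \
  "s3://unique-code-bucket-name-#####/framework/etl_driver.py" \
  --step_id "sample_oozie_job_name#step1" \
  --job_run_id "sample_oozie_job_name#step1#2022-01-01-12-00-01" \
  --code_bucket=s3://unique-code-bucket-name-#####/DW \
  --metadata_table=dw-etl-metadata \
  --log_table_name=dw-etl-pipelinelog \
  --sql_parameters "DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####"
```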
- Launch an EMR cluster.
- Make sure to select Use for Spark table metadata under AWS Glue Data Catalog settings.
- Add the following step to the EMR cluster:
| Setting | Value |
| --- | --- |
| Step type | Spark Application |
| Name | Any Name |
| Deploy mode | Client |
| Spark-submit options | `--py-files s3://unique-code-bucket-name-#####/framework/code.zip` |
| Application location | `s3://unique-code-bucket-name-####/framework/etl_driver.py` |
| Arguments | `--step_id sample_oozie_job_name#step1 --job_run_id sample_oozie_job_name#step1#2022-01-01-12-00-01 --code_bucket=s3://unique-code-bucket-name-#######/DW --metadata_table=dw-etl-metadata --log_table_name=dw-etl-pipelinelog --sql_parameters DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#######` |
| Action on failure | Continue |
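If you prefer the AWS CLI over the console, the same step can be added with aws emr add-steps. The cluster ID and bucket names are placeholders, and the arguments mirror the preceding table (the Args list must be a single line):

```bash
aws emr add-steps \
  --cluster-id j-XXXXXXXXXXXXX \
  --steps 'Type=Spark,Name=sample_oozie_job_name#step1,ActionOnFailure=CONTINUE,Args=[--deploy-mode,client,--py-files,s3://unique-code-bucket-name-#####/framework/code.zip,s3://unique-code-bucket-name-#####/framework/etl_driver.py,--step_id,sample_oozie_job_name#step1,--job_run_id,sample_oozie_job_name#step1#2022-01-01-12-00-01,--code_bucket=s3://unique-code-bucket-name-#####/DW,--metadata_table=dw-etl-metadata,--log_table_name=dw-etl-pipelinelog,--sql_parameters,DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####]'
```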
The following table summarizes the script arguments.
| Script Argument | Argument Description |
| --- | --- |
| `deploy-mode` | Spark deploy mode: client or cluster. |
| `name <jobname>#<stepname>` | Unique name for the Spark job. This can be used to identify the job on the Spark History UI. |
| `py-files <s3 path for code>/code.zip` | S3 path for the code. |
| `<s3 path for code>/etl_driver.py` | S3 path for the driver module. This is the entry point for the solution. |
| `step_id <jobname>#<stepname>` | Unique name for the step. This refers to the step_id in the metadata entered in DynamoDB. |
| `job_run_id <random UUID>` | Unique ID to identify the log entries in DynamoDB. |
| `log_table_name <DynamoDB log table name>` | DynamoDB table for storing the job run details. |
| `code_bucket <S3 bucket>` | S3 path for the SQL files that are uploaded in the job setup. |
| `metadata_table <DynamoDB metadata table name>` | DynamoDB table for storing the job metadata. |
| `sql_parameters DATE=2022-07-04::HOUR=00` | Any additional or dynamic parameters expected by the SQL files. |
Validation
After the EMR step completes, you should have data in the S3 bucket for the table `base.states_daily`. You can validate the data by querying `base.states_daily` in Athena (for example, `SELECT * FROM base.states_daily LIMIT 10`).
Congratulations, you have converted an Oozie Hive job to Spark SQL and run it successfully on Amazon EMR.
Solution highlights
This solution has the following benefits:
- It avoids boilerplate code for any new pipeline and reduces code maintenance
- Onboarding a new pipeline only requires setting up the metadata: the DynamoDB entries and the SQL files placed in the S3 path
- Any common modifications or enhancements can be made at the solution level and are reflected across all jobs
- The DynamoDB metadata provides insight into all active jobs and their optimized runtime parameters
- For each run, the solution persists the SQL start time, end time, and status in a log table to help identify issues and analyze runtimes
- It supports Spark SQL and UDF functionality; custom UDFs can be provided externally to the spark-submit command
Limitations
This method has the following limitations:
- The solution only supports SQL queries on Spark
- Each SQL should be a Spark action like insert, create, drop, and so on
In this post, we explained the scenario of migrating an existing Oozie job. We can use the PySpark solution independently for any new development by creating DynamoDB entries and SQL files.
Clean up
Delete all the resources created as part of this solution to avoid ongoing charges; sample commands for these steps follow the list:
- Delete the DynamoDB tables.
- Delete the S3 bucket.
- Stop the EMR cluster if it wasn't a transient cluster.
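Representative cleanup commands, using the placeholder names from earlier in this post; the cluster ID is a placeholder:

```bash
# Delete the DynamoDB tables
aws dynamodb delete-table --table-name dw-etl-metadata
aws dynamodb delete-table --table-name dw-etl-pipelinelog

# Empty and delete the S3 bucket
aws s3 rm "s3://${CODE_BUCKET}" --recursive
aws s3 rb "s3://${CODE_BUCKET}"

# Terminate the EMR cluster if it is still running
aws emr terminate-clusters --cluster-ids j-XXXXXXXXXXXXX
```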
Conclusion
In this post, we presented two automated solutions: one for parsing Oozie workflows and HiveQL files to generate metadata, and a PySpark solution for running SQLs using generated metadata. We successfully implemented these solutions to migrate a Hive workload to EMR Spark for a major gaming customer and achieved about 60% effort reduction.
For a Hive with Oozie to Spark migration, these solutions help complete the code conversion quickly so you can focus on performance benchmarking and testing. Developing a new pipeline is also quick: you only need to create the SQL logic, test it using Spark (shell or notebook), add the metadata to DynamoDB, and test it via the PySpark SQL solution. Overall, you can use the solution in this post to accelerate Hive to Spark code migration.
About the authors
Vinay Kumar Khambhampati is a Lead Consultant with the AWS ProServe Team, helping customers with cloud adoption. He is passionate about big data and data analytics.
Sandeep Singh is a Lead Consultant at AWS ProServe, focused on analytics, data lake architecture, and implementation. He helps enterprise customers migrate and modernize their data lake and data warehouse using AWS services.
Amol Guldagad is a Data Analytics Consultant based in India. He has worked with customers in different industries like banking and financial services, healthcare, power and utilities, manufacturing, and retail, helping them solve complex challenges with large-scale data platforms. At AWS ProServe, he helps customers accelerate their journey to the cloud and innovate using AWS analytics services.