AWS Big Data Blog
Large-Scale Machine Learning with Spark on Amazon EMR
This is a guest post by Jeff Smith, Data Engineer at Intent Media. Intent Media, in their own words: “Intent Media operates a platform for advertising on commerce sites. We help online travel companies optimize revenue on their websites and apps through sophisticated data science capabilities. On the data team at Intent Media, we are responsible for processing terabytes of e-commerce data per day and using that data and machine learning techniques to power prediction services for our customers.”
Our Big Data Journey
Building large-scale machine learning models has never been simple. Over the history of our team, we’ve continually evolved our approach for running modeling jobs.
The dawn of big data: Java and Pig on Apache Hadoop
Our first data processing jobs were built on Hadoop MapReduce using the Java API. After building some basic aggregation jobs, we went on to develop a scalable, reliable implementation of logistic regression on Hadoop using this paradigm. While Hadoop MapReduce certainly gave us the ability to operate at the necessary scale, using the Java API resulted in verbose, difficult-to-maintain code. More importantly, the achievable feature development velocity using the complex Java API was not fast enough to keep up with our growing business. Our implementation of Alternating Direction Method of Multipliers (ADMM) logistic regression on Hadoop consists of several thousand lines of Java code. As you might imagine, it took months to develop. Compared with a library implementation of logistic regression that can be imported and applied in a single line, this was simply too large of a time investment.
Around the same time, we built some of our decisioning capabilities in Pig, a domain-specific language (DSL) for Hadoop. Part of our motivation for looking into Pig was to write workflows at a higher level of abstraction than the Java Hadoop API allowed. Although we had some successes with Pig, we eventually abandoned it. Pig was still a young project, and because it is implemented as a DSL, it brought certain inherent difficulties for our team, such as immature tooling. For example, PigUnit, the xUnit testing framework for Pig, was only released in December 2010 and took a while to mature. For years after its release, it was still not integrated into standard Pig distributions or published as a Maven artifact. Given our strong TDD culture, we really craved mature tooling for testing. Other difficulties included the impedance mismatch with our codebase at the time, which was largely Java.
Going functional and logical with Cascalog
Our issues with Pig drove us to re-implement many of these capabilities in Cascalog, a Hadoop DSL written in Clojure and inspired by Datalog. We found that Cascalog’s concise logic programming style was far more declarative than low-level Java Hadoop MapReduce code. Moreover, by using Cascalog, we were able to write the supporting functionality of our capabilities in Clojure, with less of an impedance mismatch than we had with Pig. As a functional language, Clojure allowed us to write more pure, testable, and composable code.
Despite these advantages, we still found Cascalog lacking as a long-term solution. As a macro-driven DSL, it was often difficult to reason about its behavior. Certain composition approaches still made it difficult to test isolated components rather than whole pipelines. The interoperation with Clojure also proved less seamless in practice than in principle. Logic programming is a useful paradigm for reasoning about data transformations, but Cascalog’s version of logic programming simply has different semantics than Clojure’s version of functional programming. This mismatch made it hard to develop good patterns for combined Clojure/Cascalog applications.
Questioning Hadoop MapReduce
By this time, we had also realized that Hadoop MapReduce’s approach to data processing was not always a good fit for our use case. By writing to disk after each step, Hadoop ensures fault tolerance but incurs a significant runtime cost. Because we use Amazon Elastic MapReduce (Amazon EMR), we did not need most of this fault tolerance: our EMR clusters were spun up dynamically, and the outputs were stored in S3 and an application database, so persisting data in HDFS after a job completed was not necessary. Also, Hadoop MapReduce’s execution model was simply not a great fit for the highly iterative machine learning algorithms that we were trying to implement.
We realized that we wanted a data processing platform suited for iterative computations, ideally with some understanding of machine learning out of the box. It needed to have a high-level API that allowed us to compose our applications using functional programming idioms in a robust production language. There is, in fact, such a platform; it’s called Spark.
Success with Spark
Apache Spark was originally developed at UC Berkeley explicitly for the use case of large-scale machine learning. Early in Spark’s development, the team realized that Spark could be a general data processing platform, so they carved out different pieces of functionality into separate subprojects, all relying on common facilities provided by Spark Core. The machine learning capabilities became a library called MLlib, and there are libraries for streaming, SQL, and graph processing as well.
Performance
Compared to Hadoop, Spark is much better suited to large-scale machine learning problems. By maintaining and reasoning about the execution’s directed acyclic graph (DAG), Spark can figure out when to cache data in memory. This and other features allow it to be up to 100 times faster than Hadoop for some workflows. In our experience, we have seen an order-of-magnitude performance improvement before any tuning.
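To make the caching point concrete, here is a minimal sketch of an iterative computation over a reused dataset; the path, parser, and update step are purely illustrative, but the single call to cache() is what keeps the data in memory across passes instead of re-reading it from disk:

// Illustrative sketch: keep a reused dataset in memory across iterations.
val points = sc.textFile("s3://your-bucket-name/features/") // hypothetical input path
  .map(parsePoint)                                          // hypothetical parsing function
  .cache()                                                  // later passes read from memory, not disk

var weights = initialWeights                                // hypothetical starting weights
for (i <- 1 to numIterations) {
  weights = updateWeights(weights, points)                  // hypothetical per-iteration update
}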
Loving our code again
Beyond better performance, the developer experience when using Spark is much better than when developing against Hadoop. Spark’s Scala, Java, and Python APIs are famously well-conceived and provide a functional programming data model that is declarative and high-level. At first, we wrote our jobs against the Java API using a Clojure DSL called flambo. The usability and functional style of Spark’s API kept flambo’s design very simple, allowing us to extend the library’s functionality where we needed greater access to Spark and MLlib’s capabilities.
More recently, we’ve been exploring Scala for writing some of our Spark jobs. By removing the intermediate layer of flambo, we get even simpler code and can adopt new Spark features as soon as they’re released. We expect the transition to be smooth and easy, due to the high quality and clarity of our existing Clojure codebase that we use to build Spark jobs. This is largely a testament to the power of Spark’s resilient distributed dataset abstraction and the functional programming model it allows. Scala, like Clojure, is an excellent functional programming language that allows us to write clean, testable code.
We just released an open source library for composing pipelines called Mario, which the example below will use. It is motivated by our experiences with Spark and our desire to compose pipelines in a functional, declarative style, similar to the way Spark programs are written. For more details about Mario, see the launch post on the Intent Media blog.
Algorithms for free
Another significant advantage of using Spark as a platform has been getting access to scalable library implementations of common machine learning algorithms via MLlib. In the past, we had to implement every machine learning algorithm that we wanted to employ because they were not readily available for running jobs at scale with Hadoop. With MLlib and Spark, many learning algorithms are already available, and a vibrant open source community is actively improving them and adding new algorithms.
Enabling Services
Some people might be overwhelmed by the number of changes that we’ve made to our data platform in just a few years. We’re proud of our ability to iterate rapidly and find new solutions. It’s part of the fun at a fast-moving startup.
A big reason why we’ve been able to continually refine our approaches to machine learning at scale is that we use Amazon Web Services (AWS) to provide us with the infrastructure we need to execute on our ambitions. All of our applications have always been deployed on Amazon EC2. All of our raw data is stored on Amazon S3. We use Amazon DynamoDB to store the data for our user identification system.
Iterating and shipping using Amazon EMR
Perhaps most importantly, all of our large-scale data processing jobs are executed on EMR. When we started using Hadoop with EMR, we were able to focus on the higher-level problems of data processing and modeling, rather than creating and maintaining Hadoop clusters. This allowed us to rapidly and broadly explore all of the approaches discussed above. After learning that Hadoop would not be a long-term solution for our modeling workflows, we used EMR to get up and running quickly on the Spark platform.
By being able to construct on-demand clusters programmatically that auto-terminate on completion, we’ve been able to use ephemeral clusters for all our data jobs. For much of the day, we can have very few data processing clusters running at any given time. But periodically, we spin up many large clusters via EMR that train all of the models that we need to learn. This usage pattern is neither harder to implement nor more expensive than a serial execution of all of our jobs and matches our preferred workflow much better. For our usage pattern, this actually represents a large cost savings over a persistent cluster.
Spark on EMR can also read and write directly to S3 using EMRFS. This allowed us to continue to use S3 as our persistent data store as we had done in our Hadoop workflows.
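For instance, the same RDD calls that work against HDFS work unchanged against s3:// URIs on EMR; the bucket and prefixes below are placeholders:

// Illustrative sketch: reading from and writing to S3 via EMRFS with ordinary s3:// URIs.
val rawEvents = sc.textFile("s3://your-bucket-name/raw-events/")   // placeholder input prefix
val cleaned = rawEvents.filter(_.nonEmpty)                         // trivial transformation for illustration
cleaned.saveAsTextFile("s3://your-bucket-name/cleaned-events/")    // placeholder output prefix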
Similar to the advantages we get from Spark and MLlib, there is a huge advantage for a startup like ours to pick up tooling advances made by other teams. This frees up time that we would have otherwise spent building this functionality in-house. In the case of EMR, we don’t need to worry about finding the time to implement cluster termination, Hadoop installation, cluster-level monitoring, or a cluster management interface. By giving us a web console that allows us to manage our clusters, EMR makes it much easier to get everyone up to speed on the status of their jobs and to show people how to debug jobs as they are developed. The simplicity of creating and managing clusters via the web interface allows data analysts to use Hadoop and Spark on EMR clusters for ad hoc analyses without needing deep knowledge in cluster creation or management.
A Sample Workflow
Below is a simplified version of the sort of workflow we run many times every day on EMR. It is a basic model learning pipeline, starting with ingesting previously extracted features and ending with persisting a learned model.
This example builds on our initial prototype workflow developed last year, as well as the examples in the Spark programming guide. Additionally, this example uses the Mario library that we developed to compose our pipelines in a type-safe and declarative style using functional idioms.
In our production pipeline, we extract features from the raw data collected by our applications and stored on S3. These features are semantically meaningful, derived representations of our raw data. They are the input to our model learning algorithm. Our production machine learning pipeline extracts hundreds of non-trivial features, but this example simply uses arbitrary identifiers to stand in for real features.
In this example, we begin by defining a function to load the feature data from S3. This function is used to load both the training and testing datasets.
def loadFeatures(inputPath: String) = MLUtils.loadLibSVMFile(sc, inputPath)
The example files use LibSVM format, a common format for storing features. MLlib comes with utilities that understand this format and can parse it into an RDD of LabeledPoints. It’s also worth noting that LibSVM is a sparse format. Having native support for a common sparse format is tremendously valuable for us. In previous versions of our pipeline, we wrote a lot of code that handled transformations between dense and sparse formats for different libraries.
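To make the format concrete, each line of a LibSVM file is a label followed by sparse index:value pairs, with 1-based indices; the toy lines below are invented for illustration:

1 3:0.5 12:1.0 77:0.25
0 7:1.0 12:0.8

Each such line is parsed into a LabeledPoint whose feature vector stores only the listed indices and values.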
Because the loading of training and testing data does not rely on any upstream dependencies, these two steps can be started concurrently. Spark is able to determine this through its understanding of the DAG of the job. Also, the Mario library provides similar guarantees around concurrent execution for all operations defined in the pipeline, even if those operations are not Spark operations (e.g., retrieving data from a database).
Next, you define a function to learn a model from the features in the training set. This is one of the key advantages we picked up in our transition to Spark and MLlib. All of the model learning functionality comes from the library implementation.
def learnModel(trainingData: RDD[LabeledPoint]) =
  new LogisticRegressionWithLBFGS()
    .setNumClasses(2)
    .run(trainingData)
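The LBFGS variant is worth noting as a design choice: L-BFGS typically converges in fewer passes over the data than the SGD-based logistic regression that MLlib also provides, and setNumClasses(2) makes the binary classification explicit.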
Then you define a function to evaluate that model over a test set. This allows you to see how the model can be expected to perform in live usage. In our production workflow, it also allows us to set a threshold for classification based on arbitrary parameters that come from our business use case.
def predictAndLabel(testingData: RDD[LabeledPoint], model: LogisticRegressionModel) =
  testingData.map { case LabeledPoint(label, features) =>
    val prediction = model.predict(features)
    (prediction, label)
  }
You can define a function to compute basic statistics about model performance using built-in functionality from MLlib:
def metrics(predictionsAndLabels: RDD[(Double, Double)]) = new MulticlassMetrics(predictionsAndLabels)
Then, you define functions to persist the model and the performance statistics to disk (S3):
def saveModel(outputPath: String)(model: LogisticRegressionModel) = {
  val modelPath = outputPath + "/model"
  model.save(sc, modelPath)
}

def saveMetrics(outputPath: String)(metrics: MulticlassMetrics) = {
  val precision = metrics.precision
  val recall = metrics.recall
  val metricsPath = outputPath + "/metrics"
  val metricsStringRDD = sc.parallelize(List(precision, recall))
  metricsStringRDD.saveAsTextFile(metricsPath)
}
Then you compose all of these steps into a single pipeline:
for {
  trainingDataStep <- pipe(loadFeatures(trainingDataPath))
  testingDataStep <- pipe(loadFeatures(testingDataPath))
  modelStep <- pipe(learnModel, trainingDataStep)
  predictionsAndLabelsStep <- pipe(predictAndLabel, testingDataStep, modelStep)
  metricsStep <- pipe(metrics, predictionsAndLabelsStep)
  saveMetricsStep <- pipe(saveMetrics(currentOutputPath), metricsStep)
  saveModelStep <- pipe(saveModel(currentOutputPath), modelStep)
} yield modelStep.runWith(saveMetricsStep, saveModelStep)
Those steps that do not yield a result and simply have a side effect (writing to disk) are specified in the call to the runWith method. The composed pipeline yields the learned model as its return value.
In our production workflow, we load some of this persisted data into an application database for use in runtime model serving. Other data about the model learning pipeline’s performance is loaded to an analytics database for reporting and analytics tasks.
This job must be compiled into a JAR to be provided to your EMR cluster. Do this using sbt:
sbt assembly
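For reference, here is a minimal sketch of the sbt setup that would produce such an assembly JAR; the plugin and library versions are illustrative assumptions, not the exact ones from our build, and Spark is marked "provided" because EMR supplies it on the cluster:

// project/plugins.sbt (illustrative sbt-assembly version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

// build.sbt (illustrative Scala and Spark versions)
name := "spark-emr"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.1" % "provided"
)

With these settings, sbt assembly produces spark-emr-assembly-1.0.jar, matching the path used in the next step.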
After you have a JAR, you can push it to S3. The following AWS CLI command copies the JAR to S3:
aws s3 cp spark-emr/target/scala-2.10/spark-emr-assembly-1.0.jar s3://your-bucket-name/$USER/spark/jars/spark-emr-assembly-1.0.jar
Then, start up an EMR cluster that executes the Spark job you just wrote using the AWS CLI:
aws emr create-cluster \
  --name "exampleJob" \
  --ec2-attributes KeyName=MyKeyName \
  --auto-terminate \
  --ami-version 3.8.0 \
  --instance-type m3.xlarge \
  --instance-count 3 \
  --log-uri s3://your-bucket-name/$USER/spark/`date +%Y%m%d%H%M%S`/logs \
  --applications Name=Spark,Args=[-x] \
  --steps 'Name="Run Spark",Type=Spark,Args=[--deploy-mode,cluster,--master,yarn-cluster,--conf,spark.executor.extraJavaOptions=-XX:MaxPermSize=256m,--conf,spark.driver.extraJavaOptions=-XX:MaxPermSize=512m,--class,ModelingWorkflow,s3://your-bucket-name/$USER/spark/jars/spark-emr-assembly-1.0.jar,s3://support.elasticmapreduce/bigdatademo/intentmedia/,s3://your-bucket-name/$USER/spark/output/]'
After the job finishes, the cluster terminates automatically. You can adjust the size of the nodes and the size of the cluster by changing the --instance-type and --instance-count arguments to suit your workload. You can see the full code for this example in the AWS Big Data Blog GitHub repo.
Wrapping Up
This post has given you an overview of how Intent Media has evolved our data platform through a variety of different approaches. We’ve found great success using popular open source frameworks like Spark and MLlib to learn models at massive scale. The advantages of using these tools are further amplified by relying on AWS and EMR, specifically, to create and manage our clusters. The combination of these approaches has enabled us to move quickly and scale with our technology alongside our rapidly expanding business.
If you have questions or suggestions, please leave a comment below.