AWS Big Data Blog
Real-time anomaly detection via Random Cut Forest in Amazon Managed Service for Apache Flink
August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.
Real-time anomaly detection describes a use case to detect and flag unexpected behavior in streaming data as it occurs. Online machine learning (ML) algorithms are popular for this use case because they don’t require any explicit rules and are able to adapt to a changing baseline, which is particularly useful for continuous streams of data where incoming data changes continuously over time.
Random Cut Forest (RCF) is one such algorithm widely used for anomaly detection. In typical setups, we want to run the RCF algorithm on high-throughput input data, and streaming data processing frameworks can help with that. We are excited to share that you can run RCF with Amazon Managed Service for Apache Flink. Apache Flink is a popular open-source framework for real-time, stateful computations over data streams, and can be used to run RCF on high-throughput input streams.
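To make the "online" aspect concrete, the following is a minimal sketch of the AWS RCF library used directly: each point is scored against the forest built from prior points, then fed back to update the model. The builder parameters here are illustrative only, not the ones used later in this post.

```java
import com.amazon.randomcutforest.RandomCutForest;

public class RcfSketch {
    public static void main(String[] args) {
        // Build a forest for 1-dimensional points; parameters are illustrative
        RandomCutForest forest = RandomCutForest.builder()
                .dimensions(1)
                .sampleSize(256)
                .build();

        double[] values = {10.0, 10.2, 9.8, 10.1, -17.0}; // last point is anomalous
        for (double v : values) {
            float[] point = new float[] {(float) v};
            double score = forest.getAnomalyScore(point); // score before updating
            forest.update(point);                         // then learn from the point
            System.out.printf("value=%.1f score=%.3f%n", v, score);
        }
    }
}
```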
This post demonstrates how we can use Amazon Managed Service for Apache Flink to run an online RCF algorithm for anomaly detection.
Solution overview
The following diagram illustrates our architecture, which consists of three components: an input data stream using Amazon Kinesis Data Streams, a Flink job, and an output Kinesis data stream. In terms of data flow, we use a Python script to generate anomalous sine wave data into the input data stream, the data is then processed by RCF in a Flink job, and the resultant anomaly score is delivered to the output data stream.
The following graph shows an example of our expected result, which indicates that the anomaly score peaked when the sine wave data source anomalously dropped to a constant -17.
We can implement this solution in three simple steps:
- Set up AWS resources via AWS CloudFormation.
- Set up a data generator to produce data into the source data stream.
- Run the RCF Flink Java code on Amazon Managed Service for Apache Flink.
Set up AWS resources via AWS CloudFormation
The following CloudFormation stack will create all the AWS resources we need for this tutorial, including two Kinesis data streams, an Amazon Managed Service for Apache Flink app, and an Amazon Simple Storage Service (Amazon S3) bucket.
Sign in to your AWS account, then choose Launch Stack:
Follow the steps on the AWS CloudFormation console to create the stack.
Set up a data generator
Run the following Python script to populate the input data stream with the anomalous sine wave data:
```python
import json
import math

import boto3

STREAM_NAME = "ExampleInputStream-RCF"


def get_data(time):
    # Baseline signal: a sine wave with period 2*pi*100 (~628 points)
    rad = (time / 100) % 360
    val = math.sin(rad) * 10 + 10
    # Inject an anomaly: flatline at -17 for a short window each cycle
    if 2.4 < rad < 2.6:
        val = -17
    return {'time': time, 'value': val}


def generate(stream_name, kinesis_client):
    time = 0
    while True:
        data = get_data(time)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")
        time += 1


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
```
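Note that the script assumes AWS credentials are available to boto3 and writes to the us-west-2 Region; if you launched the CloudFormation stack in a different Region, adjust `region_name` accordingly.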
Run the RCF Flink Java code on Amazon Managed Service for Apache Flink
The CloudFormation stack automatically downloaded and packaged the RCF Flink job JAR file for you. Therefore, you can simply go to the Amazon Managed Service for Apache Flink console to run your application.
That’s it! We now have a running Flink job that continuously reads in data from an input Kinesis data stream and calculates the anomaly score for each new data point given the previous data points it has seen.
The following sections explain the RCF implementation and Flink job code in more detail.
RCF implementation
Numerous RCF implementations are publicly available. For this tutorial, we use the AWS implementation, wrapped in a custom operator (`RandomCutForestOperator`) so that it can be used in our Flink job.

`RandomCutForestOperator` is implemented as an Apache Flink ProcessFunction, which is a function that allows us to write custom logic to process every element in the stream. Our custom logic starts with a data transformation via `inputDataMapper.apply`, followed by getting the anomaly score by calling the AWS RCF library via `rcf.getAnomalyScore`. The code implementation of `RandomCutForestOperator` can be found on GitHub.
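For illustration, the core of the operator's `processElement` might look something like the following simplified sketch. The field names follow the description above, and details such as state handling and model serialization are omitted; refer to the GitHub code for the full implementation.

```java
@Override
public void processElement(T value, Context ctx, Collector<R> out) throws Exception {
    float[] point = inputDataMapper.apply(value);          // transform the raw input into a point
    double anomalyScore = rcf.getAnomalyScore(point);      // score against the current forest
    rcf.update(point);                                     // then update the model online
    out.collect(resultMapper.apply(value, anomalyScore));  // join the score with the input
}
```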
`RandomCutForestOperatorBuilder` requires two main types of parameters:

- RandomCutForestOperator hyperparameters – We use the following:
  - Dimensions – We set this to 1 because our input data is a 1-dimensional sine wave of the float data type.
  - ShingleSize – We set this to 1, which means the RCF algorithm scores each data point individually. This can be increased so that each shingle covers several consecutive data points, which helps account for seasonality in the data (see the sketch after this list).
  - SampleSize – We set this to 628, which means a maximum of 628 data points is kept in the data sample for each tree. This roughly matches one full period of our sine wave (2π × 100 ≈ 628 points).
- DataMapper parameters for input and output processing – We use the following:
  - InputDataMapper – We use `RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER` to map input data from float to float[].
  - ResultMapper – We use `RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER`, which is a BiFunction that joins the anomaly score with the corresponding sine wave data point into a tuple.
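As a hypothetical example of accounting for seasonality: for hourly readings with a daily cycle, a shingle of 24 consecutive points would let the forest score day-shaped windows instead of individual readings. A sketch of such a variant, with all other parameters as in this tutorial:

```java
// Hypothetical variant: one shingle spans a full daily cycle of hourly points
RandomCutForestOperator<Float, Tuple2<Float, Double>> seasonalRcf =
        RandomCutForestOperator.<Float, Tuple2<Float, Double>>builder()
                .setDimensions(1)
                .setShingleSize(24)
                .setSampleSize(628)
                .setInputDataMapper(RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER)
                .setResultMapper(RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER)
                .build();
```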
Flink job code
The following code snippet illustrates the core streaming structure of our Apache Flink streaming Java code. It first reads in data from the source Kinesis data stream, then processes it using the RCF algorithm. The computed anomaly score is then written to an output Kinesis data stream.
```java
DataStream<Float> sineWaveSource = createSourceFromStaticConfig(env);

sineWaveSource
        .process(
                RandomCutForestOperator.<Float, Tuple2<Float, Double>>builder()
                        .setDimensions(1)
                        .setShingleSize(1)
                        .setSampleSize(628)
                        .setInputDataMapper(RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER)
                        .setResultMapper(RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER)
                        .build(),
                TupleTypeInfo.getBasicTupleTypeInfo(Float.class, Double.class))
        .addSink(createSinkFromStaticConfig());
```
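The helpers `createSourceFromStaticConfig` and `createSinkFromStaticConfig` are not shown in the snippet. The following is a rough sketch of what they might look like using the Flink Kinesis connector; the stream names, Region, and naive JSON handling here are assumptions for illustration, and the packaged job on GitHub is the authoritative version.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

// Sketch: consume JSON records like {"time": 1, "value": 9.8} and extract "value"
private static DataStream<Float> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
    props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
    return env
            .addSource(new FlinkKinesisConsumer<>("ExampleInputStream-RCF", new SimpleStringSchema(), props))
            .map(json -> Float.parseFloat(json.replaceAll(".*\"value\": ?([-0-9.eE]+).*", "$1"))) // naive JSON parsing for brevity
            .returns(Types.FLOAT);
}

// Sketch: write each (value, anomalyScore) tuple back out as JSON
private static FlinkKinesisProducer<Tuple2<Float, Double>> createSinkFromStaticConfig() {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
    FlinkKinesisProducer<Tuple2<Float, Double>> sink = new FlinkKinesisProducer<>(
            (SerializationSchema<Tuple2<Float, Double>>) t ->
                    String.format("{\"value\": %f, \"anomalyScore\": %f}", t.f0, t.f1)
                            .getBytes(StandardCharsets.UTF_8),
            props);
    sink.setDefaultStream("ExampleOutputStream-RCF"); // assumed output stream name
    sink.setDefaultPartition("0");
    return sink;
}
```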
In this example, our baseline input data is a sine wave. As shown in the following screenshot, a low anomaly score is returned when the data is regular. However, when there is an anomaly in the data (when the sine wave input data drops to a constant), a high anomaly score is returned. The anomaly score is delivered into an output Kinesis data stream. You can visualize this result by creating an Amazon Managed Service for Apache Flink Studio app; for instructions, refer to Interactive analysis of streaming data.
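If you just want to spot-check the scores without setting up a Studio notebook, a small consumer can tail the output stream. The following sketch uses the AWS SDK for Java v2 and assumes the output stream is named ExampleOutputStream-RCF and has a single shard; check your stack's outputs for the actual name.

```java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

public class TailAnomalyScores {
    public static void main(String[] args) throws InterruptedException {
        KinesisClient kinesis = KinesisClient.builder().region(Region.US_WEST_2).build();

        // Start reading new records from the (assumed) single shard of the output stream
        String iterator = kinesis.getShardIterator(GetShardIteratorRequest.builder()
                .streamName("ExampleOutputStream-RCF")   // assumed name; check your stack's outputs
                .shardId("shardId-000000000000")
                .shardIteratorType(ShardIteratorType.LATEST)
                .build()).shardIterator();

        while (true) {
            GetRecordsResponse resp = kinesis.getRecords(
                    GetRecordsRequest.builder().shardIterator(iterator).limit(100).build());
            resp.records().forEach(r -> System.out.println(r.data().asUtf8String()));
            iterator = resp.nextShardIterator();
            Thread.sleep(1000); // stay under the per-shard read limits
        }
    }
}
```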
Because this is an unsupervised algorithm, you don’t need to provide any explicit rules or labeled datasets for this operator. In short, only the input data stream, data conversions, and some hyperparameters were provided. The RCF algorithm itself determined the expected baseline based on the input data and identified any unexpected behavior.
Furthermore, this means the model continuously adapts even if the baseline changes over time, so little explicit retraining is required. This is powerful for anomaly detection on streaming data, where the data often drifts slowly over time due to seasonal trends, inflation, equipment calibration drift, and so on.
Clean up
To avoid incurring future charges, complete the following steps:
- On the Amazon S3 console, empty the S3 bucket created by the CloudFormation stack.
- On the AWS CloudFormation console, delete the CloudFormation stack.
Conclusion
This post demonstrated how to perform anomaly detection on input streaming data with RCF, an online unsupervised ML algorithm using Amazon Managed Service for Apache Flink. We also showed how this algorithm learns the data baseline on its own, and can adapt to changes in the baseline over time. We hope you consider this solution for your real-time anomaly detection use cases.
About the Authors
Daren Wong is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.
Aleksandr Pilipenko is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.
Hong Liang Teoh is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.