AWS Developer Tools Blog
Using a Thread Pool with the AWS SDK for C++
The default thread executor implementation we provide for asynchronous operations spins up a thread for each task and then detaches it. On modern operating systems, this is often exactly what we want. However, there are some other use cases for which this simply will not work. For example, suppose we want to fire off asynchronous calls to Amazon Kinesis as quickly as we receive events, and suppose that we sometimes receive these events at a rate of 10 per millisecond. Even if we are calling Amazon Kinesis from an Amazon Elastic Compute Cloud (EC2) instance in the same data center as our Amazon Kinesis stream, the request latency will eventually cause the number of threads on our system to balloon and possibly exhaust the operating system's limit.
Here is an example of what this code might look like:
```cpp
#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/Aws.h>

#include <iostream>

using namespace Aws::Client;
using namespace Aws::Utils;
using namespace Aws::Kinesis;
using namespace Aws::Kinesis::Model;

class KinesisProducer
{
public:
    KinesisProducer(const Aws::String& streamName, const Aws::String& partition)
        : m_partition(partition), m_streamName(streamName)
    {
        ClientConfiguration clientConfiguration;
        m_client = Aws::New<KinesisClient>("kinesis-sample", clientConfiguration);
    }

    ~KinesisProducer()
    {
        Aws::Delete(m_client);
    }

    void StreamData(const Aws::Vector<ByteBuffer>& data)
    {
        PutRecordsRequest putRecordsRequest;
        putRecordsRequest.SetStreamName(m_streamName);

        for (auto& datum : data)
        {
            PutRecordsRequestEntry putRecordsRequestEntry;
            putRecordsRequestEntry.WithData(datum)
                                  .WithPartitionKey(m_partition);
            putRecordsRequest.AddRecords(putRecordsRequestEntry);
        }

        m_client->PutRecordsAsync(putRecordsRequest,
            std::bind(&KinesisProducer::OnPutRecordsAsyncOutcomeReceived, this,
                      std::placeholders::_1, std::placeholders::_2,
                      std::placeholders::_3, std::placeholders::_4));
    }

private:
    void OnPutRecordsAsyncOutcomeReceived(const KinesisClient*, const Model::PutRecordsRequest&,
                                          const Model::PutRecordsOutcome& outcome,
                                          const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
    {
        if (outcome.IsSuccess())
        {
            std::cout << "Records Put Successfully " << std::endl;
        }
        else
        {
            std::cout << "Put Records Failed with error " << outcome.GetError().GetMessage() << std::endl;
        }
    }

    KinesisClient* m_client;
    Aws::String m_partition;
    Aws::String m_streamName;
};

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        KinesisProducer producer("kinesis-sample", "announcements");

        while (true)
        {
            Aws::String event1("Event #1");
            Aws::String event2("Event #2");
            producer.StreamData({
                ByteBuffer((unsigned char*)event1.c_str(), event1.length()),
                ByteBuffer((unsigned char*)event2.c_str(), event2.length())
            });
        }
    }
    Aws::ShutdownAPI(options);
    return 0;
}
```
This example is intended to show how exhausting the available threads from the operating system will ultimately result in a program crash. Most systems with this problem would be bursty and would not create such a sustained load. Still, we need a better way to handle our threads for such a scenario.
This week, we released a thread pool executor implementation. Simply include the aws/core/utils/threading/Executor.h file. The class name is PooledThreadExecutor. You can set two options: the number of threads for the pool to use and the overflow policy.
Currently, there are two overflow policy modes:
- QUEUE_TASKS_EVENLY_ACROSS_THREADS allows you to push as many tasks as you want to the executor. It makes sure tasks are queued and pulled by each thread as quickly as possible. For most cases, QUEUE_TASKS_EVENLY_ACROSS_THREADS is the preferred option.
- REJECT_IMMEDIATELY rejects the task submission if the number of queued tasks ever exceeds the size of the thread pool.
Let’s revise our example to use a thread pool:
```cpp
#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/threading/Executor.h>
#include <aws/core/Aws.h>

using namespace Aws::Client;
using namespace Aws::Utils::Threading;
using namespace Aws::Kinesis;
using namespace Aws::Kinesis::Model;

class KinesisProducer
{
public:
    KinesisProducer(const Aws::String& streamName, const Aws::String& partition)
        : m_partition(partition), m_streamName(streamName)
    {
        ClientConfiguration clientConfiguration;
        clientConfiguration.executor = Aws::MakeShared<PooledThreadExecutor>("kinesis-sample", 10);
        m_client = Aws::New<KinesisClient>("kinesis-sample", clientConfiguration);
    }

    ....
```
The only change we need to make to use the thread pool is to assign an instance of the new executor implementation to our ClientConfiguration object.
As always, we welcome your feedback, and even pull requests, about how we can improve this feature.