AWS Compute Blog

Decoupling event publishing with Amazon EventBridge Pipes

This post is written by Gregor Hohpe, Sr. Principal Evangelist, Serverless.

Event-driven architectures (EDAs) help decouple applications or application components from one another. Consuming events makes a component less dependent on the sender’s location or implementation details. Because events are self-contained, they can be consumed asynchronously, which allows sender and receiver to follow independent timing considerations. Decoupling through events can improve development agility and operational resilience when building fine-grained distributed applications, which is the preferred style for serverless applications.

Many AWS services publish events through built-in mechanisms to support building event-driven architectures with a minimal amount of custom coding. Modern applications built on top of those services can also send and consume events based on their specific business logic. AWS application integration services like Amazon EventBridge or Amazon SNS, a managed publish-subscribe service, filter those events and route them to the intended destination, providing additional decoupling between event producer and consumer.

Publishing events

Custom applications that act as event producers often use the AWS SDK library, which is available for 12 programming languages, to send an event message. The application code constructs the event as a local data structure and specifies where to send it, for example to an EventBridge event bus.

The application code required to send an event to EventBridge is straightforward and only requires a few lines of code, as shown in this (simplified) helper method that publishes an order event generated by the application:

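import { EventBridgeClient, PutEventsCommand } from "@aws-sdk/client-eventbridge";

// Assumption: the bus name is supplied via an environment variable (the variable name is illustrative).
const eventBusName = process.env.EVENT_BUS_NAME;

// Wraps the given object in an EventBridge entry and publishes it to the bus.
async function sendEvent(event) {
    const ebClient = new EventBridgeClient();
    const params = { Entries: [ {
          Detail: JSON.stringify(event),
          DetailType: "newOrder",
          Source: "orders",
          EventBusName: eventBusName
        } ] };
    return await ebClient.send(new PutEventsCommand(params));
}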

An application most likely calls such a method in the context of another action, for example when persisting a received order to a data store. The code that performs those tasks might look as follows:

const order = { "orderId": "1234", "Items": ["One", "Two", "Three"] }
await writeToDb(order);
await sendEvent(order);

The code populates an order object with multiple line items (in reality, this would be based on data entered by a user or received via an API call), writes it to a database (via another helper method whose implementation isn’t shown), and then sends it to an EventBridge bus via the preceding method.

Code causes coupling

Although this code is not complex, it has drawbacks from an architectural perspective:

  • It interweaves application logic with the solution’s topology because the destination of the event, both in terms of the service (EventBridge versus SNS, for example) and the instance (the event bus name in this case), is defined inside the application’s source code. If the event destination changes, you must change the source code, or at least know which string constant is passed to the function via an environment variable. Both aspects work against the EDA principle of minimizing coupling between components because changes in the communication structure propagate into the producer’s source code.
  • Sending the event after updating the database is brittle because it lacks transactional guarantees across both steps. You must implement error handling and retry logic to handle cases where sending the event fails, or even undo the database update. Writing such code can be tedious and error-prone.
  • Code is a liability. After all, that’s where bugs come from. In a real-life example, a helper method similar to the preceding code erroneously swapped day and month on the event date, which led to a challenging debugging cycle. Boilerplate code to send events is therefore best avoided.
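To make the second drawback concrete, the following is a minimal sketch of the retry and compensation logic that an application would need around the two steps. The names persistAndPublish, OrderStore, and Publisher, as well as the retry settings, are hypothetical and not part of the original example. The store and the publisher are passed in as parameters so the sketch stays independent of any SDK.

```typescript
// Hypothetical sketch: all names and retry settings here are illustrative.
type Order = { orderId: string; Items: string[] };

interface OrderStore {
  write(order: Order): Promise<void>;
  remove(orderId: string): Promise<void>; // compensating action
}

type Publisher = (order: Order) => Promise<void>;

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function persistAndPublish(
  order: Order,
  store: OrderStore,
  publish: Publisher,
  maxAttempts = 3,
  baseDelayMs = 100,
): Promise<void> {
  await store.write(order);

  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await publish(order);
      return; // both steps succeeded
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        // Exponential backoff between attempts.
        await sleep(baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }

  // Publishing failed for good: undo the write so that the data store and
  // the event stream do not diverge. This step can fail as well.
  await store.remove(order.orderId);
  throw lastError;
}
```

Even this sketch leaves gaps: the compensating delete can itself fail, and a crash between the write and the publish loses the event without any error being raised.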

Performing event routing inside EventBridge can lessen the first concern. You could reconfigure EventBridge’s rules and targets to route events with a specified type and source to a different destination, provided you keep the event bus name stable. However, the other issues would remain.
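As a rough illustration of such routing, an EventBridge rule selects events with an event pattern that lists the accepted values per field. The sketch below shows a pattern for the event sent by the helper method, together with a deliberately simplified matcher that only handles exact matches on the source and detail-type fields. The matcher is a hypothetical stand-in to show the idea; real event patterns support many more operators and also match inside the detail field.

```typescript
// Envelope fields of the event produced by the sendEvent helper.
type Envelope = { source: string; "detail-type": string; detail: unknown };

// An event pattern lists the accepted values for each field it constrains.
type Pattern = Partial<Record<"source" | "detail-type", string[]>>;

const newOrderPattern: Pattern = {
  source: ["orders"],
  "detail-type": ["newOrder"],
};

// Simplified, hypothetical matcher: every field named in the pattern must
// contain the event's value. Fields not named in the pattern are ignored.
function matches(pattern: Pattern, event: Envelope): boolean {
  return (Object.keys(pattern) as Array<keyof Pattern>).every((field) => {
    const accepted = pattern[field];
    return accepted === undefined || accepted.indexOf(event[field]) !== -1;
  });
}
```

Because the pattern lives in the rule rather than in the producer, the rule's target can be changed without touching the application code.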

Serverless: Less infrastructure, less code

AWS serverless integration services can alleviate the need to write custom application code to publish events altogether.

A key benefit of serverless applications is that you can let the AWS Cloud do the undifferentiated heavy lifting for you. Traditionally, we associate serverless with provisioning, scaling, and operating compute infrastructure so that developers can focus on writing code that generates business value.

Serverless application integration services can also take care of application-level tasks for you, including publishing events. Most applications store data in AWS data stores like Amazon Simple Storage Service (S3) or Amazon DynamoDB, which can automatically emit events whenever an update takes place, without any application code.

EventBridge Pipes: Events without application code

EventBridge Pipes allows you to create point-to-point integrations between event producers and consumers with optional transformation, filtering, and enrichment steps. Serverless integration services combined with cloud automation allow “point-to-point” integrations to be more easily managed than in the past, which makes them a great fit for this use case.

This example takes advantage of EventBridge Pipes’ ability to fetch events actively from sources like DynamoDB Streams. DynamoDB Streams captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours. EventBridge Pipes picks up events from that log and pushes them to one of over 14 event targets, including an EventBridge bus, SNS, SQS, or API Destinations. It also accommodates batch sizes, timeouts, and rate limiting where needed.

EventBridge Pipes example

The integration through EventBridge Pipes can replace the custom application code that sends the event, including any retry or error logic. Only the following code remains:

const order = { "orderId": "1234", "Items": ["One", "Two", "Three"] }
await writeToDb(order);

Automation as code

EventBridge Pipes can be configured from the CLI, the AWS Management Console, or from automation code using AWS CloudFormation or AWS CDK. By using AWS CDK, you can use the same programming language that you use to write your application logic to also write your automation code.

For example, the following CDK snippet configures an EventBridge Pipe to read events from a DynamoDB Stream attached to the Orders table and pass them to an EventBridge event bus.

This code references the DynamoDB table via the ordersTable variable that would be set when the table is created:

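import { CfnPipe } from 'aws-cdk-lib/aws-pipes';

const pipe = new CfnPipe(this, 'pipe', {
  roleArn: pipeRole.roleArn,
  // Source: the stream attached to the Orders table
  source: ordersTable.tableStreamArn!,
  sourceParameters: {
    dynamoDbStreamParameters: {
      // Read only records written after the pipe was created
      startingPosition: 'LATEST'
    },
  },
  // Target: the event bus that receives the order events
  target: ordersEventBus.eventBusArn,
  targetParameters: {
    eventBridgeEventBusParameters: {
      detailType: 'order-details',
      source: 'my-source',
    },
  },
});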

The automation code cleanly defines the dependency between the DynamoDB table and the event destination, independent of application logic.
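The snippet refers to a pipeRole without showing its definition. As a sketch under stated assumptions, the role would need to be assumable by the EventBridge Pipes service and allow reading from the stream and putting events on the bus. The helper name pipePolicy is hypothetical, and the action list below should be checked against the current EventBridge Pipes documentation. In CDK, the same permissions would more likely be granted through an IAM role construct and its grant helpers than written out by hand.

```typescript
type Statement = { Effect: "Allow"; Action: string[]; Resource: string[] };
type PolicyDocument = { Version: "2012-10-17"; Statement: Statement[] };

// Trust policy: lets the EventBridge Pipes service assume the role.
const pipeTrustPolicy = {
  Version: "2012-10-17",
  Statement: [
    {
      Effect: "Allow",
      Principal: { Service: "pipes.amazonaws.com" },
      Action: "sts:AssumeRole",
    },
  ],
};

// Hypothetical helper: builds the permissions for a pipe that reads from a
// DynamoDB stream and writes to an EventBridge event bus.
function pipePolicy(streamArn: string, eventBusArn: string): PolicyDocument {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        // Source side: poll the stream for new records.
        Effect: "Allow",
        Action: [
          "dynamodb:DescribeStream",
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
        ],
        Resource: [streamArn],
      },
      {
        // Assumption: listing streams is granted on all resources here.
        Effect: "Allow",
        Action: ["dynamodb:ListStreams"],
        Resource: ["*"],
      },
      {
        // Target side: publish to the event bus.
        Effect: "Allow",
        Action: ["events:PutEvents"],
        Resource: [eventBusArn],
      },
    ],
  };
}
```

Keeping these permissions next to the pipe definition means the dependency between table, pipe, and bus is visible in one place in the automation code.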

Decoupling with data transformation

Coupling is not limited to event sources and destinations. A source’s data format can determine the event format and require downstream changes if the data format or the data source changes. EventBridge Pipes can also alleviate that consideration.

Events emitted from the DynamoDB Stream use the native, marshaled DynamoDB format that includes type information, such as an “S” for strings or “L” for lists.

For example, the order event in the DynamoDB stream from this example looks as follows. Some fields are omitted for readability:

{
  "detail": {
    "eventID": "be1234f372dd12552323a2a3362f6bd2",
    "eventName": "INSERT",
    "eventSource": "aws:dynamodb",
    "dynamodb": {
      "Keys": { "orderId": { "S": "ABCDE" } },
      "NewImage": { 
        "orderId": { "S": "ABCDE" },
        "items": {
            "L": [ { "S": "One" }, { "S": "Two" }, { "S": "Three" } ]
        }
      }
    }
  }
} 

This format is not well suited for downstream processing because it would unnecessarily couple event consumers to the fact that this event originated from a DynamoDB Stream. EventBridge Pipes can convert this event into a more easily consumable format. The transformation is specified via an inputTemplate parameter using JSONPath expressions. EventBridge Pipes’ support for list processing with wildcards is a good fit for this scenario.

In this example, add the following transformation template to the target parameters of the preceding CDK code (the asterisk character matches a complete list of elements):

targetParameters: {
  inputTemplate: '{"orderId": <$.dynamodb.NewImage.orderId.S>,' + 
                 '"items": <$.dynamodb.NewImage.items.L[*].S>}'
}

This transformation formats the event published by EventBridge Pipes like a regular business event, decoupling any event consumer from the fact that it originated from a DynamoDB table:

{
  "time": "2023-06-01T06:18:10Z",
  "detail": {
    "orderId": "ABCDE",
    "items": ["One", "Two", "Three" ]
  }
}
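For comparison, without the transformation each consumer would have to unwrap the DynamoDB type descriptors itself. The following sketch shows what such consumer-side code might look like. The function name unmarshal and the types are illustrative, and only a subset of the DynamoDB attribute types is handled (binary and set types are left out).

```typescript
// Subset of the marshaled DynamoDB attribute format.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { L: AttributeValue[] }
  | { M: { [key: string]: AttributeValue } };

type Plain =
  | string
  | number
  | boolean
  | null
  | Plain[]
  | { [key: string]: Plain };

// Recursively strips the type descriptors from a marshaled value.
function unmarshal(value: AttributeValue): Plain {
  if ("S" in value) return value.S;
  if ("N" in value) return Number(value.N); // may lose precision for large numbers
  if ("BOOL" in value) return value.BOOL;
  if ("NULL" in value) return null;
  if ("L" in value) return value.L.map((item) => unmarshal(item));
  const result: { [key: string]: Plain } = {};
  for (const key of Object.keys(value.M)) {
    result[key] = unmarshal(value.M[key]);
  }
  return result;
}

// The NewImage from the stream record shown earlier.
const newImage: { [key: string]: AttributeValue } = {
  orderId: { S: "ABCDE" },
  items: { L: [{ S: "One" }, { S: "Two" }, { S: "Three" }] },
};

const plainOrder = unmarshal({ M: newImage });
// plainOrder is { orderId: "ABCDE", items: ["One", "Two", "Three"] }
```

Every consumer carrying a helper like this would be tied to DynamoDB as the event source, which is the coupling that the input template removes.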

Conclusion

When building event-driven applications, consider whether you can replace application code with serverless integration services to improve the resilience of your application and provide a clean separation between application logic and system dependencies.

EventBridge Pipes can be a helpful feature in these situations, for example to capture and publish events based on DynamoDB table updates.

Learn more about EventBridge Pipes at https://thinkwithwp.com/eventbridge/pipes/ and discover additional ways to reduce serverless application code at https://serverlessland.com/refactoring-serverless. For a complete code example, see https://github.com/aws-samples/aws-refactoring-to-serverless.