EventBridge Pipes in Practice
Amazon released EventBridge Pipes in December of 2022. It is a fantastic tool that attempts to simplify gluing pieces of a cloud infrastructure together in a serverless way. Where you previously needed a glue lambda and queues, you may be able to use Pipes instead.
Components of a Pipe
Pipes, fundamentally, are very simple: they are a service that connects a source to a destination, allowing you to optionally filter and enrich the data as it flows through. It's built in a way that lets you compose items together easily, and add custom code where you need to. The real nice bit here is how many services it supports, from Kinesis Sources to destinations like API Gateway, EventBridge, and many things in between, Pipes can really simplify interconnections. New service are being added, so for an update list or sources and targets, see the AWS Documentation site.
Example Scenario
Let's take the following example: We have an existing Events API that collects event data from many external sources (web apps, mobile apps, etc). The API pushes to a Kinesis Stream to be handled by a series of backend processes. We want to tap that stream and push the events in it to an internal Event Bridge without disrupting the existing architecture so that we can generate metrics and further react to events dynamically.
Other Options
If we were able to change everything or build a new system from scratch, we could just leverage API Gateway's native capability to push directly to EventBridge (this is a great pattern, btw). However that is a lot of change, and in this scenario, we aren't allowed to disrupt things to that degree.
Before Pipes, you would likely attach Lambda to the existing Kinesis Stream and have it push messages to the target EventBridge Bus. This is a completely valid approach, but does have some complexities involved.
Enter AWS Pipes
Instead, let's use pipes to tap that stream and get our events to EventBridge completely serverlessly. And since we like IAC (and a challenge), let's do it in CDK with Typescript... Why? Well, that's what I use at work. Let me preface all of this by saying I'm relatively new to Typescript, so please let me know where I can improve the code.
CDK makes building infrastructure in AWS much simpler. There are many built in constructs that make common patterns available with a simple instantiation, and it enforces best practices like least-privilege out of the box. Check out the API Docs for more if you're not familiar. Unfortunately, Pipes is so new, that the support in CDK is limited... As of this writing, there are Alpha Packages (enrichments, sources, targets) that have the basics in place, and we can build the missing pieces on top of it.
Custom CDK Pipe Source
Lets start where all things begin; the source. There is an example SQS Source, and the documentation mentions how to create a source implementation, so let's dive in...
import * as cdk from "aws-cdk-lib";
/**
* A Kinesis source for EventBridge Pipes
*
* @param stream The Kinesis stream to use as the source
* @param kinesisStreamParameters The parameters for the Kinesis stream. Starting Position defaults to LATEST
*/
class KinesisSource implements pipes.ISource {
sourceArn: string;
sourceParameters: pipes.SourceParameters = {
kinesisStreamParameters: {
startingPosition: "LATEST", // I like sane defaults
},
};
constructor(
private readonly stream: kinesis.IStream,
kinesisStreamParameters?: cdk.aws_pipes.CfnPipe.PipeSourceKinesisStreamParametersProperty
) {
this.stream = stream;
this.sourceArn = stream.streamArn;
if (kinesisStreamParameters) {
this.sourceParameters = { kinesisStreamParameters };
}
}
// eslint-disable-next-line
bind(_pipe: pipes.IPipe): pipes.SourceConfig {
return {
sourceParameters: this.sourceParameters,
};
}
grantRead(pipeRole: cdk.aws_iam.IRole): void {
this.stream.grantRead(pipeRole);
}
}
This source uses the existing CfnPipe primitives to handle the properties, which makes the implementation relatively easy. There is only one required parameter, and that is the Starting Position. I assumed LATEST
as a sane default, but this object allows us to override as needed.
Custom CDK Pipe Target
Just like the Source, we need a target.
import * as cdk from "aws-cdk-lib";
/**
* An EventBridge target for EventBridge Pipes
*
* @param targetEventBridge The EventBridge event bus to use as the target
* @param inputTransformation The input transformation to apply to the event
*/
class EventBridgeTarget implements pipes.ITarget {
targetArn: string;
private inputTransformation: pipes.InputTransformation | undefined;
private targetParameters: cdk.aws_pipes.CfnPipe.PipeTargetEventBridgeEventBusParametersProperty | undefined;
constructor(
private readonly targetEventBridge: events.IEventBus,
props: {
inputTransformation?: pipes.InputTransformation;
targetConfig?: CfnPipe.PipeTargetEventBridgeEventBusParametersProperty;
} = {}
) {
this.targetEventBridge = targetEventBridge;
this.targetArn = targetEventBridge.eventBusArn;
this.inputTransformation = props?.inputTransformation;
this.targetParameters = props?.targetConfig;
}
bind(_pipe: pipes.Pipe): pipes.TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTransformation?.bind(_pipe).inputTemplate,
eventBridgeEventBusParameters: this.targetParameters,
},
};
}
grantPush(pipeRole: cdk.aws_iam.IRole): void {
this.targetEventBridge.grantPutEventsTo(pipeRole);
}
}
Again, we're leveraging the primitives from the underlying CDK alpha to generate our target. Targets support more options, so I handled them a little differently here. Note the bind function, where I return a targetParameters object. That is what corresponds to the CloudFormation target parameters object which is complex and is used by all pipe targets. I'm sure there's a nicer abstraction that could be used here, but I was more focused on getting it up and running that writing the next alpha target.
Using Our New Constructs
Using the above constructs, we are able to use the existing pipe from the alpha library to get a working pipe!
// EventsMetricsBus is ppublic on the construct so that other stacks
// can reference it for adding rules easily.
this.EventsMetricsBus = new events.EventBus(this, `${prefix}-eventBus`, {
eventBusName: `${prefix}-eventBus`,
});
const sourceEventStream = kinesis.Stream.fromStreamArn(this, `${prefix}-sourceEventStream`, kinesisStreamArn);
new eventsSchemas.CfnDiscoverer(this, `${prefix}-discoverer`, {
sourceArn: this.EventsMetricsBus.eventBusArn,
description: "Discoverer for event metrics bus",
});
const base64InputTransformer = pipes.InputTransformation.fromText(
'{ "kinesisWrappedEvent": <$.data>, "originalEvent" : <aws.pipes.event> }'
);
const kinesisToEventBridgePipe = new pipes.Pipe(this, `${prefix}-kinesisToEventBridgePipe`, {
pipeName: `${prefix}-kinesisToEventBridgePipe`,
source: new KinesisSource(sourceEventStream),
target: new EventBridgeTarget(this.EventsMetricsBus, {
inputTransformation: base64InputTransformer,
targetConfig: {
source: sourceName,
detailType: "$.partitionKey", // This will always be here as it is the partition key of the Kinesis record itself
},
}),
logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],
logLevel: pipes.LogLevel.ERROR,
});
Some notes on the above code...
Killer Feature: The Input Transformer uses a feature of Pipes called Implicit Body Data Parsing. The Kinesis event data is base64 encoded. Pipes will automatically decode it and make it available. This means you can set your input for the EventBridge Bus to the decoded data, allowing for simple filtering. No enrichment required!
- This would let us set the input to pipes to just be
<$.data>
and it would essentially keep our data the same as it was when it came into the API, completely hiding the plumbing of kinesis. I really like this for downstream developer clarity.
- This would let us set the input to pipes to just be
I like to use
${prefix}
in my CDK stacks. It allows for standard naming of resources throughout the stack.Using the source is easy. I pass in a kinesis stream from another stack and use it as the source of our KinesisSource.
Killer Feature: The target config supports the same JSON path syntax as the Input Transformer! So we are able to take anything from the object and use it as the detail-type! See Dynamic Path Parameters in the docs. In this instance, our API Gateway sets the partition key to the event type, so our downstream events are now the same as the event name coming in to the API.
Conclusion
With these pieces in place, we have reached our goal! We can now react to our API Gateway events dynamically in our new EventBridge Bus. In our instance, we wanted to generate metrics on all the events in our system. This approach lets us do so cleanly, and without touching our existing process. As a bonus, we also got schema discovery of our events, which helped our team stay in sync to changes over time to our inbound events.
Thanks
Many thanks to my seniors who encoruaged me and provided guidance through this project (Alex, Brandon, and Matt, you guys are the best)!
Subscribe to my newsletter
Read articles from Derek Murawsky directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Derek Murawsky
Derek Murawsky
I'm a jack of all trades with deep expertise in infrastructure, cloud, networking, and devsecops. In my spare time I also like to play around with self-hosting, embedded devices, camping, permaculture, sailing, and scouting