Egress Rate Controlling in Distributed Systems (Part 2)


In the previous part of this series, we explored scenarios where a producer's behavior impacts downstream services. This part focuses on asynchronous design and the related patterns and practices that reduce that impact.
Real Scenarios
Before diving into patterns and practices, let's look at a real scenario where the producer rate impacted downstream services.
Migrating Provisioned DDB to OnDemand
Recently, AWS reduced the cost of DynamoDB on-demand mode by 50%. This announcement led to many migration decisions. When we moved one of our critical core services to on-demand mode, we observed higher processing latency and a higher error rate in one of the downstream services. While the overall functionality of that service (an old serverless system) was not impacted, the delays and the latency introduced by the higher error rate had a real business impact.
The above diagram demonstrates a typical design in many companies, and it works perfectly at some scale. Still, it can be dangerous when the processing rate exceeds the predicted rate. Migrating the DynamoDB table to on-demand mode introduced significant latency in OpenSearch queries.
Metrics gathered during the investigation showed a higher rate of successful write requests than in provisioned mode before the migration. In provisioned mode, requests were throttled but succeeded after retries; the exponential backoff strategy for errors spread the instantaneous peak over a wider time window by delaying it.
Fortunately (or unfortunately), the throttling and retrying experienced by UpdateB decreased the processing rate at any given time, letting the downstream keep processing or giving it a chance to recover. Migrating to on-demand mode gave the producer more capacity from the moment of migration (4 partitions, 12,000 RCU, 4,000 WCU).
Too many changes increased the load on the downstream service backed by OpenSearch, degrading its performance: the number of queries queued in the OpenSearch internal buffer grew and, consequently, so did latency. These latencies led to increased error rates for the Lambda triggers. While not an issue on its own, using bisect-on-failure with a large batch size does increase the load on the downstream side.
Design Problems
Distributing many changes at the same time
Propagating raw database changes instead of concrete events representing the final, viable state at a given (processing) time
Coupling two specific services: the DynamoDB stream and OpenSearch
Propagating errors
A large batch size combined with bisect-on-failure
High-Level Async Design
In a time-balanced scenario, the count of instantaneous occurrences and the speed are reduced. Concurrency can consequently be improved, but it needs to be controlled at some layer, depending on what is possible.
In an instantaneous scenario, the count, speed, and concurrency all increase, which demands more capacity on the downstream side.
While at some scale the time-balanced and instantaneous scenarios produce functionally similar results, at a larger scale the challenges must be considered and refined intentionally. The following high-level diagram represents the challenge.
Part 1 of this series explored four rate-controlling attributes: Time Window, Concurrency, Processing Speed, and Count; another important attribute, State, was also under focus.
Producer & Consumer Integrations
While AWS provides many services that enable and simplify integrations in an event-driven design, each service is purpose-built and has its own pros and cons. The following diagram showcases some available integrations. (The Lambda functions in this diagram simply represent a computing resource; they could be Fargate containers or any other service with computing/processing capability.)
The “Invocation Models and Service Integrations” topic is explained in depth in Chapter 3 of Mastering Serverless Computing with AWS Lambda.
In the above diagram, three categories of integrations are discussed:
Streams
Pub/Sub
Queues
Streams
Streams such as DynamoDB Streams or Kinesis are ideal for near-real-time data streaming, even in high-throughput scenarios with massive volumes of data. They guarantee ordering, provide means to handle the highest write-throughput requirements, and support large amounts of data.
Pub/Sub
Pub/Sub services such as EventBridge or SNS can distribute any single message to one or many subscribers in near real time, offering high throughput for a large number of distinct messages. By design they operate fully asynchronously, which means they do not track the consumer's processing status.
Queues
Queue systems such as SQS guarantee delivery and provide mechanisms for resiliency, load leveling, and decoupling. They offer simple means of preventing message loss, such as retaining messages in the queue until expiration and redrive policies.
General Recommendations
This section provides general guidelines for better controlling the rate and load on downstream services. It does not focus on design practices but on adopting a level of practice that builds in some safeguards.
Batching reduces statelessness by presenting related changes side by side, giving the code more context for its decisions. Reducing concurrency can be achieved in different ways for each category, but in the end a strict concurrency configuration, such as a concurrency limit (maxConcurrency) with SQS, reserved concurrency with SNS/EventBridge, or partitioning with streams, reduces the execution rate and consequently the risk of overloading the downstream. Synthesizing messages makes it possible to produce the final state at a given time by looking at a time-ordered batch of related records.
Source Code
SQS Integration
Describing how Amazon SQS works internally is out of scope, but at a high level the polling process between AWS Lambda and SQS can be illustrated by the following diagram.
AWS Lambda asks for messages and SQS returns the available messages from a sample of its nodes; this is SQS's default behavior (short polling). By configuring a wait time longer than zero (zero being the default, i.e. short polling), long polling is applied and SQS makes a best effort to gather messages from all nodes.
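For illustration, the same long-polling behavior can be reproduced outside Lambda with the AWS SDK for JavaScript v3. A minimal sketch, assuming a placeholder queue URL and an async context:

import { SQSClient, ReceiveMessageCommand } from '@aws-sdk/client-sqs';

const client = new SQSClient({});
// Ask SQS to wait up to 20 seconds and gather up to 10 messages from all nodes.
const response = await client.send(new ReceiveMessageCommand({
  QueueUrl: 'https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue', // placeholder
  MaxNumberOfMessages: 10,
  WaitTimeSeconds: 20, // any value > 0 switches from short polling to long polling
}));
console.log(response.Messages?.length ?? 0, 'messages received');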
Batching: specifying a BatchSize improves the chance of proper batching, allowing more messages to be grouped and giving Lambda more knowledge about state. While this state is still distributed and not 100% localized, it is useful and improves processing quality to some degree. Using the maxConcurrency option alongside batching localizes state further by increasing the chance of related messages landing in the same batch. This can be configured using CDK as below.
import { Duration } from 'aws-cdk-lib';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

const queue = new Queue(this, 'Queue', {
  receiveMessageWaitTime: Duration.seconds(20), // enable long polling
  deadLetterQueue: {
    maxReceiveCount: 3, // move a message to the DLQ after 3 failed receives
    queue: new Queue(this, 'DeadLetterQueue'),
  },
});
lambdaFunction.addEventSource(new SqsEventSource(queue, {
  batchSize: 10,
  maxBatchingWindow: Duration.seconds(60), // wait up to 60s to fill the batch
  maxConcurrency: 2, // cap concurrent pollers/invocations
}));
Lambda Event Source Mapping provides another configuration option, maxBatchingWindow, which lets the Lambda service defer invocation for up to 5 minutes while asking SQS for more and more messages. The function is invoked when one of the following conditions is met: the payload size reaches 6 MB, the batching window reaches its maximum value, or the batch size reaches its maximum value.
The Lambda now has more context, so it can apply more control and synthesize messages. The following snippet shows how this can be achieved in TypeScript.
import { SQSEvent, SQSRecord } from 'aws-lambda';

// An SQS record whose JSON body has been deserialized.
type ParsedRecord = Omit<SQSRecord, 'body'> & { body: any };

// Inside the handler, where `event` is the incoming SQSEvent:
// sort by first receive time so the oldest event of each subject comes first,
// then parse each body once.
const records: ParsedRecord[] = (event.Records as SQSRecord[])
  .sort((a, b) => a.attributes.ApproximateFirstReceiveTimestamp
    .localeCompare(b.attributes.ApproximateFirstReceiveTimestamp))
  .map((record) => ({ ...record, body: JSON.parse(record.body) }));

// Group related records by subject (Object.groupBy requires Node.js 21+).
const grouped = Object.entries(
  Object.groupBy(records, (currentValue) => currentValue.body.subject as string)
);

const results: ParsedRecord[] = [];
grouped.forEach(([, entryValues]) => {
  const values = entryValues ?? [];
  // The oldest event of the group is the base to synthesize into.
  const base = values.splice(0, 1)[0];
  let firstEvent: ParsedRecord | null = base;
  const initialType = base.body.type;
  values.forEach((current) => {
    const currentData = current.body.data;
    const currentType = current.body.type;
    if (initialType == 'user.signedup') {
      if (currentType == 'user.profile_updated' && firstEvent) {
        // Merge the later profile update into the signup event.
        firstEvent.body.data.profile = {
          ...firstEvent.body.data.profile,
          age: currentData.age,
          nickname: currentData.nickname,
          prefered_channels: currentData.prefered_channels,
        };
      } else if (currentType == 'user.unsubscribed') {
        firstEvent = null; // an unsubscribe in the same batch drops the whole group
      }
    }
  });
  if (firstEvent) results.push(firstEvent); // skip dropped groups
});
Sending two messages related to the same subject will group them, and synthesizing avoids distributing them separately or doing unnecessary processing. The following JSON example represents the payloads of two distinct events sent to SQS.
{
  "id": "dmPvPSlwFZOTOjefpu8m2",
  "time": "2025-03-23T17:13:32+00:00",
  "source": "user.management",
  "subject": "omid_unique_user_id",
  "data": {
    "name": "omid",
    "profile": {
      "age": 40
    }
  },
  "type": "user.signedup"
},
{
  "id": "TAF5aEQeAh7GX1q8Cvam3",
  "time": "2025-03-23T17:14:32+00:00",
  "source": "user.management",
  "subject": "omid_unique_user_id",
  "data": {
    "name": "omid",
    "age": 43,
    "nickname": "xaaxaax",
    "prefered_channels": [ "EMAIL" ]
  },
  "type": "user.profile_updated"
}
The example will distribute a single, final user.signedup event representing a combination of both events.
{
  "id": "dmPvPSlwFZOTOjefpu8m2",
  "time": "2025-03-23T17:13:32+00:00",
  "source": "user.management",
  "subject": "omid_unique_user_id",
  "data": {
    "name": "omid",
    "profile": {
      "age": 43,
      "nickname": "xaaxaax",
      "prefered_channels": [ "EMAIL" ]
    }
  },
  "type": "user.signedup"
}
While the above example shows how to better control the rate and reduce the risk of overloading downstream services, this approach gives no guarantee at the level of single entities, even if the goal was achieved in this example. The two messages above could arrive in two distinct invocations, since each invocation receives its own batch of effectively random entities.
Partitioning: While batching is useful, two related messages can still land in two distinct invocations (execution environments), even when they are close in time. Partitioning groups related entities and guarantees that they reach the same invocation (e.g. the user.signedup and user.profile_updated events related to the same user). This can be configured using CDK as below.
import { Duration } from 'aws-cdk-lib';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

const queue = new Queue(this, 'Queue', {
  receiveMessageWaitTime: Duration.seconds(20),
  fifo: true, // deliver each message group in order, to one consumer at a time
  deadLetterQueue: {
    maxReceiveCount: 3,
    queue: new Queue(this, 'DeadLetterQueue', { fifo: true }), // a FIFO queue requires a FIFO DLQ
  },
});
lambdaFunction.addEventSource(new SqsEventSource(queue, {
  batchSize: 10,
}));
To send the message payloads presented in the batching section, the following SQS batch entries must be sent to SQS. The presence of MessageGroupId guarantees that, at a given time, all messages for the same user (the MessageGroupId) are strictly received together.
{
  "Id": "Test-0001-2015-09-16T140731Z",
  "MessageGroupId": "omid_user_id",
  "MessageDeduplicationId": "dmPvPSlwFZOTOjefpu8m2",
  "MessageBody": "{\r\n \"id\":\"dmPvPSlwFZOTOjefpu8m2\",\r\n \"subject\":\"omid_user_id\",\r\n \"time\":\"2025-03-23T17:13:32+00:00\",\r\n \"data\":{\r\n \"name\":\"omid\",\r\n \"profile\":{\r\n \"age\":34,\r\n \"nickname\":\"omid\"\r\n }\r\n },\r\n \"type\":\"user.signedup\"\r\n }"
},
{
  "Id": "Test-0002-2015-09-16T140930Z",
  "MessageGroupId": "omid_user_id",
  "MessageDeduplicationId": "2dokCjxmQvUk2brjLuus1",
  "MessageBody": "{\r\n \"id\":\"2dokCjxmQvUk2brjLuus1\",\r\n \"subject\":\"omid_user_id\",\r\n \"time\":\"2025-03-23T17:14:32+00:00\",\r\n \"data\":{\r\n \"name\":\"omid\",\r\n \"age\":43,\r\n \"nickname\":\"xaaxaax\",\r\n \"prefered_channels\": [ \"EMAIL\" ]},\r\n \"type\":\"user.profile_updated\"\r\n }"
}
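For completeness, a minimal sketch of sending these entries with the AWS SDK for JavaScript v3 (the queue URL is a placeholder; the payloads are the ones shown above):

import { SQSClient, SendMessageBatchCommand } from '@aws-sdk/client-sqs';

const client = new SQSClient({});
await client.send(new SendMessageBatchCommand({
  QueueUrl: 'https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue.fifo', // placeholder
  Entries: [
    {
      Id: 'Test-0001-2015-09-16T140731Z',
      MessageGroupId: 'omid_user_id', // partitions all of this user's messages together
      MessageDeduplicationId: 'dmPvPSlwFZOTOjefpu8m2', // FIFO deduplication within 5 minutes
      MessageBody: JSON.stringify({ /* user.signedup payload from above */ }),
    },
    {
      Id: 'Test-0002-2015-09-16T140930Z',
      MessageGroupId: 'omid_user_id',
      MessageDeduplicationId: '2dokCjxmQvUk2brjLuus1',
      MessageBody: JSON.stringify({ /* user.profile_updated payload from above */ }),
    },
  ],
}));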
This example applies a mix of partitioning and batching, giving the Lambda a better chance of seeing a more consistent state, managed internally by SQS.
DynamoDB / Kinesis Stream Integration
Before deep diving into stream integrations, this section elaborates on how the integration works. The following diagram illustrates how items, item collections, and stream records are organized.
AWS Lambda asks for the list of shards (analogous to partitions in DynamoDB). Lambda then handles each shard in isolation, starting by getting a shard iterator (the start pointer, in streaming terms); given the iterator, the stream returns the items in its response. Since an item collection (the items sharing the same partition key) is always located in a single shard, the Lambda already benefits from having all related data together: by default, a single Lambda execution is dedicated to each shard at a given time, which means that for a given partition key there is at most one running invocation.
sequenceDiagram
participant A as Event Source Mapping
participant B as Stream
participant C as Shard
A->>B: Ask for list of Shards
par Shard 1
A->>C: ask for Shard Iterator
A->>C: Get Records in a Shard using Iterator
C->>A: Respond back batch of messages
and Shard 2
A->>C: ask for Shard Iterator
A->>C: Get Records in a Shard using Iterator
C->>A: Respond back batch of messages
end
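To make the flow concrete, the same steps can be reproduced by hand with the DynamoDB Streams SDK; a minimal sketch, assuming a placeholder stream ARN and an async context (the event source mapping does all of this, plus checkpointing, on your behalf):

import {
  DynamoDBStreamsClient, DescribeStreamCommand,
  GetShardIteratorCommand, GetRecordsCommand,
} from '@aws-sdk/client-dynamodb-streams';

const client = new DynamoDBStreamsClient({});
const streamArn = 'arn:aws:dynamodb:eu-west-1:123456789012:table/MyTable/stream/...'; // placeholder

// 1. Ask for the list of shards.
const { StreamDescription } = await client.send(
  new DescribeStreamCommand({ StreamArn: streamArn })
);

// 2. Process each shard in isolation: get an iterator, then read records with it.
for (const shard of StreamDescription?.Shards ?? []) {
  const { ShardIterator } = await client.send(new GetShardIteratorCommand({
    StreamArn: streamArn,
    ShardId: shard.ShardId,
    ShardIteratorType: 'TRIM_HORIZON', // start from the oldest available record
  }));
  const { Records } = await client.send(new GetRecordsCommand({ ShardIterator }));
  console.log(shard.ShardId, Records?.length ?? 0, 'records');
}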
Using AWS CDK, the integration can be refined for more flexibility in making concrete decisions. The following snippet shows how the event source mapping can be configured.
import { Duration } from 'aws-cdk-lib';
import { StartingPosition } from 'aws-cdk-lib/aws-lambda';
import { DynamoEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

lambdaFunction.addEventSource(new DynamoEventSource(table, {
  startingPosition: StartingPosition.LATEST,
  batchSize: 10,
  maxBatchingWindow: Duration.seconds(60), // wait up to 60s to gather related records
  bisectBatchOnError: true, // split the batch in two and retry on failure
  retryAttempts: 3,
  maxRecordAge: Duration.minutes(15), // discard records older than 15 minutes
  parallelizationFactor: 1, // one concurrent batch per shard
}));
A batch of 10 records and a 60-second batching window are configured, allowing more related records to be received, which in turn reduces the load on downstream services.
Consider writing a new item to the DynamoDB table and then updating it:
// New Item
{
  "PutRequest": {
    "Item": {
      "pk": {"S": "omid"},
      "sk": {"S": "t0000"},
      "nickname": {"S": "Omid"},
      "age": {"N": "30"},
      "prefered_channels": {"S": "EMAIL"},
      "status": {"S": "ACTIVE"}
    }
  }
}
// Update Item
{
  "PutRequest": {
    "Item": {
      "pk": {"S": "omid"},
      "sk": {"S": "t0000"},
      "nickname": {"S": "XaaXaaX"},
      "age": {"N": "43"},
      "prefered_channels": {"S": "EMAIL"},
      "status": {"S": "MODIFIED"}
    }
  }
}
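A hedged sketch of producing these two stream records with the AWS SDK (the table name is a placeholder). Note that the two writes must be separate calls: a single BatchWriteItem request rejects duplicate keys, and the second put on the same key is what emits the MODIFY stream record:

import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';

const client = new DynamoDBClient({});
const tableName = 'UsersTable'; // placeholder

// First write: the new item (emits an INSERT stream record).
await client.send(new PutItemCommand({ TableName: tableName, Item: {
  pk: { S: 'omid' }, sk: { S: 't0000' }, nickname: { S: 'Omid' },
  age: { N: '30' }, prefered_channels: { S: 'EMAIL' }, status: { S: 'ACTIVE' },
}}));

// Second write: same key, so it overwrites the item (emits a MODIFY stream record).
await client.send(new PutItemCommand({ TableName: tableName, Item: {
  pk: { S: 'omid' }, sk: { S: 't0000' }, nickname: { S: 'XaaXaaX' },
  age: { N: '43' }, prefered_channels: { S: 'EMAIL' }, status: { S: 'MODIFIED' },
}}));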
Looking at the handler code, the grouping is done per pk, and the verification is based on eventName being INSERT, MODIFY, or REMOVE.
import { DynamoDBRecord } from 'aws-lambda';
import { unmarshall } from '@aws-sdk/util-dynamodb';

// `records` is event.Records from the incoming DynamoDBStreamEvent.
// Group the batch's records by partition key.
const grouped = Object.entries(
  Object.groupBy(records, (currentValue: DynamoDBRecord) => currentValue.dynamodb?.Keys?.pk?.S || '')
);

// Resolve the image that represents a record's state:
// NewImage for INSERT/MODIFY, OldImage for REMOVE.
const imageOf = (record: DynamoDBRecord): Record<string, any> =>
  unmarshall((record.eventName === 'REMOVE'
    ? record.dynamodb?.OldImage
    : record.dynamodb?.NewImage) as any);

const results: Record<string, any>[] = [];
grouped.forEach(([, entryValues]) => {
  const values = (entryValues ?? []) as DynamoDBRecord[];
  // The oldest record of the group is the base to synthesize into.
  const firstEvent = values.splice(0, 1)[0];
  const initialType = firstEvent.eventName;
  let data: Record<string, any> = imageOf(firstEvent);
  values.forEach((current) => {
    const currentType = current.eventName;
    const currentData = imageOf(current);
    if (initialType == 'INSERT') {
      if (currentType == 'MODIFY') {
        // Fold the later modification into the inserted item.
        data.age = currentData.age;
        data.nickname = currentData.nickname;
        data.prefered_channels = currentData.prefered_channels;
      }
      else data = {}; // e.g. a REMOVE in the same batch cancels the insert
    }
  });
  if (Object.keys(data).length) results.push(data); // skip cancelled groups
});
The printed result for the above records will be as below
{
  "pk": "omid",
  "sk": "t0000",
  "nickname": "XaaXaaX",
  "age": 43,
  "prefered_channels": "EMAIL",
  "status": "ACTIVE"
}
EventBridge / SNS Integration
SNS and EventBridge allow a fan-out design with multiple consumers (subscribers). Even though the SNS event contract contains an array of records (Records[]), the Lambda always receives a single message in that array (ref in FAQ: here). There is no refined batch processing or custom batching configuration, which results in a higher level of concurrent Lambda executions and a load that cannot be controlled. The distribution is an asynchronous call, meaning the channel is not aware of processing results on the consumer side.
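To illustrate the single-record contract, a minimal hedged handler sketch (the processing logic is an assumption):

import { SNSEvent } from 'aws-lambda';

export const handler = async (event: SNSEvent) => {
  // Records is an array by contract, but SNS delivers exactly one record per invocation.
  const record = event.Records[0];
  const message = JSON.parse(record.Sns.Message);
  // ...process the single message; a thrown error triggers Lambda's async retries.
  console.log(record.Sns.MessageId, message);
};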
Reserved Concurrency: In cases of direct integration with SNS or EventBridge, the solution for limiting downstream impact is to apply reserved concurrency.
OnFailure Destination: Integrate a DLQ. For asynchronous invocation, the default behavior is two retries, with fixed intervals between attempts (the intervals are not configurable):
1st retry: after 1 minute
2nd retry: after 2 minutes
Load Leveling: Use SQS as an intermediate layer to facilitate load leveling (a sketch follows the snippet below).
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SnsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

const lambdaFunction = new NodejsFunction(this, 'LambdaZipFunction', {
  reservedConcurrentExecutions: 1, // hard cap on concurrent executions
  ...
});
const topic = new Topic(this, 'Topic');
lambdaFunction.addEventSource(new SnsEventSource(topic, {
  deadLetterQueue: new Queue(this, 'DeadLetterQueue'), // OnFailure destination
}));
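For the load-leveling recommendation, a minimal sketch (resource names are assumptions; topic and lambdaFunction refer to the resources defined above) that buffers the topic's messages in a queue and drains it at a controlled pace. With the queue in between, the concurrency cap can live on the event source rather than on the function's reserved concurrency:

import { Duration } from 'aws-cdk-lib';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsSubscription } from 'aws-cdk-lib/aws-sns-subscriptions';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

// The queue absorbs bursts from the topic instead of invoking Lambda directly.
const bufferQueue = new Queue(this, 'BufferQueue', {
  visibilityTimeout: Duration.seconds(90), // usual guidance: at least 6x the function timeout
});
topic.addSubscription(new SqsSubscription(bufferQueue));
lambdaFunction.addEventSource(new SqsEventSource(bufferQueue, {
  batchSize: 10,
  maxConcurrency: 2, // the drain rate is capped here instead of by throttling the function
}));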
Conclusion
Design decisions are often driven by scalability and high throughput. While that is an ideal approach, it becomes tricky, and impacts the business process, when boundaries are not well defined and communications are not well thought out (which is often the case). Keeping track of a service's surroundings while making such decisions is a must-have and saves time and effort in the long term. Crossing out-of-control boundaries (external systems, partners, SaaS, etc.) is not the most common case, but when it exists it costs companies effort, downtime, and disruption.
Asynchronous designs give more power and control at communication boundaries, but they come with process-oriented decisions: reducing concurrency as a safeguard, and also speed, for the sake of more on-the-fly state consistency.
This part of the series explored typical asynchronous services and some integration details for controlling the load on downstream services. The next part will focus on patterns and practices for rate control in synchronous scenarios, where the final product is highly coupled to an external service or to a downstream with limitations and constraints.