Kafka Producer - C# Sync vs Async
Rules of Thumb
We're taught early and often that I/O-bound operations should be done asynchronously. A common rule of thumb is that anything taking more than 50ms should be done async. While this holds in most cases, sometimes, just from seeing that a method has an async option, we default to that implementation without much forethought.
Confluent Kafka Producer
Confluent offers a very nice Apache Kafka integration NuGet package, Confluent.Kafka. Within this package you'll find nearly all the bits and pieces you need to connect to Kafka and to consume and produce messages. In particular, we'll look at the behavior of the default implementation of IProducer<TKey, TValue>.
To set the stage, we'll first pull in some Protobuf tooling via the Google.Protobuf and Grpc.Tools NuGet packages. With those, we'll create each message as an instance of Message<TKey, TValue>, whose value is a WidgetEvent with the following schema.
syntax = "proto3";
package ConcurrentFlows.KafkaProducer;

message WidgetEvent {
    string Greeting = 1;
}
Benchmark - Environment
We'll need a local Kafka cluster to run our benchmarks against, and with docker-compose.yml we can stand one up relatively easily via docker-compose up -d.
In addition to the cluster, we'll initialize two topics via topic-init.sh: one for the Sync Producer and one for the Async Producer.
Within our cluster, we've also stood up a Schema Registry, and we'll use the associated Confluent NuGet package Confluent.SchemaRegistry.Serdes.Protobuf. This package brings in the necessary bits and pieces to talk to the Schema Registry and to serialize/deserialize (SerDes) our Protobuf messages.
Finally, to easily create messages, we'll leverage the Bogus NuGet package and create an instance of its Faker: Faker<WidgetEvent> faker = new();
Benchmark - Setup
The complete benchmark setup can be found within ProduceBenchmarks.cs.
To benchmark both the Sync and Async Produce methods, we'll use BenchmarkDotNet and define an IProducer<TKey, TValue> for each. Both Producers share the same producer config and the same Schema Registry config.
Note: The default Acks setting is Acks = All (acks=all). This means our Producers will wait for acknowledgment from all in-sync replicas, which in our cluster means all three replicas.
First, the config
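var producerConfig = new ProducerConfig()
{
    // The three brokers of the local cluster
    BootstrapServers = "localhost:9092,localhost:9093,localhost:9094",
    // Max messages held in the producer's local queue (librdkafka default: 100,000)
    QueueBufferingMaxMessages = 500_000
};
var registryConfig = new SchemaRegistryConfig()
{
    // Schema Registry from the same docker-compose setup
    Url = "localhost:8081",
};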
Next, the Schema Registry
var registryClient = new CachedSchemaRegistryClient(registryConfig);
Then, our Sync Producer
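// Produce() requires a synchronous ISerializer<T>, so the async ProtobufSerializer
// is adapted with AsSyncOverAsync() (namespace Confluent.Kafka.SyncOverAsync).
// errorMessage is assumed to be a format string defined elsewhere in the class.
syncProducer = new ProducerBuilder<string, WidgetEvent>(producerConfig)
    .SetValueSerializer(
        new ProtobufSerializer<WidgetEvent>(registryClient)
            .AsSyncOverAsync())
    .SetErrorHandler((p, e)
        => Console.WriteLine(
            errorMessage, e.Code, e.Reason))
    .Build();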
Finally, our Async Producer
// ProduceAsync() can use the async ProtobufSerializer directly.
asyncProducer = new ProducerBuilder<string, WidgetEvent>(producerConfig)
    .SetValueSerializer(
        new ProtobufSerializer<WidgetEvent>(registryClient))
    .SetErrorHandler((p, e)
        => Console.WriteLine(
            errorMessage, e.Code, e.Reason))
    .Build();
Benchmarks - Methods
We'll measure the performance of the two methods of interest:
Produce("topic", TMessage, deliveryHandler)
ProduceAsync("topic", TMessage)
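[Benchmark]
public void KafkaProducerSync()
{
    var msg = new Message<string, WidgetEvent>()
    {
        Key = $"{Guid.NewGuid()}",
        Value = faker.Generate()
    };

    // Produce() serializes the message, enqueues it locally and returns.
    // The delivery handler runs later, on the producer's background poll
    // thread, once the delivery outcome is known.
    syncProducer.Produce(sync_topic, msg,
        d =>
        {
            if (d.Error.IsError)
                throw new InvalidOperationException(
                    $"{d.Error.Code}:{d.Error.Reason}");
        });
}

[Benchmark]
public async Task KafkaProducerAsync()
{
    var msg = new Message<string, WidgetEvent>()
    {
        Key = $"{Guid.NewGuid()}",
        Value = faker.Generate()
    };

    // The returned Task completes only once the delivery report arrives,
    // i.e. after the brokers have acknowledged the message (Acks = All).
    await asyncProducer.ProduceAsync(async_topic, msg);
}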
Benchmarks - Results
Running the benchmarks is as simple as kicking off our console project in Release (e.g. dotnet run -c Release):
using BenchmarkDotNet.Running;
using ConcurrentFlows.KafkaProducer1;
Console.WriteLine("Starting Producer Benchmarks");
BenchmarkRunner.Run<ProducerBenchmarks>();
Both benchmarks ran on the same platform, against the same previously stood-up cluster:
Cluster -
Network concurrentflows - Created
Container zookeeper - Started
Container broker3 - Started
Container broker1 - Started
Container broker2 - Started
Container schema-registry - Started
Container rest-proxy - Started
Container topic-init - Started
Platform -
BenchmarkDotNet=v0.13.4, OS=Windows 11 (10.0.22621.1105)
12th Gen Intel Core i9-12900HK, 1 CPU, 20 logical and 14 physical cores
.NET SDK=7.0.100
[Host] : .NET 7.0.0 (7.0.22.51805), X64 RyuJIT AVX2
DefaultJob : .NET 7.0.0 (7.0.22.51805), X64 RyuJIT AVX2
We can see a significant difference between Produce() and ProduceAsync():
| Method             | Mean          | Error      | StdDev     | Gen0   | Gen1   | Allocated |
|------------------- |--------------:|-----------:|-----------:|-------:|-------:|----------:|
| KafkaProducerSync  | 2.381 µs      | 0.0468 µs  | 0.0415 µs  | 0.5112 | 0.0076 | 6.24 KB   |
| KafkaProducerAsync | 15,291.646 µs | 87.8239 µs | 68.5671 µs | -      | -      | 6.74 KB   |
Key Takeaway -
Produce() is ~6,000 times faster than ProduceAsync()!!! (2.381 µs vs. 15,291.646 µs mean per call)
Why such a difference???
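To begin with, note that we set QueueBufferingMaxMessages to 500,000 (librdkafka's default is 100,000). We did this because messages stack up in the Producer's local buffer while they work their way out to all replicas; if that buffer fills, Produce() fails with a queue-full error (ErrorCode.Local_QueueFull) instead of enqueuing the message. Essentially, Produce() is "Producing" faster than we're "Delivering".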
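To make that concrete, here is a minimal sketch of the local-buffer behavior, assuming a bounded System.Threading.Channels channel as a hypothetical stand-in for the producer's send queue (this is not how Confluent.Kafka or librdkafka is implemented). With nothing draining it, enqueue attempts beyond its capacity fail immediately, much like the real producer's queue-full case.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical stand-in for the producer's local send queue; its capacity plays
// the role of QueueBufferingMaxMessages. Nothing drains it here, which models
// "Producing" outpacing "Delivering".
const int queueCapacity = 3;
var sendQueue = Channel.CreateBounded<string>(new BoundedChannelOptions(queueCapacity)
{
    // In Wait mode, TryWrite returns false when the channel is full
    // (rather than dropping an item).
    FullMode = BoundedChannelFullMode.Wait
});

int accepted = 0, rejected = 0;
for (var i = 0; i < 5; i++)
{
    // Enqueue-and-return, like Produce(): no waiting on delivery.
    if (sendQueue.Writer.TryWrite($"message-{i}"))
        accepted++;
    else
        rejected++; // where the real producer would report a full queue
}

Console.WriteLine($"accepted={accepted}, rejected={rejected}"); // prints: accepted=3, rejected=2
```

Raising the capacity (as the benchmark does with QueueBufferingMaxMessages) only buys headroom; if production outpaces delivery for long enough, the queue still fills.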
The big highlight is that both Produce() and ProduceAsync() are asynchronous 🤓
ProduceAsync wraps the entire operation of "Producing" a message, up to and including its delivery acknowledgment, into a .NET-friendly Task<DeliveryResult<TKey, TValue>>.
Produce instead uses a callback, supplied via its optional third parameter, deliveryHandler (an Action<DeliveryReport<TKey, TValue>>), which is invoked once the delivery outcome is known.
This difference in async style lets us decouple "Producing" from "Delivering" by deferring the handling of our DeliveryReport<TKey, TValue>, as opposed to waiting for a DeliveryResult<TKey, TValue> to be fully formed.
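The two styles can be compared side by side in a minimal BCL-only sketch. It assumes a hypothetical, simplified producer (an unbounded channel drained by a background loop that simulates a 10 ms delivery round trip); the local Produce/ProduceAsync functions only mimic the shapes of the Confluent.Kafka methods and make no claim about the library's internals. Both enqueue the same way; the only difference is what the caller waits on.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical, simplified producer: an unbounded local queue drained by a
// background "delivery" loop that simulates a broker round trip per message.
var queue = Channel.CreateUnbounded<(string Message, Action<string> OnDelivered)>();

var deliveryLoop = Task.Run(async () =>
{
    await foreach (var item in queue.Reader.ReadAllAsync())
    {
        await Task.Delay(10);           // simulated wait for replica acknowledgments
        item.OnDelivered(item.Message); // the "delivery report"
    }
});

// Callback style, modeled on Produce(topic, message, deliveryHandler):
// enqueue and return immediately; the handler runs later on the delivery loop.
void Produce(string message, Action<string> deliveryHandler)
{
    queue.Writer.TryWrite((message, deliveryHandler));
}

// Task style, modeled on ProduceAsync(topic, message): the same enqueue, but the
// returned Task completes only when this message's delivery report arrives.
Task<string> ProduceAsync(string message)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    Produce(message, report => tcs.SetResult(report));
    return tcs.Task;
}

var deliveredCount = 0;
var stopwatch = Stopwatch.StartNew();
for (var i = 0; i < 5; i++)
    Produce($"sync-{i}", _ => Interlocked.Increment(ref deliveredCount));
var produceElapsed = stopwatch.Elapsed;      // enqueue cost only

stopwatch.Restart();
for (var i = 0; i < 5; i++)
    await ProduceAsync($"async-{i}");        // at least one simulated round trip per await
var produceAsyncElapsed = stopwatch.Elapsed;

queue.Writer.Complete();
await deliveryLoop;                          // drain any remaining deliveries
Console.WriteLine($"Produce: {produceElapsed.TotalMilliseconds:F3} ms, " +
                  $"ProduceAsync: {produceAsyncElapsed.TotalMilliseconds:F3} ms, " +
                  $"sync delivery callbacks: {deliveredCount}");
```

Awaiting each ProduceAsync call one at a time, as the benchmark does, makes the caller pay a full delivery round trip per message, while Produce only pays for the enqueue and handles the outcome later in the callback.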
Wrap Up
Now, don't go changing all your code to Produce(); it's still an unnecessary blocking call, albeit with minimal impact, even when your cluster is not localhost. What's important is to understand how these methods leverage asynchronicity in different ways, and how that impacts acknowledgement, error management, etc.
Ideally, I'd propose a complete separation of concerns between hot-path message production and Producing/Delivering the message to the wire. That may be the subject of a future post. 🙃
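One way to picture that separation, as a minimal sketch under our own assumptions (not necessarily the design a future post would use): the hot path only writes to a bounded in-memory System.Threading.Channels channel, and a single hypothetical background sender drains it. The sender is where a producer call would live; here it just counts messages so the sketch runs without a cluster.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-memory hand-off decoupling the hot path from the Kafka producer.
var handoff = Channel.CreateBounded<string>(new BoundedChannelOptions(1_000)
{
    SingleReader = true,                    // only the sender below reads
    FullMode = BoundedChannelFullMode.Wait  // WriteAsync waits if the sender falls behind
});

// Dedicated sender: in a real application this is where Produce() (or
// ProduceAsync()) would be called; here it only counts messages.
var sent = 0;
var sender = Task.Run(async () =>
{
    await foreach (var message in handoff.Reader.ReadAllAsync())
    {
        sent++; // stand-in for handing `message` to the Kafka producer
    }
});

// Hot path: creates messages and hands them off, never touching the producer.
for (var i = 0; i < 100; i++)
{
    await handoff.Writer.WriteAsync($"widget-event-{i}");
}

handoff.Writer.Complete(); // no more messages
await sender;              // wait until everything has been handed off
Console.WriteLine($"sent={sent}"); // prints: sent=100
```

The bounded capacity with Wait mode applies backpressure to the hot path instead of failing outright when the sender falls behind; whether that is preferable to dropping or rejecting messages depends on the application.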
Finally, find all relevant code here:
ConcurrentFlows.HashNode/ConcurrentFlows.KafkaProducer1
Written by Joshua Steward