AsyncMediator Series - Part 2
Welcome Back! 👋
So glad you made it back for Part 2. In Part 1: Broadcast Messaging - In Memory we created the implementation of something called a Flow, which is simply the broadcast of messages from many Originators to many Consumers, leveraging TPL Dataflow internally.
Towards our AsyncMediator
Our next step is to introduce the concept of Message Completion. This means the Consumer indicates whether it has Completed processing or Failed to process the received message. We want to keep the broadcast capabilities of the Flow, and that feature will drive us away from our TPL Dataflow implementation.
What we'll gain is the ability to dynamically onboard Consumers and finer-grained control over all Consumers. Additionally, this will permit support for Queries rather than limiting us to Commands.
Building a new flow
First, let's lay out some requirements of our new implementation:
- Must allow many Originators
- Must allow many Consumers
- Must allow Sinks to detach from Source
- Must only Complete an Envelope when all Consumers finish
Our Library Api
We want this new flow to be simple and easy to consume. Ideally we should have a simple registration and well-known interfaces. To start, our registration should look something like this, along with a sample Message:
public static IServiceCollection AddMsgChannel<TPayload>(this IServiceCollection services)
where TPayload : notnull
public sealed record Message(int Id);
We intend to provide consumers of the library a simple interface to interact with. For example, the Originator of Messages would look something like this:
public sealed class Originator
{
private readonly IChannelSource<Message> source;
public Originator(IChannelSource<Message> source)
=> this.source = source;
public async ValueTask ProduceManyAsync(int count, CancellationToken cancelToken)
{
var messages = Enumerable.Range(0, count)
.Select(i => new Message(i).ToEnvelope());
var sending = messages.Select(async msg => await source.SendAsync(msg));
await Task.WhenAll(sending);
}
}
And, a simple Consumer of Messages would look like this:
public sealed class Consumer
{
private readonly IChannelSink<Message> sink;
public Consumer(IChannelSink<Message> sink)
=> this.sink = sink;
public async Task<IEnumerable<Message>> CollectAllAsync(CancellationToken cancelToken)
{
var set = new List<Message>();
try
{
await foreach (var envelope in sink.ConsumeAsync(cancelToken))
set.Add(envelope.Payload);
}
catch (OperationCanceledException)
{ /*We're Done*/ }
return set;
}
}
Guiding Test
All that we need to expose are the simple, complementary interfaces: Sink & Source. With these and our requirements in mind, we can write a test and let that drive and affirm our implementation.
[Fact]
public async Task Source_BroadcastsTo_AllConsumers()
{
var provider = new ServiceCollection()
.AddMsgChannel<Message>()
.AddTransient<Originator>()
.AddTransient<Consumer>()
.BuildServiceProvider();
var originator1 = provider.GetRequiredService<Originator>();
var originator2 = provider.GetRequiredService<Originator>();
var consumer1 = provider.GetRequiredService<Consumer>();
var consumer2 = provider.GetRequiredService<Consumer>();
originator1.Should().NotBe(originator2);
consumer1.Should().NotBe(consumer2);
var count = 100;
using var cts = new CancellationTokenSource();
var consume1 = consumer1.CollectAllAsync(cts.Token);
var consume2 = consumer2.CollectAllAsync(cts.Token);
var sending1 = originator1.ProduceManyAsync(count, cts.Token);
var sending2 = originator2.ProduceManyAsync(count, cts.Token);
await Task.WhenAll(sending1, sending2);
cts.Cancel();
var set1 = await consume1;
var set2 = await consume2;
set1.Should().HaveCount(count * 2);
set2.Should().HaveCount(count * 2);
set1.Should().BeEquivalentTo(set2);
}
Our test sets up the Dependency Registration, then creates two each of Producers and Consumers. We assert that each instance is unique and then kick off our action. We trigger the Consumers to start listening and trigger the Producers to start sending a preset count of Messages. Once our Producers complete, we trigger the CancellationToken to stop our Consumers. After awaiting the consumption Tasks, we can assert that each Consumer has received all Messages, double the original count, since we have two Producers. Finally, we assert that both sets of Messages are equal, indicating that we did multicast from each Producer to every Consumer.
To begin our implementation we look to the humble Envelope. This will be the container and carrier of our Payload and serve as the conduit for various utilities.
The Envelope
The first piece we need to extend is the Envelope. The Envelope will be the item that allows us to communicate an Ack/Nack back to the Originator.
public abstract record Envelope
{
public virtual string EnvelopeId => $"{GetHashCode()}";
}
This provides our base and gives us an overridable EnvelopeId, defaulting to the hash code of the record itself.
public sealed record Envelope<TPayload>(
TPayload Payload,
TaskCompletionSource TaskSource,
Task Execution)
: Envelope
where TPayload : notnull
{
private TaskCompletionSource TaskSource { get; } = TaskSource;
private Task Execution { get; } = Execution;
public Exception? Failure { get; private set; }
public void Complete()
=> TaskSource.TrySetResult();
public void Fail(Exception exception)
{
TaskSource.TrySetException(exception);
Failure = exception;
}
public TaskAwaiter GetAwaiter()
=> Execution.GetAwaiter();
}
The first derivation, Envelope<TPayload>, carries a Payload, that is, any object of type TPayload. It exposes two methods: Complete (ACK) and Fail (NACK). Next, it privately holds a TaskCompletionSource that is used to manage the completion of the Envelope. Finally, there is a private Task whose Awaiter is exposed; this allows the Envelope to be awaited by an Originator.
Task & TaskCompletionSource
The execution Task represents the abstract idea of any work necessary to process the Envelope. Note that the work to process the Message is not tied to the Task directly. We're not awaiting any Consumer. Instead, we're maintaining an outstanding future that can be finalized by a Complete or Fail method.
The TaskCompletionSource is used to signal that the Task has finished. Complete sets the result as successful. Fail takes an Exception indicating that processing failed. However, the Exception won't be propagated via an await. We don't want the Originator, who may be awaiting the Envelope, to pop an Exception off the Task. Instead, we use the pattern of a nullable Exception to expose failure and encourage null propagation.
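To make the Ack/Nack flow concrete, here's a minimal sketch of how an Originator might await an Envelope and inspect the outcome. It builds the Envelope by hand from a TaskCompletionSource (the packaging helpers in the next section build the real monitor task), so treat it as illustration rather than library code.
// a hand-rolled Envelope purely for illustration; in the library the Execution task
// comes from the packaging helpers shown next, which is what keeps Fail(..)
// from surfacing as a thrown exception on await
var taskSource = new TaskCompletionSource();
var envelope = new Envelope<Message>(
    Payload: new Message(42),
    TaskSource: taskSource,
    Execution: taskSource.Task.ContinueWith(_ => { })); // stand-in monitor: observes the TCS, never rethrows

// consuming side: ACK the message (or NACK it with envelope.Fail(new TimeoutException()))
envelope.Complete();

// originating side: awaiting never throws; failure, if any, is exposed via the nullable property
await envelope;
if (envelope.Failure is not null)
    Console.WriteLine($"Processing failed: {envelope.Failure.Message}");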
Packaging an Envelope
To pack this all up, we first stuff in the Payload and create our TaskCompletionSource. We'll wrap this together with a timeout and async callbacks for onCompleted and onFailure.
public static Envelope<TPayload> ToEnvelope<TPayload>(
this TPayload payload,
TimeSpan timeout,
Func<Task> onCompleted,
Func<Task> onFailure)
where TPayload : notnull
=> new TaskCompletionSource()
.CreateEnvelope(payload, timeout, onCompleted, onFailure);
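Note that the Originator sample earlier called ToEnvelope() with no arguments. That convenience overload isn't shown here; a sketch of one might simply supply a default timeout and no-op callbacks. The exact defaults below are an assumption, not part of the library.
// hypothetical convenience overload assumed by the Originator sample:
// a default timeout and no-op completion/failure callbacks
public static Envelope<TPayload> ToEnvelope<TPayload>(this TPayload payload)
    where TPayload : notnull
    => payload.ToEnvelope(
        timeout: TimeSpan.FromSeconds(30),
        onCompleted: () => Task.CompletedTask,
        onFailure: () => Task.CompletedTask);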
Next, our TaskCompletionSource is used to form an ExecutionMonitor. The job of the monitor is to maintain the timeout and execute either onCompleted or onFailure.
private static Envelope<TPayload> CreateEnvelope<TPayload>(
this TaskCompletionSource taskSource,
TPayload payload,
TimeSpan timeout,
Func<Task> onCompleted,
Func<Task> onFailure)
where TPayload : notnull
=> new Envelope<TPayload>(
Payload: payload,
TaskSource: taskSource,
Execution: taskSource.CreateExecutionMonitor(timeout, onCompleted, onFailure));
private static Task CreateExecutionMonitor(this TaskCompletionSource source,
TimeSpan timeout,
Func<Task> onCompleted,
Func<Task> onFailure)
{
return AsyncExecutionMonitor(source.Task, timeout, onCompleted, onFailure);
async Task AsyncExecutionMonitor(
Task completion,
TimeSpan timeout,
Func<Task> onCompleted,
Func<Task> onFailure)
{
if (await completion.TryWaitAsync(timeout))
await onCompleted();
else
await onFailure();
}
}
public static async Task<bool> TryWaitAsync(this Task task, TimeSpan timeout)
{
await Task.WhenAny(task, Task.Delay(timeout));
return task.IsCompletedSuccessfully;
}
The async monitor applies a global timeout to the future we're maintaining for the processing work. We cannot cancel the work when the timeout expires; this is a direct consequence of not having direct control over, or coupling to, the work being performed. The best we can do in this position is to Fail on our side and move on.
Within TryWaitAsync we can see how the TrySetException from within the Envelope is rerouted into a bool consequence. It simply leverages WhenAny and only considers if the Task was successful. We know that any exception captured by the call to Fail(ex) sets the failure on the Envelope itself.
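To see both outcomes of the monitor in action, here's a rough sketch using the public ToEnvelope extension and the Message record from earlier; the short timeouts are purely illustrative.
// ACK path: Complete() resolves the TaskCompletionSource before the timeout
var acked = new Message(1).ToEnvelope(
    TimeSpan.FromSeconds(1),
    onCompleted: () => { Console.WriteLine("ACK"); return Task.CompletedTask; },
    onFailure:   () => { Console.WriteLine("NACK"); return Task.CompletedTask; });
acked.Complete();
await acked;        // monitor observes success, prints "ACK"

// NACK path: nobody completes the Envelope, so the timeout elapses instead
var ignored = new Message(2).ToEnvelope(
    TimeSpan.FromMilliseconds(50),
    onCompleted: () => Task.CompletedTask,
    onFailure:   () => { Console.WriteLine("NACK (timed out)"); return Task.CompletedTask; });
await ignored;      // after ~50ms prints "NACK (timed out)"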
Completion
Now, with Envelope<TPayload> in hand, think for a moment: how, within our broadcast framework, do we determine if a Message is Complete? We can't simply say the originating Message is Complete when only one Consumer finishes. And what happens if 1 out of 20 Consumers fails? A reasonable default would be to wait for all Consumers and assume failure if any single Message fails. This follows the pattern of at-most-once delivery, although we could extend it to dedicated retrying of failures and more complex strategies. Now, recall that we have a dynamic Consumer set that we don't have direct knowledge of, so how do we manage the broadcast?
The BroadcastBlock we used for our original implementation in Part 1 doesn't expose its Consumers in any way. We can't see how many there are, nor even whether all Consumers have been offered the Envelope. We'll need more control for this, and that's where we'll leverage Channels.
ChannelSource
To control our broadcast we need knowledge of, and control over, the "Subscribers"/Consumers of our Source. The first step is to create our own variant of the TPL Dataflow Link. This is a structure representing the connection of a Source and a Sink.
public record struct SinkLink(Guid LinkId) : IDisposable
{
private Action<Guid>? unlink = default;
private bool disposed = false;
public SinkLink(Guid linkId, Action<Guid> unlink)
: this(linkId)
=> this.unlink = unlink;
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (disposing && !disposed)
unlink?.Invoke(LinkId);
disposed = true;
unlink = null;
}
}
Our SinkLink is identifiable by its LinkId, and it holds a reference that allows it to unlink from the source via its LinkId.
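As a small illustration of the unlink callback, here's a sketch using a plain ConcurrentDictionary as a stand-in for the Source's target collection; the real dictionary holds ChannelWriters, as we'll see shortly.
// stand-in for the Source's target collection, just to show the unlink callback
var targets = new ConcurrentDictionary<Guid, string>();
var linkId = Guid.NewGuid();
targets[linkId] = "some channel writer";

using (var link = new SinkLink(linkId, id => targets.TryRemove(id, out _)))
{
    // while the link is alive, the Sink stays registered with the Source
}
// Dispose invoked the unlink action, detaching this Sink from the Source
Console.WriteLine(targets.IsEmpty);   // True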
Next, we'll capture the concept of many outgoing Messages in the form of an Outbox. This allows us to maintain a record of all Messages broadcast and when they have Completed.
public interface IOutbox<TPayload> where TPayload : notnull
{
void Complete();
Envelope<TPayload> GetEnvelope();
}
This Outbox is intended to be created, then have Messages submitted, and finally be Completed.
internal sealed class EnvelopeOutbox<TPayload> : IOutbox<TPayload>
where TPayload : notnull
{
private readonly Envelope<TPayload> originator;
private readonly Action<Guid> destructor;
private readonly ConcurrentDictionary<Guid, Envelope<TPayload>> pool = new();
private readonly ConcurrentBag<Exception> failures = new();
private volatile int leaseCount = 0;
private volatile bool complete = false;
public EnvelopeOutbox(
Envelope<TPayload> originator,
Action<Guid> destructor)
{
this.originator = originator;
this.destructor = destructor;
}
public void Complete()
=> complete = true;
public Envelope<TPayload> GetEnvelope()
{
Interlocked.Increment(ref leaseCount);
var id = Guid.NewGuid();
var envelope = originator.Payload.ToEnvelope(
TimeSpan.FromSeconds(30),
() => EnvelopeDestructorAsync(id),
() => EnvelopeDestructorAsync(id));
pool[id] = envelope;
return envelope;
}
private async Task EnvelopeDestructorAsync(Guid id)
{
Interlocked.Decrement(ref leaseCount);
var envelope = pool[id];
failures.MaybeAdd(envelope?.Failure);
if (!ReadyForClosure) return;
await CloseOutPool(id);
}
private async Task CloseOutPool(Guid id)
{
await Task.WhenAll(pool.Select(async e => await e.Value));
if (failures.Any())
originator.Fail(new AggregateException(failures));
else
originator.Complete();
destructor(id);
}
private bool ReadyForClosure
=> leaseCount <= 0 && complete;
}
From this implementation we can see several things (a sketch of the lifecycle follows this list):
- Creation depends on an Originating Message and a Destructor that handles the finalization of the Outbox
- Each time a Message is requested we increment the internal leaseCount
- The outgoing Messages are wired up so that, when destructed, they decrement the leaseCount and give us an opportunity to close the Outbox via EnvelopeDestructorAsync
- Once the Outbox is Complete and the leaseCount reaches zero, we aggregate any Exceptions, calling Complete or Fail on the Originator's Message
- Finally, the Source-provided Outbox Destructor can be executed
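As a rough sketch of that lifecycle: EnvelopeOutbox is internal to the library, so constructing it directly here is purely for illustration of the intended call sequence, not production usage.
// originating Envelope, as an Originator would create it
var originator = new Message(7).ToEnvelope(
    TimeSpan.FromSeconds(30),
    () => Task.CompletedTask,
    () => Task.CompletedTask);

// internal type, constructed directly only to illustrate; the destructor lets the
// Source clean up its record of the Outbox
IOutbox<Message> outbox = new EnvelopeOutbox<Message>(
    originator,
    destructor: _ => Console.WriteLine("Outbox finalized"));

var lease1 = outbox.GetEnvelope();   // leaseCount -> 1
var lease2 = outbox.GetEnvelope();   // leaseCount -> 2
outbox.Complete();                   // broadcast finished; closure now hinges on the leases

// each Consumer answers its own lease; once both resolve, the Outbox is meant to
// aggregate any failures and Complete (or Fail) the originating Envelope
lease1.Complete();
lease2.Fail(new InvalidOperationException("consumer 2 choked"));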
All of this allows our Source to broadcast and maintain knowledge of all outgoing Messages. From the previous code we can see we need to define two Factory Delegates:
delegate IChannelSink<TPayload> SinkFactory<TPayload>()
where TPayload : notnull;
delegate IOutbox<TPayload> OutboxFactory<TPayload>(
Envelope<TPayload> originator,
Action<Guid> destructor)
where TPayload : notnull;
First, the SinkFactory will be responsible for creating a new ChannelSink<TPayload>. We can register this delegate and inject it if we ever find ourselves dynamically creating new Consumers, but for now we'll implement it within our Source. Second, we have the OutboxFactory, which creates an Outbox each time we need to broadcast a Message.
The advantage of having these delegates is that it allows us to place their implementations close to the site of their use. The SinkFactory will be held within the ChannelSource, allowing us to place the new Sink within the collection of targets held by the Source. The OutboxFactory, while not strictly necessary, is a simple abstraction over the Outbox constructor that allows us to create one on demand and mock its implementation for testing.
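As a rough sketch of why the SinkFactory delegate is worth registering, a component could ask for SinkFactory<TPayload> (wired up as a singleton in the registration at the end of this post) and spin up a freshly linked Sink on demand. The LateConsumer class here is hypothetical, not part of the library.
// sketch: dynamically attaching a new Consumer at runtime via the injected SinkFactory
public sealed class LateConsumer
{
    private readonly IChannelSink<Message> sink;

    // the SinkFactory<Message> delegate is registered in AddMsgChannel, so it can be injected anywhere
    public LateConsumer(SinkFactory<Message> sinkFactory)
        => sink = sinkFactory();   // links a fresh Sink to the Source on demand

    public IAsyncEnumerable<Envelope<Message>> Listen(CancellationToken token)
        => sink.ConsumeAsync(token);
}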
Diving in
Now let's dive right into the implementation of the Source:
internal sealed class ChannelSource<TPayload>
: IChannelSource<TPayload>,
ILinkableSource<TPayload>
where TPayload : notnull
{
private readonly OutboxFactory<TPayload> factory;
private readonly ConcurrentDictionary<Guid, ChannelWriter<Envelope<TPayload>>> channels = new();
private readonly ConcurrentDictionary<Guid, IOutbox<TPayload>> outboundMsgs = new();
public ChannelSource(OutboxFactory<TPayload> outboxFactory)
=> this.factory = outboxFactory;
public async ValueTask<bool> SendAsync(Envelope<TPayload> envelope, CancellationToken cancelToken = default)
{
cancelToken.ThrowIfCancellationRequested();
var outboxId = Guid.NewGuid();
var outbox = factory(envelope, (id) => outboundMsgs.Remove(id, out _));
outboundMsgs.TryAdd(outboxId, outbox);
var writers = channels.Values;
var writing = writers.Select(async writer =>
{
await Task.Yield();
var outbound = outbox.GetEnvelope();
return writer.TryWrite(outbound);
});
outbox.Complete();
var results = await Task.WhenAll(writing);
return results.All(s => s);
}
public SinkFactory<TPayload> SinkFactory
=> () =>
{
var linkId = Guid.NewGuid();
var sinkLink = new SinkLink(linkId, id => channels.Remove(id, out _));
var channel = Channel.CreateUnbounded<Envelope<TPayload>>();
channels.TryAdd(linkId, channel);
var channelSink = new ChannelSink<TPayload>(channel, sinkLink);
return channelSink;
};
}
Breaking this down, we have two concurrent collections, each keyed with a UUID. The first, channels, is dedicated to holding all the targets our Source is going to broadcast to, while outboundMsgs maintains an Outbox for all Messages in flight.
private readonly ConcurrentDictionary<Guid, ChannelWriter<Envelope<TPayload>>> channels = new();
private readonly ConcurrentDictionary<Guid, IOutbox<TPayload>> outboundMsgs = new();
Then we have an implementation of the factory delegate SinkFactory<TPayload>. This delegate implementation lives within the Source in order to link the new Sink to the Source via the SinkLink. Additionally, the destructor we pass enables removal of the target from the private set.
public SinkFactory<TPayload> SinkFactory
=> () =>
{
var linkId = Guid.NewGuid();
var sinkLink = new SinkLink(linkId, id => channels.Remove(id, out _));
var channel = Channel.CreateUnbounded<Envelope<TPayload>>();
channels.TryAdd(linkId, channel);
var channelSink = new ChannelSink<TPayload>(channel, sinkLink);
return channelSink;
};
And finally, the real work is done within SendAsync(..). First, we check the cancelToken. Then, we iterate through all targets. The synchronous TryWrite is used since we know these targets are unbounded Sinks and we've sidestepped any idea of Channel completion. However, there's no need to wait for any single TryWrite, so the operation yields to the iteration and we asynchronously wait for all writes to complete. Finally, we return success or failure simply based on whether all writes were successful.
public async ValueTask<bool> SendAsync(
Envelope<TPayload> envelope,
CancellationToken cancelToken = default)
{
cancelToken.ThrowIfCancellationRequested();
var outboxId = Guid.NewGuid();
var outbox = factory(envelope, (id) => outboundMsgs.Remove(id, out _));
outboundMsgs.TryAdd(outboxId, outbox);
var writers = channels.Values;
var writing = writers.Select(async writer =>
{
await Task.Yield();
var outbound = outbox.GetEnvelope();
return writer.TryWrite(outbound);
});
outbox.Complete();
var results = await Task.WhenAll(writing);
return results.All(s => s);
}
💥Well that was easy 🤣
ChannelSink
The last core piece of Part 2 is the ChannelSink. A much simpler implementation: all we need to do is continuously read from the Channel and handle a standard call to Dispose.
internal sealed class ChannelSink<TPayload>
: IChannelSink<TPayload>,
IDisposable
where TPayload : notnull
{
private readonly ChannelReader<Envelope<TPayload>> reader;
private readonly IDisposable sinkLink;
private bool disposed = false;
public ChannelSink(
ChannelReader<Envelope<TPayload>> reader,
IDisposable sinkLink)
{
this.reader = reader;
this.sinkLink = sinkLink;
}
public IAsyncEnumerable<Envelope<TPayload>> ConsumeAsync(CancellationToken cancelToken = default)
=> reader.ReadAllAsync(cancelToken);
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (!disposed && disposing)
sinkLink.Dispose();
disposed = true;
}
}
Registration - I didn't forget
Of course, we can't forget the full implementation of our IServiceCollection extension:
public static IServiceCollection AddMsgChannel<TPayload>(this IServiceCollection services)
where TPayload : notnull
=> services
.SetSingleton<OutboxFactory<TPayload>>(_
=> (originator, destructor)
=> new EnvelopeOutbox<TPayload>(originator, destructor))
.SetSingleton<SinkFactory<TPayload>>(sp =>
{
var source = sp.GetRequiredService<IChannelSource<TPayload>>();
var linkable = source as ILinkableSource<TPayload>
?? throw new ArgumentException($"Source {source.GetType().Name} must be linkable", nameof(source));
return linkable.SinkFactory;
})
.SetSingleton<IChannelSource<TPayload>, ChannelSource<TPayload>>()
.AddTransient<IChannelSink<TPayload>>(sp => sp.GetRequiredService<SinkFactory<TPayload>>().Invoke());
Closing Up
Part 1 saw the most basic broadcast implementation. That got us on the right track, splitting Origination and Consumption. Now, while maintaining that separation, we've introduced the concepts of Completion and Acknowledgement. At the end of Part 2 we have succeeded in passing our test! But more than that, we can now broadcast a message from multiple Producers to multiple Consumers and allow a positive or negative acknowledgement to flow back to the Producer via the originating Message.
🤯Soo much cooler visually! But seriously, with this in place, the next step towards our AsyncMediator is a small jump and we're well on the right path.