# AsyncMediator Series - Part 1

## Inspiration

*Origination executes distinctly from Consumption*

## From the beginning 🤔
The idea of a Mediator is generally the go-to when we want to decouple a Request from its Processing. There are already some well-established and widely used mediator packages, e.g. MediatR (J. Bogard), Mediator.Net (E. Ma), etc. These all implement variations on the Mediator Pattern quite well...👌. Largely we see these operating in process and not across the wire, with the exception of MassTransit - Mediator of course, but we won't go there right now.😎 Additionally, we can see many of the fundamental ideas presented within the *...Query Side* and *...Command Side of my architecture* posts by S. van Deursen, also of Simple Injector Rock Stardom 🎸
## On the market today

Right up front: I really enjoy both of these libraries, but ultimately I'd like to go another direction 🤔

First, let's take a closer look; all of these options are quite feature-rich and support similar architectural patterns/styles:
- CQRS Style Patterns
- Pub/Sub
- Pipelines / Middleware / Behaviors
## What about the Mediator?

Traditionally, a Mediator represents a focal point between an Originator and one or many Consumers. This allows the Originator to avoid explicitly referring to, or even knowing about, the Consumers. The Consumers are typically conceptualized as `Handlers` of particular messages and can form `Pipelines` or chains, each `Handler` playing a specific role in the choreography. Additional and complementary concepts can be added with associated `Middleware` or `Behaviors`, etc., promoting some aspects of loose coupling. But the call from Originator to `Handler` is still synchronous with regard to its `Registration`. The `Registration` couples, albeit in the background, the Originator and the Consumer, and both processes work tightly together via the `Mediator`.
For instance, MediatR holds a reference to all of its `Handlers` and dispatches associated `Requests` and/or `Notifications`. Mediator.Net has a nice abstraction in place; `Pipes` form its central core and you can even stream responses back via the receive pipeline 💘 `IAsyncEnumerable`. However, all `Handlers` are registered and well known. Both of these methodologies yield a static topology.
These structures have the advantage when you want a response back from a particular request. But I'm not interested in call and response; receiving a message should yield another subsequent message, and we'll leave any correlation back to the origination to simply be implicit.
## Schema Driven Mediator

### Conceptually

Conceptually, what we're after is much the same as you'd find in a distributed messaging broker: Kafka, Pulsar, RabbitMQ, etc. With this in mind we'll split our message Origination entirely from its Consumption.

This is the key change I'm after:

> Origination executes distinctly from Consumption
## Architectural Objectives

### Primary Goals

Fundamentally we want an entirely decoupled architecture; the only shared knowledge should be the message `Schema`.
- Decoupling between processing units
  - Schema Driven communication
  - The *Originating Process* should remain separate from the *Consuming Process*
- Consumer Declared Consumption - this is where we decouple
  - No concept of registering `Handlers`
  - Consumers and their associated process may come and go
- Broadcast & Multicast Origination
  - One or Many Originators can produce a message
  - One or Many Consumers can listen
Above, we can see the type of outbound Broadcast/Multicast style brokerage we're after. A decoupled fanout of Messages is relatively easy to achieve.
## Putting it together

### Foundations

Our first step is to split the Originator from the Consumer and further avoid any implicit coupling. All `Requests`, `Commands`, `Notifications`, `Responses`, etc. will travel asynchronously. The building blocks of this system will be the abstract concepts of Schema, Message, and ultimately Flow.
- **Schema** - Defines the shape of data/payload/behavior to be implemented by a Message
- **Message** - Takes the form defined by the Schema and represents an immutable and unique representation of that Schema at a point in time
- **Flow** - Is the unidirectional transmission of Messages. A single Flow only permits a single Schema
## Async Message Fanout

### Abstractions

Producing a Flow are our two fundamental building blocks: `FlowSource` & `FlowSink`. These complementary components define either end of a Flow constrained to a single Schema. The Messages will be transmitted in the background of our application via this `Source`/`Sink` pair of abstractions. Each is defined below:
```csharp
public interface IFlowSource<TSchema>
    : IAsyncDisposable, IDisposable
    where TSchema : Envelope
{
    ValueTask<bool> EmitAsync(TSchema message, CancellationToken cancelToken = default);
}

public interface IFlowSink<TSchema>
    : IAsyncDisposable, IDisposable
    where TSchema : Envelope
{
    IAsyncEnumerable<TSchema> ConsumeAsync(CancellationToken cancelToken = default);
}
```
Note, both ends of our unidirectional Flow are `(Async)Disposable`. The unidirectional nature of the Flow leads directly to two implications:

- Disposing a `Sink` disposes only that recipient endpoint
- Disposing a `Source`, however, closes the entirety of the Flow

Here we've also defined a constraint of `Envelope`. As seen, this constraint, i.e. our base Schema, ensures our Messages always have a `CurrentId` and a `CausationId`. All this means is that we know where we are now and where we came from.
```csharp
public abstract record Envelope(
    string CurrentId,
    string CausationId);
```
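To make that concrete, a Schema is just a record deriving from `Envelope`. The `OrderPlaced` type below is a hypothetical example of mine, not part of the library:

```csharp
// Hypothetical Schema: an immutable Message shape deriving from Envelope.
public sealed record OrderPlaced(
    string OrderId,
    decimal Total,
    string CurrentId,
    string CausationId)
    : Envelope(CurrentId, CausationId);
```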
Simple `Message` passing via the twin concepts of Source/Sink allows Origination and Consumption to be decoupled. Our `Source`, via an Originator, produces messages at one end, and the `Sink` receives them into a Consumer. This takes advantage of much the same concepts as any Message Bus but applies an asynchronous distributed behavior within a single application.
### Implementations

Our implementations are built upon the now standard `System.Threading.Tasks.Dataflow` namespace, which contains a wide variety of Dataflow primitives. This library is quite powerful in terms of message passing and parallel processing, complementing the typical async/await paradigm. Other implementations are of course possible.

Each has tradeoffs of course; the main driver toward Dataflow was the simplicity of separating Message Passing and Message Consuming. I don't want to force any Consumer into participating in some semantic Observable Pipeline, especially if it is instead better represented as a discrete step. Flow remains opt-in and can be as simple as injecting the `Sink` and reading from it.

Channels could of course underpin Dataflow, but natively Channels are not intended to support any kind of Broadcast/Multicast.
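For contrast, here's a small self-contained sketch of why `System.Threading.Channels` doesn't fit the fanout requirement on its own: each written item is delivered to exactly one reader, so two readers compete for the stream rather than each seeing every message.

```csharp
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();

// Two competing readers: each message is received by only ONE of them,
// so this is work distribution, not Broadcast/Multicast.
var readerA = Task.Run(async () =>
{
    await foreach (var msg in channel.Reader.ReadAllAsync())
        Console.WriteLine($"A received {msg}");
});
var readerB = Task.Run(async () =>
{
    await foreach (var msg in channel.Reader.ReadAllAsync())
        Console.WriteLine($"B received {msg}");
});

for (var i = 0; i < 4; i++)
    await channel.Writer.WriteAsync($"message-{i}");
channel.Writer.Complete();

await Task.WhenAll(readerA, readerB);
```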
### FlowSink

To build our Flow, first we'll look at the `Sink`. The purpose and intention of this component is to be injected via dependency injection into any `Consumer` that has an interest in the defined `Schema`; that is, any defined Consumer that must take action upon receiving a `Message` with the defined `Schema`. I've collapsed most of the disposal for brevity; the key piece is disposing of the link back to the `Source`.
```csharp
internal sealed class FlowSink<TSchema>
    : IFlowSink<TSchema>
    where TSchema : Envelope
{
    private BufferBlock<TSchema>? Buffer { get; set; }
    private readonly IDisposable link;
    private volatile bool isDisposed;

    public FlowSink(ILinkableSource<TSchema> source)
    {
        Buffer = new(new()
        {
            EnsureOrdered = true,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });
        link = source.LinkTo(Buffer);
    }

    public IAsyncEnumerable<TSchema> ConsumeAsync(CancellationToken cancelToken = default)
        => Buffer.ThrowIfDisposed(isDisposed)
            .EnumerateSource(cancelToken)
            .Attempt(onError: ex =>
            {
                this.Dispose();
                return AsyncEnumerable.Empty<TSchema>();
            });

    public void Dispose() {/*...*/}
    public ValueTask DisposeAsync() {/*...*/}

    private void DisposeCore()
    {
        isDisposed = true;
        link?.Dispose();
        Buffer = null;
        GC.SuppressFinalize(this);
    }
}
```
Our `FlowSink` is built on top of an internal `BufferBlock` bound to our `Schema`. We maintain a concrete implementation of `BufferBlock` with the intention of later using its `Count`, but it could well be represented as an `IPropagatorBlock` until specifics are necessary. Next, the only item we're dependent on for construction is an `ILinkableSource<TSchema>`, defined as follows.

#### LinkableSource
```csharp
internal interface ILinkableSource<TSchema>
    where TSchema : Envelope
{
    IDisposable LinkTo(ITargetBlock<TSchema> sink);
}
```
Keeping this interface `internal` allows us to keep this concept scoped within our package. It narrowly exposes the internal mechanism of Linking two Dataflow Blocks together. Once linked, each block will process messages as defined by its implementation and operate independently of any other, within the bounds set by the creation and linking.
Lastly, we can see that the `Buffer` is consumed via disposal- and exception-protected cancellable extensions to `IAsyncEnumerable`. While not necessary here, we've decorated the `CancellationToken` as the target of `[EnumeratorCancellation]` for broader use cases. To keep things brief, I'll leave the full implementations for review via GitHub.
```csharp
[return: NotNull]
public static T ThrowIfDisposed<T>(
    this T? target,
    bool isDisposed)

internal static Func<IAsyncEnumerable<TSchema>> EnumerateSource<TSchema>(
    this ISourceBlock<TSchema> source,
    CancellationToken cancelToken)

public static IAsyncEnumerable<T> Attempt<T>(
    this Func<IAsyncEnumerable<T>> iterator,
    Func<Exception, IAsyncEnumerable<T>> onError,
    Func<Exception, bool>? canHandle = default)
    where T : class
```
These are relatively simple extensions, although `Attempt` has quite the signature 🤯 But each does just what it says on the tin: `ThrowIfDisposed` throws if disposed, `EnumerateSource` orchestrates the `Enumerator`, and `Attempt` manages the execution.
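The real bodies live in the repo; here's a minimal sketch of what they could look like, assuming only the signatures above (the implementations below are my own illustration, not the library's code):

```csharp
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading.Tasks.Dataflow;

internal static class FlowExtensions
{
    // Guard: surface ObjectDisposedException instead of a null reference.
    [return: NotNull]
    public static T ThrowIfDisposed<T>(this T? target, bool isDisposed)
        => !isDisposed && target is not null
            ? target
            : throw new ObjectDisposedException(typeof(T).Name);

    // Wrap a Dataflow source in a lazily started IAsyncEnumerable.
    internal static Func<IAsyncEnumerable<TSchema>> EnumerateSource<TSchema>(
        this ISourceBlock<TSchema> source,
        CancellationToken cancelToken)
        => () => Enumerate(source, cancelToken);

    private static async IAsyncEnumerable<TSchema> Enumerate<TSchema>(
        ISourceBlock<TSchema> source,
        [EnumeratorCancellation] CancellationToken cancelToken = default)
    {
        // OutputAvailableAsync completes false once the block Completes,
        // which is how Source disposal terminates every Sink's iteration.
        while (await source.OutputAvailableAsync(cancelToken).ConfigureAwait(false))
            yield return await source.ReceiveAsync(cancelToken).ConfigureAwait(false);
    }

    // Protect the enumeration; yield return can't live inside try/catch,
    // so we drive the enumerator manually.
    public static async IAsyncEnumerable<T> Attempt<T>(
        this Func<IAsyncEnumerable<T>> iterator,
        Func<Exception, IAsyncEnumerable<T>> onError,
        Func<Exception, bool>? canHandle = default)
        where T : class
    {
        await using var enumerator = iterator().GetAsyncEnumerator();
        while (true)
        {
            T? current = null;
            IAsyncEnumerable<T>? fallback = null;
            try
            {
                if (await enumerator.MoveNextAsync())
                    current = enumerator.Current;
            }
            catch (Exception ex) when (canHandle?.Invoke(ex) ?? true)
            {
                fallback = onError(ex);
            }
            if (fallback is not null)
            {
                await foreach (var item in fallback)
                    yield return item;
                yield break;
            }
            if (current is null) // null signals completion; T : class makes this safe
                yield break;
            yield return current;
        }
    }
}
```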
### FlowSource

Next is our Broadcast/Multicast enabled `Source`. This is accomplished by exposing the capabilities of a `BroadcastBlock`. This block clones (in our case, simply returns) each message received and Offers it to each Linked block. The importance of Offer is that if a Linked block cannot take the `Message`, that `Message` is dropped, i.e. lost forever and for good. This leads to Backpressure, another high point for choosing Dataflow yet out of scope here, but to begin we set all Blocks with an unbounded capacity for simplicity. So `Source` can be implemented as such; again with collapsed disposal, we're both Completing and then awaiting Completion of the `Source` during cleanup.
```csharp
internal sealed class FlowSource<TSchema>
    : IFlowSource<TSchema>,
    ILinkableSource<TSchema>
    where TSchema : Envelope
{
    private BroadcastBlock<TSchema>? Source { get; set; }
    private volatile bool isDisposed;

    public FlowSource()
        => Source = new(msg => msg,
            new()
            {
                EnsureOrdered = true,
                BoundedCapacity = DataflowBlockOptions.Unbounded
            });

    public ValueTask<bool> EmitAsync(TSchema message, CancellationToken cancelToken = default)
        => Source.ThrowIfDisposed(isDisposed)
            .OfferAsync(message, TimeSpan.FromMilliseconds(300), cancelToken)
            .Attempt(onError: ex => ValueTask.FromResult(false));

    IDisposable ILinkableSource<TSchema>.LinkTo(ITargetBlock<TSchema> sink)
        => Source.ThrowIfDisposed(isDisposed)
            .LinkTo(sink, new()
            {
                PropagateCompletion = true,
            });

    public void Dispose() { /*...*/ }
    public async ValueTask DisposeAsync() { /*...*/ }

    private async ValueTask DisposeAsyncCore()
    {
        isDisposed = true;
        Source?.Complete();
        await (Source?.Completion ?? Task.CompletedTask);
        Source = null;
        GC.SuppressFinalize(this);
    }
}
```
This implementation exposes `EmitAsync` and transforms the standard `Task<bool>` of the Block into a disposal-protected and orchestrated `Attempt` yielding a `ValueTask<bool>` via simple extensions. Additionally, we separately implement the internal `ILinkableSource<TSchema>` interface to connect any downstream `Sinks`. This exposes a disposal-protected call into `.LinkTo`, ensuring that Completion is propagated. With this configuration set, if the `Source` is disposed, and thus Completed, this information will flow down to all `Sinks`, which will then exit any ongoing or new iteration upon the `Sink`.
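`OfferAsync` and this `Attempt` overload are again extensions whose full bodies live in the repo. One plausible shape, sketched under the assumption that `OfferAsync` wraps `SendAsync` with a linked timeout token (my illustration, not necessarily the library's exact code):

```csharp
using System.Threading.Tasks.Dataflow;

internal static class SourceExtensions
{
    // Offer with a timeout: SendAsync waits while the target is busy;
    // the linked token abandons the offer once `timeout` elapses.
    internal static async ValueTask<bool> OfferAsync<TSchema>(
        this ITargetBlock<TSchema> target,
        TSchema message,
        TimeSpan timeout,
        CancellationToken cancelToken)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
        cts.CancelAfter(timeout);
        return await target.SendAsync(message, cts.Token).ConfigureAwait(false);
    }

    // Protect the emit; cancellation or disposal surfaces as `false`.
    internal static async ValueTask<bool> Attempt(
        this ValueTask<bool> task,
        Func<Exception, ValueTask<bool>> onError)
    {
        try { return await task.ConfigureAwait(false); }
        catch (Exception ex) { return await onError(ex).ConfigureAwait(false); }
    }
}
```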
### Registration

With just these two components we can achieve the first two Primary Goals, albeit with a little DI trickery:

- Decoupling between processing units
- Consumer Declared Consumption
```csharp
public static IServiceCollection AddFlow<TSchema>(this IServiceCollection services)
    where TSchema : Envelope
{
    services.TryAddSingleton<IFlowSource<TSchema>, FlowSource<TSchema>>();
    services.TryAddTransient<IFlowSink<TSchema>>(sp =>
    {
        var source = sp.GetRequiredService<IFlowSource<TSchema>>();
        var linkable = source as ILinkableSource<TSchema>
            ?? throw new ArgumentException(
                $"Invalid FlowSource Registration. Source Type, {source.GetType().Name}, is not linkable");
        return new FlowSink<TSchema>(linkable);
    });
    return services;
}
```
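Wiring a Flow for a given Schema is then a single call per message type, e.g. with the hypothetical `OrderPlaced` Schema from earlier:

```csharp
using Microsoft.Extensions.DependencyInjection;

// Composition root; OrderPlaced is the hypothetical Schema from earlier.
var services = new ServiceCollection();
services.AddFlow<OrderPlaced>();
```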
#### Broadcast/Multicast

Registering the `FlowSource` as a singleton ensures any consumer of this interface is Emitting to the same `BroadcastBlock`. This allows one to many Originators to enter `Messages` into the flow.
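For illustration, a hypothetical Originator (`OrderService` here is mine, not part of the package) depends only on `IFlowSource<TSchema>` and the Schema itself:

```csharp
// Hypothetical Originator: depends only on the Flow abstraction and the Schema.
public sealed class OrderService
{
    private readonly IFlowSource<OrderPlaced> source;

    public OrderService(IFlowSource<OrderPlaced> source)
        => this.source = source;

    public async Task PlaceOrderAsync(string orderId, decimal total, CancellationToken token)
    {
        var message = new OrderPlaced(
            orderId,
            total,
            CurrentId: Guid.NewGuid().ToString(),
            CausationId: orderId);
        // Emit into the Flow; no knowledge of who, if anyone, is listening.
        await source.EmitAsync(message, token);
    }
}
```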
#### Declared Consumption

Thanks to the leniency of the default Dependency Injection of dotnet, i.e. `IServiceCollection`/`IServiceProvider`, we can register our `Sink` as a Transient. Doing so may yield a captive dependency if the consuming service has a longer lifetime than our `Sink`. However, the advantage is that we ensure each Consumer receives a unique and linked instance of our `Sink`. In this variation we're leveraging the DI container to dynamically construct our topology. Assuming, 🤞, our Consumers dispose of the `Sink` properly, it will be appropriately removed from the `Flow` topology.
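On the other end, a hypothetical Consumer declares its consumption simply by depending on `IFlowSink<TSchema>`; hosting it as a `BackgroundService` keeps the lifetime and disposal story straightforward:

```csharp
using Microsoft.Extensions.Hosting;

// Hypothetical Consumer: declares consumption by depending on the Sink.
public sealed class OrderPlacedConsumer : BackgroundService
{
    private readonly IFlowSink<OrderPlaced> sink;

    public OrderPlacedConsumer(IFlowSink<OrderPlaced> sink)
        => this.sink = sink;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Iterate until cancelled or until the Source completes the Flow.
        await foreach (var message in sink.ConsumeAsync(stoppingToken))
            Console.WriteLine($"Order {message.OrderId} placed, caused by {message.CausationId}");
    }

    public override void Dispose()
    {
        // Dispose the Sink to unlink this Consumer from the Flow topology.
        sink.Dispose();
        base.Dispose();
    }
}
```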
## Closure

With just these two primary components, `FlowSource` & `FlowSink`, we have achieved a basic decoupling and fanout of `Messages`. We really don't even need a specific `Mediator` to do this.
The next evolution is going to be tying things together via the `Envelope`. This will allow us to put backpressure on a `Source` while maintaining independence. Additionally, it will provide a vector to ack/nack our `Source`.

Of course, all the code for this post can be found on GitHub: ConcurrentFlows.AsyncMediator