Azure Service Bus: Reading Messages Using ServiceBusProcessor
Service Bus Series - Part 4
Previously, we've covered creating, authenticating, writing and testing against an Azure Service Bus. The next step on our journey is to read continuously using a ServiceBusProcessor
. This component triggers event handlers and is itself very similar to a BackgroundService
.
What is the ServiceBusProcessor?
The ServiceBusProcessor
is a high-level abstraction around a set of ServiceBusReceivers
that allows a consumer to continuously receive messages. The processor comes in two varieties: a standard processor for Queues/Topics, and a Session enabled processor that manages the locks around Service Bus Sessions [stay tuned ๐ค].
The processor takes a pair of event handlers: one to handle the message, and another to handle any errors thrown during handling a message. With this model of processing, each message is managed through the event args passed to each delegate: ProcessMessageEventArgs
and ProcessErrorEventArgs
respectively. Through these arguments the handler can execute any of the typical Receiver operations, e.g. Abandon, Dead Letter, etc.
args.AbandonMessageAsync(args.Message);
args.DeadLetterMessageAsync(args.Message);
What We'll Build
We're going to build the core of what could become an Event-Driven microservice. This service will consume continuously from Azure Service Bus and delegate the messages to strongly typed handlers. The consuming process will be a Hosted Service derived from a BackgroundService
.
Why a Background Service?
Why not just use a basic
IHostedService
implementation?
Well, the key difference between an IHostedService
and a BackgroundService
lies in how the framework itself treats these components. A BackgroundService
exposes a virtual nullable Task representing the execution of the Hosted Service. This is important because it gives the Host knowledge of your code's execution. Critically, the continuously running service is awaited and any exceptions are propagated out of the background to the Host.
Project Setup
First, dotnet new up a blank console application and install the following packages:
dotnet add package Azure.Identity
dotnet add package Azure.Messaging.ServiceBus
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Bogus
The Queue Reader
To kickoff, the QueueReader
will be a generic BackgroundService
defined as below, including all usings:
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using static System.Threading.CancellationTokenSource;
namespace ConcurrentFlows.AzureBusSeries.Part4.Reader;
public sealed class QueueReader<T>
: BackgroundService,
IAsyncDisposable
{
}
Next, we'll declare our dependencies and any necessary members:
public sealed class QueueReader<T>
: BackgroundService,
IAsyncDisposable
{
private readonly ILogger<QueueReader<T>> logger;
private readonly ServiceBusProcessor processor;
private readonly IMessageHandler<T> handler;
private CancellationTokenSource? stoppingCts;
}
We have a logger
as a general practice, our key ServiceBusProcessor
that will read from the queue, and a MessageHandler
that will be passed any message received.
Next, our ServiceBusProcessor
requires the definition of two event handlers: ProcessMessageAsync
and ProcessErrorAsync
. We'll keep these simple and compact. Process Message will delegate to the MessageHandler
including cancelleation. Process Error will log the error details and move on. We don't need to directly Ack/Nack or Complete/Abandon/DLQ the message as we're using the Auto Complete feature of the processor by default.
private Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
var body = args.Message.Body;
var obj = body.ToObjectFromJson<T>();
var cts = CreateLinkedTokenSource(stoppingCts!.Token, args.CancellationToken);
return handler.HandleAsync(obj, cts.Token);
}
private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
logger.LogWarning("Error Processing {@Error}",
new
{
args.Identifier,
ErrorSource = $"{args.ErrorSource}",
Exception = $"{args.Exception}"
});
return Task.CompletedTask;
}
Then, as a BackgroundService
we need to implement the abstract ExecuteAsync
method. The way we do this is key to interacting with our Host. A common pitfall is not recognizing that the Task returned from ExecuteAsync
represents the lifetime of the BackgroundService
.
The ServiceBusProcessor
exposes two methods we're concerned with here: StartProcessingAsync
and StopProcessingAsync
. Each of these methods return a Task representing the operation itself, i.e. Starting or Stopping, but neither represents the ongoing processing. If we were to simply await StartProcessingAsync
and leave it at that, our Host would consider the service complete, i.e. it's no longer running.
What we want is to Start Processing, and then, hold until the service is commanded to shut down, then we'll Stop Processing. To do this we'll create a helper extension method:
public static Task CompleteOnCancelAsync(
this CancellationToken token)
{
var tcs = new TaskCompletionSource();
token.Register(t =>
{
if (t is TaskCompletionSource tcs)
tcs.TrySetResult();
}, tcs);
return tcs.Task;
}
This converts the cancellation signal from a token into the completion of a Task.
With this in hand we can implement ExecuteAsync
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingCts = CreateLinkedTokenSource(stoppingToken);
await processor.StartProcessingAsync(CancellationToken.None);
await stoppingToken.CompleteOnCancelAsync();
stoppingCts.Cancel();
await processor.StopProcessingAsync(CancellationToken.None);
}
Within this method, we first create the linked token source that is later passed to our MessageHandler
. Then we Start, await a shutdown signal, then cancel any outstanding processing, and finally Stop processing. The rest of the lifetime boiler plate is already within the BackgroundService
.
Finally, we implement our constructor to inject dependencies and assign the event handling delegates. Also, we'll wrap everything up with async disposal.
public QueueReader(
ILogger<QueueReader<T>> logger,
ServiceBusProcessor processor,
IMessageHandler<T> handler)
{
this.logger = logger.ThrowIfNull();
this.processor = processor.ThrowIfNull();
this.handler = handler.ThrowIfNull();
processor.ProcessMessageAsync += ProcessMessageAsync;
processor.ProcessErrorAsync += ProcessErrorAsync;
}
public async ValueTask DisposeAsync()
{
await processor.DisposeAsync();
stoppingCts?.Dispose();
base.Dispose();
}
All together now!
public sealed class QueueReader<T>
: BackgroundService,
IAsyncDisposable
{
private readonly ILogger<QueueReader<T>> logger;
private readonly ServiceBusProcessor processor;
private readonly IMessageHandler<T> handler;
private CancellationTokenSource? stoppingCts;
public QueueReader(
ILogger<QueueReader<T>> logger,
ServiceBusProcessor processor,
IMessageHandler<T> handler)
{
this.logger = logger.ThrowIfNull();
this.processor = processor.ThrowIfNull();
this.handler = handler.ThrowIfNull();
processor.ProcessMessageAsync += ProcessMessageAsync;
processor.ProcessErrorAsync += ProcessErrorAsync;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingCts = CreateLinkedTokenSource(stoppingToken);
await processor.StartProcessingAsync(CancellationToken.None);
await stoppingToken.CompleteOnCancelAsync();
stoppingCts.Cancel();
await processor.StopProcessingAsync(CancellationToken.None);
}
private Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
var body = args.Message.Body;
var obj = body.ToObjectFromJson<T>();
var cts = CreateLinkedTokenSource(stoppingCts!.Token, args.CancellationToken);
return handler.HandleAsync(obj, cts.Token);
}
private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
logger.LogWarning("Error Processing {@Error}",
new
{
args.Identifier,
ErrorSource = $"{args.ErrorSource}",
Exception = $"{args.Exception}"
});
return Task.CompletedTask;
}
public async ValueTask DisposeAsync()
{
await processor.DisposeAsync();
stoppingCts?.Dispose();
base.Dispose();
}
}
Using the Reader
To use the QueueReader
we'll use our existing Service Bus Queue, create a message, a QueueWriter
, a MessageHandler
, and stand everything up in Host.
First, the message we'll send looks like this notification, just an id and string content:
public sealed record Notification(
int Id,
string Content);
Then the IMessageHandler
implementation receives the Notification
and logs it:
public sealed class NotificationHandler
: IMessageHandler<Notification>
{
private readonly ILogger<NotificationHandler> logger;
public NotificationHandler(
ILogger<NotificationHandler> logger)
=> this.logger = logger.ThrowIfNull();
public Task HandleAsync(Notification message, CancellationToken cancelToken)
{
logger.LogInformation("Received Message:{NewLine}{Message}",
NewLine, message);
return Task.CompletedTask;
}
}
The next major component is the QueueWriter
, which is very similar to the QueueSender
we created before. The main difference being this one sends a configured batch of messages. Also, we throw in a little Bogus to get a fun, friendly, random name for the message.
public sealed class QueueWriter
: BackgroundService,
IAsyncDisposable
{
private readonly ILogger<QueueWriter> logger;
private readonly ServiceBusSender sender;
private readonly int count;
private readonly Faker faker = new();
public QueueWriter(
ILogger<QueueWriter> logger,
ServiceBusSender sender,
int count = 5)
{
this.logger = logger.ThrowIfNull();
this.sender = sender.ThrowIfNull();
this.count = count;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var cts = CreateLinkedTokenSource(stoppingToken);
var messages = Enumerable.Range(1, count)
.Select(id =>
{
var name = faker.Name.FirstName();
var notification = new Notification(id, $"Hello from {name}");
var body = JsonSerializer.Serialize(notification);
var message = new ServiceBusMessage(body);
return message;
});
await sender.SendMessagesAsync(messages, stoppingToken);
logger.LogInformation("Finished");
}
public async ValueTask DisposeAsync()
{
await sender.DisposeAsync();
base.Dispose();
}
}
Setting up the Host
First, we'll setup credentials the same as we configured them earlier in Part 2 - Default Credentials
public static class CredentialBuilder
{
public static DefaultAzureCredential CreateDefaultCredential(
this IConfiguration config)
{
var tenantId = config["TenantId"];
return new(new DefaultAzureCredentialOptions()
{
TenantId = tenantId
}.SetVisualStudioCredentialingOnly());
}
public static DefaultAzureCredentialOptions SetVisualStudioCredentialingOnly(
this DefaultAzureCredentialOptions options)
{
options.ExcludeAzureCliCredential = true;
options.ExcludeAzureDeveloperCliCredential = true;
options.ExcludeAzurePowerShellCredential = true;
options.ExcludeEnvironmentCredential = true;
options.ExcludeInteractiveBrowserCredential = true;
options.ExcludeVisualStudioCodeCredential = true;
options.ExcludeWorkloadIdentityCredential = true;
options.ExcludeManagedIdentityCredential = true;
options.ExcludeSharedTokenCacheCredential = true;
return options;
}
}
Then we have the registrations for our Service Bus components
public static class Registrations
{
private const string Identifier = "AzureBusSeries";
public static IServiceCollection AddAzureBusComponents(
this IServiceCollection services)
=> services
.AddAzureBusClient()
.AddAzureBusSender()
.AddAzureBusProcessor();
private static IServiceCollection AddAzureBusClient(
this IServiceCollection services)
=> services.AddSingleton(sp =>
{
var config = sp.GetRequiredService<IConfiguration>();
var credential = config.CreateDefaultCredential();
var hostName = config["ServiceBusHost"];
var options = new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
Identifier = $"{Identifier}-Client"
};
return new ServiceBusClient(hostName, credential, options);
});
private static IServiceCollection AddAzureBusSender(
this IServiceCollection services)
=> services.AddSingleton(sp =>
{
var config = sp.GetRequiredService<IConfiguration>();
var client = sp.GetRequiredService<ServiceBusClient>();
var queue = config["Queue"];
var options = new ServiceBusSenderOptions()
{
Identifier = $"{Identifier}-Writer"
};
return client.CreateSender(queue, options);
});
private static IServiceCollection AddAzureBusProcessor(
this IServiceCollection services)
=> services.AddSingleton(sp =>
{
var config = sp.GetRequiredService<IConfiguration>();
var client = sp.GetRequiredService<ServiceBusClient>();
var queue = config["Queue"];
var options = new ServiceBusProcessorOptions()
{
Identifier = $"{Identifier}-Reader"
};
return client.CreateProcessor(queue, options);
});
}
And finally, we pull it all together in our Program.cs
var builder = Host.CreateApplicationBuilder();
builder.Configuration.AddUserSecrets<Program>();
builder.Logging.AddConsole();
builder.Services
.AddHostedService<QueueWriter>()
.AddHostedService<QueueReader<Notification>>()
.AddSingleton<IMessageHandler<Notification>, NotificationHandler>()
.AddAzureBusComponents();
var app = builder.Build();
await app.RunAsync();
Execute!!
If you've got everything set up right and secrets configured, run the app and you'll see something like this logged to the console
info: ConcurrentFlows.AzureBusSeries.Part4.Writer.QueueWriter[0]
Finished
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
Received Message:
Notification { Id = 1, Content = Hello from Caden }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
Received Message:
Notification { Id = 2, Content = Hello from Rylan }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
Received Message:
Notification { Id = 3, Content = Hello from Rubie }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
Received Message:
Notification { Id = 4, Content = Hello from April }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
Received Message:
Notification { Id = 5, Content = Hello from Katelin }
Wrap Up & Repo
Here, we built a BackgroundService
to host the ServiceBusProcessor
. This kept the framework informed of our processing lifetime, delegated the message handling to a dependency and flowed all cancellation signals. Also, we built out the supporting components and gave our new QueueReader
a test drive. Hopefully, this pattern is useful the next time you're creating a new event-driven dotnet app!
Happy Messaging!
Of course, all code is available on GitHub โ ConcurrentFlows.AzureBusSeries
If there's anything specific you'd like covered regarding Service Bus, please drop an ask in the comments!
Subscribe to my newsletter
Read articles from Joshua Steward directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by