Azure Service Bus: Reading Messages Using ServiceBusProcessor

Joshua Steward
8 min read

Service Bus Series - Part 4

Previously, we've covered creating, authenticating, writing and testing against an Azure Service Bus. The next step on our journey is to read continuously using a ServiceBusProcessor. This component triggers event handlers and is itself very similar to a BackgroundService.

What is the ServiceBusProcessor?

The ServiceBusProcessor is a high-level abstraction around a set of ServiceBusReceivers that allows a consumer to continuously receive messages. The processor comes in two varieties: a standard processor for Queues/Topics, and a Session-enabled processor that manages the locks around Service Bus Sessions [stay tuned 🤓].

The processor takes a pair of event handlers: one to handle each message, and another to handle any errors thrown while handling a message. With this model of processing, each delegate receives its own event args: ProcessMessageEventArgs and ProcessErrorEventArgs respectively. Through ProcessMessageEventArgs the message handler can execute any of the typical Receiver operations, e.g. Abandon, Dead Letter, etc.

await args.AbandonMessageAsync(args.Message);
await args.DeadLetterMessageAsync(args.Message);
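To make those settlement calls a little more concrete, here is a minimal sketch of a handler that settles each message itself. It assumes the processor was created with AutoCompleteMessages set to false; the ManualSettlement class, the Process placeholder, and the MaxDeliveries threshold are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class ManualSettlement
{
    // Hypothetical threshold: dead-letter once a message has been delivered this often.
    private const int MaxDeliveries = 3;

    // Assumes ServiceBusProcessorOptions.AutoCompleteMessages = false,
    // so this handler is responsible for settling every message.
    public static async Task HandleAsync(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        try
        {
            Process(message.Body.ToString());
            // Success: remove the message from the queue.
            await args.CompleteMessageAsync(message, args.CancellationToken);
        }
        catch (Exception) when (message.DeliveryCount < MaxDeliveries)
        {
            // Release the lock so the message can be delivered again.
            await args.AbandonMessageAsync(
                message, cancellationToken: args.CancellationToken);
        }
        catch (Exception ex)
        {
            // Too many attempts: move the message to the dead-letter queue.
            await args.DeadLetterMessageAsync(
                message, "ProcessingFailed", ex.Message, args.CancellationToken);
        }
    }

    // Hypothetical stand-in for real business logic.
    private static void Process(string body)
    {
        if (string.IsNullOrWhiteSpace(body))
            throw new InvalidOperationException("Empty message body.");
    }
}
```

The rest of this article leans on the processor's automatic completion instead, so none of this explicit settlement is needed for the QueueReader we build below.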

What We'll Build

We're going to build the core of what could become an Event-Driven microservice. This service will consume continuously from Azure Service Bus and delegate the messages to strongly typed handlers. The consuming process will be a Hosted Service derived from a BackgroundService.

Why a Background Service?

Why not just use a basic IHostedService implementation?

Well, the key difference between an IHostedService and a BackgroundService lies in how the framework itself treats these components. A BackgroundService exposes a virtual, nullable Task (ExecuteTask) representing the execution of the Hosted Service. This is important because it gives the Host knowledge of your code's execution. Critically, the Host observes that long-running Task, so any exception thrown from it is propagated out of the background to the Host instead of being silently lost.
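As a rough illustration of that propagation, the sketch below hosts a worker that throws shortly after starting. It assumes .NET 6 or later, where stopping the host on an unhandled BackgroundService exception is the default behavior; FailingWorker is a hypothetical name and the one second delay is arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder();

// StopHost is already the default on .NET 6+; it is set here only to make the behavior explicit.
builder.Services.Configure<HostOptions>(options =>
    options.BackgroundServiceExceptionBehavior = BackgroundServiceExceptionBehavior.StopHost);

builder.Services.AddHostedService<FailingWorker>();

await builder.Build().RunAsync();

// Hypothetical worker that fails shortly after start-up.
public sealed class FailingWorker : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        throw new InvalidOperationException("Simulated failure in the background loop.");
    }
}
```

With this setup the Host should log the failure and shut down. A hand-rolled IHostedService that starts fire-and-forget work in StartAsync gives the Host no Task to observe, so the same exception could go unnoticed.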


Project Setup

First, dotnet new up a blank console application and install the following packages:

dotnet add package Azure.Identity
dotnet add package Azure.Messaging.ServiceBus
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Bogus

The Queue Reader

To kick off, the QueueReader will be a generic BackgroundService defined as below, including all usings:

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using static System.Threading.CancellationTokenSource;

namespace ConcurrentFlows.AzureBusSeries.Part4.Reader;

public sealed class QueueReader<T>
    : BackgroundService,
    IAsyncDisposable
{
}

Next, we'll declare our dependencies and any necessary members:

public sealed class QueueReader<T>
    : BackgroundService,
    IAsyncDisposable
{
    private readonly ILogger<QueueReader<T>> logger;
    private readonly ServiceBusProcessor processor;
    private readonly IMessageHandler<T> handler;

    private CancellationTokenSource? stoppingCts;
}

We have a logger as a general practice, our key ServiceBusProcessor that will read from the queue, and a MessageHandler that will be passed any message received.
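The IMessageHandler<T> abstraction isn't listed in this part. A minimal sketch of its assumed shape, inferred from how the handler is called and implemented later in this article, might look like the following. The CollectingHandler<T> class is a hypothetical in-memory implementation added here only to show the contract in use, e.g. in a unit test.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape: inferred from the HandleAsync calls in this article.
public interface IMessageHandler<T>
{
    Task HandleAsync(T message, CancellationToken cancelToken);
}

// Hypothetical handler that simply records every message it receives.
public sealed class CollectingHandler<T> : IMessageHandler<T>
{
    public ConcurrentQueue<T> Received { get; } = new();

    public Task HandleAsync(T message, CancellationToken cancelToken)
    {
        // Honor cancellation before doing any work.
        cancelToken.ThrowIfCancellationRequested();
        Received.Enqueue(message);
        return Task.CompletedTask;
    }
}
```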

Next, our ServiceBusProcessor requires the definition of two event handlers: ProcessMessageAsync and ProcessErrorAsync. We'll keep these simple and compact. Process Message will delegate to the MessageHandler, including cancellation. Process Error will log the error details and move on. We don't need to directly Ack/Nack or Complete/Abandon/DLQ the message, as we're relying on the processor's Auto Complete feature, which is enabled by default: the message is completed when the handler returns and abandoned if it throws.

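private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
{
    var body = args.Message.Body;
    var obj = body.ToObjectFromJson<T>();
    // Cancel on either host shutdown or the processor's own signal,
    // and dispose the linked source once the handler has finished.
    using var cts = CreateLinkedTokenSource(stoppingCts!.Token, args.CancellationToken);
    await handler.HandleAsync(obj, cts.Token);
}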

private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
    logger.LogWarning("Error Processing {@Error}",
        new
        {
            args.Identifier,
            ErrorSource = $"{args.ErrorSource}",
            Exception = $"{args.Exception}"
        });
    return Task.CompletedTask;
}

Then, as a BackgroundService we need to implement the abstract ExecuteAsync method. The way we do this is key to interacting with our Host. A common pitfall is not recognizing that the Task returned from ExecuteAsync represents the lifetime of the BackgroundService.

The ServiceBusProcessor exposes two methods we're concerned with here: StartProcessingAsync and StopProcessingAsync. Each of these methods returns a Task representing the operation itself, i.e. Starting or Stopping, but neither represents the ongoing processing. If we were to simply await StartProcessingAsync and leave it at that, our Host would consider the service complete, i.e. no longer running.

What we want is to Start Processing, hold until the service is commanded to shut down, and only then Stop Processing. To do this we'll create a helper extension method:

public static Task CompleteOnCancelAsync(
    this CancellationToken token)
{
    var tcs = new TaskCompletionSource();
    // Pass the source as callback state so the lambda captures nothing.
    token.Register(state =>
    {
        if (state is TaskCompletionSource source)
            source.TrySetResult();
    }, tcs);
    return tcs.Task;
}

This converts the cancellation signal from a token into the completion of a Task.
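To see the helper in isolation, here is a small self-contained sketch that uses only the base class library. The CancellationTokenExtensions and Demo class names are hypothetical, and the extension is restated so the snippet stands on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationTokenExtensions
{
    // Same idea as the helper above: complete a Task when the token is canceled.
    public static Task CompleteOnCancelAsync(this CancellationToken token)
    {
        var tcs = new TaskCompletionSource();
        token.Register(state => ((TaskCompletionSource)state!).TrySetResult(), tcs);
        return tcs.Task;
    }
}

public static class Demo
{
    // Returns true when the Task was pending before Cancel and completed successfully after it.
    public static async Task<bool> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var waiting = cts.Token.CompleteOnCancelAsync();

        var pendingBeforeCancel = !waiting.IsCompleted;
        cts.Cancel();

        // Completes normally; no OperationCanceledException is thrown here.
        await waiting;
        return pendingBeforeCancel && waiting.IsCompletedSuccessfully;
    }
}
```

A common alternative is awaiting Task.Delay(Timeout.Infinite, token), but that Task ends in the canceled state, so awaiting it throws and the shutdown path would need a try/catch. Completing successfully keeps the calling code linear.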

With this in hand we can implement ExecuteAsync:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingCts = CreateLinkedTokenSource(stoppingToken);
    await processor.StartProcessingAsync(CancellationToken.None);

    await stoppingToken.CompleteOnCancelAsync();

    stoppingCts.Cancel();
    await processor.StopProcessingAsync(CancellationToken.None);
}

Within this method, we first create the linked token source that is later passed to our MessageHandler. Then we Start, await a shutdown signal, cancel any outstanding processing, and finally Stop processing. The rest of the lifetime boilerplate is already within the BackgroundService.

Finally, we implement our constructor to inject dependencies and assign the event handling delegates. Also, we'll wrap everything up with async disposal.

public QueueReader(
    ILogger<QueueReader<T>> logger,
    ServiceBusProcessor processor,
    IMessageHandler<T> handler)
{
    this.logger = logger.ThrowIfNull();
    this.processor = processor.ThrowIfNull();
    this.handler = handler.ThrowIfNull();

    processor.ProcessMessageAsync += ProcessMessageAsync;
    processor.ProcessErrorAsync += ProcessErrorAsync;
}

public async ValueTask DisposeAsync()
{
    await processor.DisposeAsync();
    stoppingCts?.Dispose();
    base.Dispose();
}
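The constructor relies on a ThrowIfNull extension that isn't listed in this part. A minimal sketch, assuming it simply returns the value or throws an ArgumentNullException, could look like this. The GuardExtensions class name and the exact signature are assumptions, and CallerArgumentExpression requires C# 10 or later.

```csharp
using System;
using System.Runtime.CompilerServices;

// Assumed implementation: the real helper may differ.
public static class GuardExtensions
{
    public static T ThrowIfNull<T>(
        this T? value,
        [CallerArgumentExpression("value")] string? paramName = null)
        where T : class
        // Return the value for fluent assignment, or fail fast with the argument's name.
        => value ?? throw new ArgumentNullException(paramName);
}
```

Returning the value is what allows the one-line assignments used in the constructor, such as this.logger = logger.ThrowIfNull().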

All together now!

public sealed class QueueReader<T>
    : BackgroundService,
    IAsyncDisposable
{
    private readonly ILogger<QueueReader<T>> logger;
    private readonly ServiceBusProcessor processor;
    private readonly IMessageHandler<T> handler;

    private CancellationTokenSource? stoppingCts;

    public QueueReader(
        ILogger<QueueReader<T>> logger,
        ServiceBusProcessor processor,
        IMessageHandler<T> handler)
    {
        this.logger = logger.ThrowIfNull();
        this.processor = processor.ThrowIfNull();
        this.handler = handler.ThrowIfNull();

        processor.ProcessMessageAsync += ProcessMessageAsync;
        processor.ProcessErrorAsync += ProcessErrorAsync;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingCts = CreateLinkedTokenSource(stoppingToken);
        await processor.StartProcessingAsync(CancellationToken.None);

        await stoppingToken.CompleteOnCancelAsync();

        stoppingCts.Cancel();
        await processor.StopProcessingAsync(CancellationToken.None);
    }

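    private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
    {
        var body = args.Message.Body;
        var obj = body.ToObjectFromJson<T>();
        // Cancel on either host shutdown or the processor's own signal,
        // and dispose the linked source once the handler has finished.
        using var cts = CreateLinkedTokenSource(stoppingCts!.Token, args.CancellationToken);
        await handler.HandleAsync(obj, cts.Token);
    }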

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        logger.LogWarning("Error Processing {@Error}",
            new
            {
                args.Identifier,
                ErrorSource = $"{args.ErrorSource}",
                Exception = $"{args.Exception}"
            });
        return Task.CompletedTask;
    }

    public async ValueTask DisposeAsync()
    {
        await processor.DisposeAsync();
        stoppingCts?.Dispose();
        base.Dispose();
    }
}

Using the Reader

To use the QueueReader we'll use our existing Service Bus Queue, create a message, a QueueWriter, and a MessageHandler, and stand everything up in a Host.

First, the message we'll send looks like this notification, just an id and string content:

public sealed record Notification(
    int Id,
    string Content);

Then the IMessageHandler implementation receives the Notification and logs it:

public sealed class NotificationHandler
    : IMessageHandler<Notification>
{
    private readonly ILogger<NotificationHandler> logger;

    public NotificationHandler(
        ILogger<NotificationHandler> logger)
        => this.logger = logger.ThrowIfNull();

    public Task HandleAsync(Notification message, CancellationToken cancelToken)
    {
        logger.LogInformation("Received Message:{NewLine}{Message}", 
            NewLine, message);
        return Task.CompletedTask;
    }
}

The next major component is the QueueWriter, which is very similar to the QueueSender we created before. The main difference is that this one sends a configured batch of messages. Also, we throw in a little Bogus to get a fun, friendly, random name for each message.

public sealed class QueueWriter
    : BackgroundService,
    IAsyncDisposable
{
    private readonly ILogger<QueueWriter> logger;
    private readonly ServiceBusSender sender;
    private readonly int count;
    private readonly Faker faker = new();

    public QueueWriter(
        ILogger<QueueWriter> logger,
        ServiceBusSender sender,
        int count = 5)
    {
        this.logger = logger.ThrowIfNull();
        this.sender = sender.ThrowIfNull();
        this.count = count;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Give up on the send after 5 seconds, or sooner if the host is stopping.
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        using var cts = CreateLinkedTokenSource(stoppingToken, timeout.Token);

        var messages = Enumerable.Range(1, count)
            .Select(id =>
            {
                var name = faker.Name.FirstName();
                var notification = new Notification(id, $"Hello from {name}");
                var body = JsonSerializer.Serialize(notification);
                var message = new ServiceBusMessage(body);
                return message;
            });

        await sender.SendMessagesAsync(messages, cts.Token);
        logger.LogInformation("Finished");
    }

    public async ValueTask DisposeAsync()
    {
        await sender.DisposeAsync();
        base.Dispose();        
    }
}
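Sending an IEnumerable of messages works well for a handful of small messages like ours. For larger or variable-sized payloads, one option is to build the batches explicitly with ServiceBusMessageBatch, which reports when a message no longer fits. The sketch below is one way to do that; the BatchSending class and the SendInBatchesAsync method are hypothetical names, not part of the SDK or of this series.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class BatchSending
{
    // Hypothetical helper: sends messages in as many size-limited batches as needed.
    public static async Task SendInBatchesAsync(
        this ServiceBusSender sender,
        IEnumerable<ServiceBusMessage> messages,
        CancellationToken cancelToken)
    {
        var batch = await sender.CreateMessageBatchAsync(cancelToken);
        try
        {
            foreach (var message in messages)
            {
                if (batch.TryAddMessage(message))
                    continue;

                // The message did not fit. If the batch is empty it can never fit.
                if (batch.Count == 0)
                    throw new InvalidOperationException("Message exceeds the maximum batch size.");

                // Flush the full batch and start a new one with the current message.
                await sender.SendMessagesAsync(batch, cancelToken);
                batch.Dispose();
                batch = await sender.CreateMessageBatchAsync(cancelToken);

                if (!batch.TryAddMessage(message))
                    throw new InvalidOperationException("Message exceeds the maximum batch size.");
            }

            // Send whatever remains in the final, partially filled batch.
            if (batch.Count > 0)
                await sender.SendMessagesAsync(batch, cancelToken);
        }
        finally
        {
            batch.Dispose();
        }
    }
}
```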

Setting up the Host

First, we'll set up credentials the same way we configured them earlier in Part 2 - Default Credentials:

public static class CredentialBuilder
{
    public static DefaultAzureCredential CreateDefaultCredential(
        this IConfiguration config)
    {
        var tenantId = config["TenantId"];
        return new(new DefaultAzureCredentialOptions()
        {
            TenantId = tenantId
        }.SetVisualStudioCredentialingOnly());
    }

    public static DefaultAzureCredentialOptions SetVisualStudioCredentialingOnly(
        this DefaultAzureCredentialOptions options)
    {
        options.ExcludeAzureCliCredential = true;
        options.ExcludeAzureDeveloperCliCredential = true;
        options.ExcludeAzurePowerShellCredential = true;
        options.ExcludeEnvironmentCredential = true;
        options.ExcludeInteractiveBrowserCredential = true;
        options.ExcludeVisualStudioCodeCredential = true;
        options.ExcludeWorkloadIdentityCredential = true;
        options.ExcludeManagedIdentityCredential = true;
        options.ExcludeSharedTokenCacheCredential = true;
        return options;
    }
}

Then we have the registrations for our Service Bus components:

public static class Registrations
{
    private const string Identifier = "AzureBusSeries";

    public static IServiceCollection AddAzureBusComponents(
        this IServiceCollection services)
        => services
        .AddAzureBusClient()
        .AddAzureBusSender()
        .AddAzureBusProcessor();

    private static IServiceCollection AddAzureBusClient(
        this IServiceCollection services)
        => services.AddSingleton(sp =>
        {
            var config = sp.GetRequiredService<IConfiguration>();

            var credential = config.CreateDefaultCredential();
            var hostName = config["ServiceBusHost"];

            var options = new ServiceBusClientOptions()
            {
                TransportType = ServiceBusTransportType.AmqpWebSockets,
                Identifier = $"{Identifier}-Client"
            };

            return new ServiceBusClient(hostName, credential, options);
        });

    private static IServiceCollection AddAzureBusSender(
        this IServiceCollection services)
        => services.AddSingleton(sp =>
        {
            var config = sp.GetRequiredService<IConfiguration>();
            var client = sp.GetRequiredService<ServiceBusClient>();
            var queue = config["Queue"];

            var options = new ServiceBusSenderOptions()
            {
                Identifier = $"{Identifier}-Writer"
            };

            return client.CreateSender(queue, options);
        });

    private static IServiceCollection AddAzureBusProcessor(
        this IServiceCollection services)
        => services.AddSingleton(sp =>
        {
            var config = sp.GetRequiredService<IConfiguration>();
            var client = sp.GetRequiredService<ServiceBusClient>();
            var queue = config["Queue"];

            var options = new ServiceBusProcessorOptions()
            {
                Identifier = $"{Identifier}-Reader"
            };

            return client.CreateProcessor(queue, options);
        });
}
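The code above reads three configuration keys: TenantId, ServiceBusHost, and Queue. Because the IConfiguration indexer returns null for a missing key, a typo in the secrets only surfaces later as a less obvious failure. A small guard like the sketch below can make that failure explicit; ConfigurationExtensions and GetRequiredValue are hypothetical names, not part of Microsoft.Extensions.Configuration.

```csharp
using System;
using Microsoft.Extensions.Configuration;

public static class ConfigurationExtensions
{
    // Hypothetical helper: fail fast when a required setting is missing or empty.
    public static string GetRequiredValue(this IConfiguration config, string key)
        => config[key] is { Length: > 0 } value
            ? value
            : throw new InvalidOperationException(
                $"Missing required configuration value '{key}'.");
}
```

With something like this in place, a lookup such as config["Queue"] could become config.GetRequiredValue("Queue"), so a missing secret is reported by name when the component is first resolved.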

And finally, we pull it all together in our Program.cs:

var builder = Host.CreateApplicationBuilder();

builder.Configuration.AddUserSecrets<Program>();

builder.Logging.AddConsole();

builder.Services
    .AddHostedService<QueueWriter>()
    .AddHostedService<QueueReader<Notification>>()
    .AddSingleton<IMessageHandler<Notification>, NotificationHandler>()
    .AddAzureBusComponents();

var app = builder.Build();

await app.RunAsync();

Execute!!

If you've got everything set up right and secrets configured, run the app and you'll see something like this logged to the console:

info: ConcurrentFlows.AzureBusSeries.Part4.Writer.QueueWriter[0]
      Finished
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
      Received Message:
      Notification { Id = 1, Content = Hello from Caden }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
      Received Message:
      Notification { Id = 2, Content = Hello from Rylan }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
      Received Message:
      Notification { Id = 3, Content = Hello from Rubie }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
      Received Message:
      Notification { Id = 4, Content = Hello from April }
info: ConcurrentFlows.AzureBusSeries.Part4.Handlers.NotificationHandler[0]
      Received Message:
      Notification { Id = 5, Content = Hello from Katelin }

Wrap Up & Repo

Here, we built a BackgroundService to host the ServiceBusProcessor. This kept the framework informed of our processing lifetime, delegated the message handling to a dependency and flowed all cancellation signals. Also, we built out the supporting components and gave our new QueueReader a test drive. Hopefully, this pattern is useful the next time you're creating a new event-driven dotnet app!

Happy Messaging!

Of course, all code is available on GitHub ⇒ ConcurrentFlows.AzureBusSeries

If there's anything specific you'd like covered regarding Service Bus, please drop an ask in the comments!
