How CQRS and Its Implementation Become Simpler with Railway Oriented Programming
Recently, I have been building ResultBoxes, a library for practicing Railway Oriented Programming: it uses the Result pattern for error handling and chains functions together into data pipelines.
What is Railway Oriented Programming?
Railway Oriented Programming is an approach proposed by Scott Wlaschin, who has also written books on functional programming in F#. Instead of throwing errors, functions return a Result type, and chaining those functions builds a data pipeline.
https://fsharpforfunandprofit.com/rop/
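To make the idea concrete, here is a minimal sketch of a Result type with a single chaining method. The names Result<T>, Bind, Parse and EnsurePositive are hypothetical and chosen only for illustration; this is not the ResultBoxes API, just the smallest shape that shows the two "tracks".

```csharp
#nullable enable
using System;

// Hypothetical minimal Result type for illustration; not the ResultBoxes API.
public sealed record Result<T>(T? Value, Exception? Error)
{
    public bool IsSuccess => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);

    // Runs the next step only on the success track; a failure skips it unchanged.
    public Result<TNext> Bind<TNext>(Func<T, Result<TNext>> next) =>
        IsSuccess ? next(Value!) : Result<TNext>.Fail(Error!);
}

public static class Example
{
    // Step 1: parsing can fail, so it returns a Result instead of throwing.
    public static Result<int> Parse(string text) =>
        int.TryParse(text, out var n)
            ? Result<int>.Ok(n)
            : Result<int>.Fail(new FormatException($"'{text}' is not a number"));

    // Step 2: a validation step with its own failure.
    public static Result<int> EnsurePositive(int n) =>
        n > 0
            ? Result<int>.Ok(n)
            : Result<int>.Fail(new ArgumentOutOfRangeException(nameof(n)));

    // The pipeline: if Parse fails, EnsurePositive is never called.
    public static Result<int> ParsePositive(string text) =>
        Parse(text).Bind(n => EnsurePositive(n));
}
```

With this sketch, Example.ParsePositive("42") ends on the success track, while "abc" and "-5" end on the failure track carrying the exception from the step that rejected them.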
Scott wrote a book called "Domain Modeling Made Functional", which explains how functional programming helps in building domain models.
Although this book also covers Railway Oriented Programming, it uses the name "Railroad Oriented Programming." Initially, I was surprised by the inconsistency, but when I posted about it on 𝕏, the author, Scott, explained it to me.
What improves with Railway Oriented Programming?
This is, of course, a personal opinion, but does Railway Oriented Programming make programs simpler? First, please take a look at a single file before and after the code has been changed to Railway Oriented Programming.
Before using Railway Oriented Programming
public class BlobAccessor(BlobConfiguration configuration)
{
    public async Task<Result<bool>> SaveBlobAsync(
        string containerName,
        string filename,
        byte[] data)
    {
        var containerResult = GetContainer(containerName);
        if (!containerResult.IsSuccessful)
        {
            return new Result<bool>(containerResult.Error);
        }
        var container = containerResult.Value;
        var blobClient = container.GetBlobClient(filename);
        await blobClient.UploadAsync(new MemoryStream(data));
        return true;
    }
    public async Task<Result<bool>> FileExistsAsync(string containerName, string filename)
    {
        var containerResult = GetContainer(containerName);
        if (!containerResult.IsSuccessful)
        {
            return new Result<bool>(containerResult.Error ?? new Exception("Container not found"));
        }
        var container = containerResult.Value;
        var blobClient = container.GetBlobClient(filename);
        var response = await blobClient.ExistsAsync();
        return response.Value;
    }
    public async Task<Result<byte[]>> GetBlobAsync(string containername, string filename)
    {
        var containerResult = GetContainer(containername);
        if (!containerResult.IsSuccessful)
        {
            return new Result<byte[]>(
                containerResult.Error ?? new Exception("Container not found"));
        }
        var container = containerResult.Value;
        var blobClient = container.GetBlobClient(filename);
        var response = await blobClient.DownloadAsync();
        using var memoryStream = new MemoryStream();
        await response.Value.Content.CopyToAsync(memoryStream);
        return memoryStream.ToArray();
    }
    public Result<BlobContainerClient> GetContainer(string container)
    {
        try
        {
            return new BlobContainerClient(configuration.ConnectionString, container);
        }
        catch (Exception e)
        {
            return new Result<BlobContainerClient>(e);
        }
    }
}
After implementing Railway Oriented Programming
public class BlobAccessor(BlobConfiguration configuration)
{
    public async Task<ResultBox<UnitValue>> SaveBlobAsync(
        string containerName,
        string filename,
        byte[] data)
        => await GetContainer(containerName)
            .ConveyorWrapTry(container => container.GetBlobClient(filename))
            .ConveyorWrapTry(
                async blobClient => await blobClient.UploadAsync(new MemoryStream(data)))
            .Remap(_ => UnitValue.None); // If no error was returned, the save succeeded
    public async Task<ResultBox<bool>> FileExistsAsync(string containerName, string filename)
        => await GetContainer(containerName)
            .ConveyorWrapTry(container => container.GetBlobClient(filename))
            .ConveyorWrapTry(async blobClient => await blobClient.ExistsAsync())
            .Remap(response => response.Value);
    public async Task<ResultBox<byte[]>> GetBlobAsync(string containername, string filename)
        => await GetContainer(containername)
            .ConveyorWrapTry(container => container.GetBlobClient(filename))
            .ConveyorWrapTry(async blobClient => await blobClient.DownloadAsync())
            .ConveyorWrapTry(
                async response =>
                {
                    using var memoryStream = new MemoryStream();
                    await response.Value.Content.CopyToAsync(memoryStream);
                    return memoryStream.ToArray();
                });
    public ResultBox<BlobContainerClient> GetContainer(string container)
        => ResultBox.WrapTry(
            () => new BlobContainerClient(configuration.ConnectionString, container));
}
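The WrapTry-style calls in the code above turn thrown exceptions into failure values, which is what removes the explicit try/catch blocks. A rough sketch of that idea follows, assuming a hypothetical Result<T> type and a hypothetical ResultTry helper; the real ResultBox implementation may differ in names and details.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical minimal Result type for illustration; not the ResultBoxes API.
public sealed record Result<T>(T? Value, Exception? Error)
{
    public bool IsSuccess => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}

// Hypothetical helper: converts exceptions into failure results.
public static class ResultTry
{
    // Synchronous variant: any exception thrown by func becomes a failure.
    public static Result<T> Wrap<T>(Func<T> func)
    {
        try { return Result<T>.Ok(func()); }
        catch (Exception e) { return Result<T>.Fail(e); }
    }

    // Asynchronous variant: also catches exceptions surfaced by awaiting the task.
    public static async Task<Result<T>> WrapAsync<T>(Func<Task<T>> func)
    {
        try { return Result<T>.Ok(await func()); }
        catch (Exception e) { return Result<T>.Fail(e); }
    }
}
```

For example, ResultTry.Wrap(() => int.Parse("x")) returns a failure holding a FormatException instead of throwing, so the caller can keep chaining and decide what to do with the error at the end.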
What was the change?
Various changes were made, and I found using Railway Oriented Programming with ResultBox to be convenient for the following reasons:
Shorter code (64 -> 39 lines): Shorter code means less to look at, which is beneficial.
Simplified error handling: Errors are generally propagated to the parent, allowing you to decide what to do at the very end.
Ability to validate values passed to the next step: In regular functions, temporary variables defined within the function can be used until the end, increasing the risk of accidental use. In Railway Oriented Programming, the returned value is passed to the next step. For handling complex cases, the Combine() method returns two values: the received and the returned ones. This ensures only the necessary values are passed and used, reducing the risk of accidental misuse.
Focus on the task at hand: Each process is a function within ResultBox, allowing for easy function splitting or direct writing with attention to inputs and outputs, thereby reducing cognitive load.
Ease of inserting or repeating processes: Each process is a ResultBox<TValueType>, making it easy to insert or repeat processes by simply adding a line.
Ease of mixing sync/async code: ResultBox allows for seamless connection and execution of async (Task) and sync code, simplifying the writing process and allowing for various coding styles within a function.
In this way, I found it easier to write clear and understandable programs in several aspects.
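The point about Combine() can be illustrated with a small sketch. It assumes a hypothetical Result<T> type whose Combine returns a tuple of the received value and the newly returned one; the actual ResultBox type packages the two values differently, so treat this only as an approximation of the behavior described above.

```csharp
#nullable enable
using System;

// Hypothetical minimal Result type for illustration; not the ResultBoxes API.
public sealed record Result<T>(T? Value, Exception? Error)
{
    public bool IsSuccess => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);

    // Passes only the returned value on to the next step.
    public Result<TNext> Bind<TNext>(Func<T, Result<TNext>> next) =>
        IsSuccess ? next(Value!) : Result<TNext>.Fail(Error!);

    // Keeps the received value and pairs it with the value the next step returns.
    public Result<(T, TNext)> Combine<TNext>(Func<T, Result<TNext>> next)
    {
        if (!IsSuccess) return Result<(T, TNext)>.Fail(Error!);
        var second = next(Value!);
        return second.IsSuccess
            ? Result<(T, TNext)>.Ok((Value!, second.Value!))
            : Result<(T, TNext)>.Fail(second.Error!);
    }
}

public static class Example
{
    // Hypothetical lookup that only knows order 1.
    public static Result<string> FindOwner(int orderId) =>
        orderId == 1
            ? Result<string>.Ok("alice")
            : Result<string>.Fail(new InvalidOperationException("order not found"));

    // The last step sees exactly two values: the order id and its owner.
    public static Result<string> Describe(int orderId) =>
        Result<int>.Ok(orderId)
            .Combine(id => FindOwner(id))
            .Bind(pair => Result<string>.Ok($"order {pair.Item1} belongs to {pair.Item2}"));
}
```

Here Example.Describe(1) produces "order 1 belongs to alice", and Example.Describe(2) stops at the lookup with a failure. No temporary variable outlives the step that needs it.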
Integration of CQRS and Railway Oriented Programming
As we adopted Railway Oriented Programming, we wanted to introduce it in more areas, especially in Sekiban, our event-sourcing CQRS framework.
https://zenn.dev/jtechjapan_pub/articles/aac3e1a89701d1
In the article above, I summarized the evolution of coding styles in Sekiban. Sekiban is an event-sourcing framework we use internally, but since it is released as open-source, we have continuously thought about how to write code efficiently. In CQRS, access to the domain is generally limited to commands and queries. Commands are used for saving data, and queries are used for retrieving data. One of the considerations in Sekiban is how to make these commands and queries as clear and easy to write as possible.
Impact of Commands, Queries, and DI
We have made various improvements, and until now, commands and queries were defined with the following class structure:
Command
- Command data class
- Command handler class (requires DI)
Query
- Query parameter data class
- Query handler class (requires DI)
- Query response data class
For handler classes that require DI, we used the conventional method in C# of constructor injection for dependency injection. However, while making improvements using the Result class, we realized that handler classes might be unnecessary. Instead of using the handler class constructor for DI, we adopted a method of retrieving DI classes from the context passed to the handler function. This eliminated the need to define handler classes and allowed us to define handler methods directly within the data classes. Therefore, the class structure can be simplified as follows:
Command
- Command data class + command handler method (with DI)
Query
- Query parameter data class + query handler method (with DI)
- Query response data class
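The shift from constructor injection to resolving dependencies from a context can be sketched in isolation. Everything in this example (HandlerContext, IGreetingFormatter, PoliteFormatter, GreetingQuery) is a hypothetical stand-in invented for illustration and is not part of Sekiban; it only shows the shape of "data record plus handler method, with services pulled from the context".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical context that hands out services on request (a stand-in for a DI container).
public sealed class HandlerContext
{
    private readonly Dictionary<Type, object> _services = new();

    public HandlerContext Register<TService>(TService service) where TService : class
    {
        _services[typeof(TService)] = service;
        return this;
    }

    public TService GetRequiredService<TService>() where TService : class =>
        _services.TryGetValue(typeof(TService), out var service)
            ? (TService)service
            : throw new InvalidOperationException($"{typeof(TService).Name} is not registered");
}

// Hypothetical dependency used by the handler.
public interface IGreetingFormatter
{
    string Format(string name);
}

public sealed class PoliteFormatter : IGreetingFormatter
{
    public string Format(string name) => $"Good day, {name}.";
}

// The query data and its handler live in one record:
// no separate handler class and no constructor injection.
public sealed record GreetingQuery(string Name)
{
    public string Handle(HandlerContext context) =>
        context.GetRequiredService<IGreetingFormatter>().Format(Name);
}

public static class Example
{
    public static string Run()
    {
        var context = new HandlerContext().Register<IGreetingFormatter>(new PoliteFormatter());
        return new GreetingQuery("Aya").Handle(context); // "Good day, Aya."
    }
}
```

Because the record itself is never constructed by the container, only the services it asks for need to be registered.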
Here is the code.
Before
public class GeneralListQuerySample : IGeneralListQuery<GeneralListQuerySample.Parameter, GeneralListQuerySample_Response>
{
    private readonly IMultiProjectionService _multiProjectionService;
    public GeneralListQuerySample(IMultiProjectionService multiProjectionService) => _multiProjectionService = multiProjectionService;
    public async Task<IEnumerable<GeneralListQuerySample_Response>> HandleFilterAsync(Parameter queryParam)
    {
        var projectionA = await _multiProjectionService.GetMultiProjectionAsync<ClientLoyaltyPointListProjection>();
        var clients = await _multiProjectionService.GetAggregateList<Client>();
        return projectionA.Payload.Records.Join(clients, x => x.ClientId, x => x.AggregateId, (x, y) => new { x, y })
            .Where(x => x.y.Payload.ClientEmail.Contains(queryParam.EmailContains))
            .Select(x => new GeneralListQuerySample_Response(x.x.ClientName, x.x.BranchName));
    }
    public IEnumerable<GeneralListQuerySample_Response> HandleSort(Parameter queryParam, IEnumerable<GeneralListQuerySample_Response> filteredList) =>
        filteredList.OrderBy(x => x.Name);
    public record Parameter(string EmailContains) : IListQueryParameter<GeneralListQuerySample_Response>;
}
After
public class GeneralListQuerySampleNext(string EmailContains) : INextGeneralListQueryAsync<GeneralListQuerySample_Response>
{
    public Task<ResultBox<IEnumerable<GeneralListQuerySample_Response>>> HandleFilterAsync(IQueryContext context) =>
        context.GetMultiProjectionService()
            .Conveyor(
                service => service.GetMultiProjectionWithResultAsync<ClientLoyaltyPointListProjection>()
                    .Combine(_ => service.GetAggregateListWithResult<Client>()))
            .Remap(
                (projectionA, clients) => projectionA.Payload.Records.Join(clients, x => x.ClientId, x => x.AggregateId, (x, y) => new { x, y })
                    .Where(x => x.y.Payload.ClientEmail.Contains(EmailContains))
                    .Select(x => new GeneralListQuerySample_Response(x.x.ClientName, x.x.BranchName)));
    public Task<ResultBox<IEnumerable<GeneralListQuerySample_Response>>> HandleSortAsync(
        IEnumerable<GeneralListQuerySample_Response> filteredList,
        IQueryContext context) =>
        ResultBox.WrapTry(() => filteredList.OrderBy(x => x.Name).AsEnumerable()).ToTask();
}
Usage
ResultBox.Start
    .Conveyor(
        _ => ResultBox.FromValue(GetQueryResponse(new GeneralQuerySample.Parameter("test")))
            .Remap(response => response.Count)
            .Combine(_ => ResultBox.FromValue(GetQueryResponse(new GeneralQuerySampleNext("test")))))
    .Scan(Assert.Equal)
    .Conveyor(
        _ => ResultBox.FromValue(GetQueryResponse(new GeneralQuerySample.Parameter("example.com")))
            .Remap(response => response.Count)
            .Combine(_ => ResultBox.FromValue(GetQueryResponse(new GeneralQuerySampleNext("example.com")))))
    .Scan(Assert.Equal)
    .UnwrapBox();
By eliminating the handler class, you can directly write the required handlers within the data class (record) by describing the necessary interfaces for commands and queries. Additionally, since dependency injection is no longer done through classes, there's no need to register them in the DI service collection, making it easier to use.
Simplifying Domain Access with CQRS
In Sekiban, commands can only modify a single aggregate in event sourcing. For example, a client's command cannot change multiple client aggregates; it can only have side effects (save events) on one aggregate ID among many client aggregates. This rule keeps commands simple, and in most cases, calling a single command from the front end is enough to achieve the desired outcome.
In scenarios where multiple aggregate types or multiple aggregates are affected, a "process manager" use case is written to handle multiple commands through an API. Otherwise, commands are directly invoked from the front end to access the backend domain, eliminating the need for creating additional layers. When using Sekiban, the feature that automatically converts commands to APIs allows direct access to Sekiban commands, while process managers are connected via APIs and controllers, with the front end accessing these APIs.
For queries, Sekiban defines live projections and writes queries against them. Queries are automatically generated into APIs by Sekiban, allowing front-end access without writing additional layers.
This concept is similar to Vertical Slice Architecture, where changes to aggregates can be made without worrying about layers or refactoring, allowing direct writing of commands and queries. In this method, commands and queries must be directly tied to use cases. Instead of CRUD operations that input/output all table items, it requires designing events to save only the necessary data changes corresponding to business events. Event sourcing is inherently structured this way, making it a natural fit with CQRS.
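The contrast with CRUD can be made concrete with a tiny sketch: each event records only the change a business action caused, and the current state is rebuilt by replaying those events. The event and state shapes below (a simplified ClientCreated, a hypothetical ClientNameChanged, and ClientState) are assumptions made for illustration and do not reflect how Sekiban defines aggregates or events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified events: each one carries only what changed.
public abstract record ClientEvent;
public sealed record ClientCreated(string Name, string Email) : ClientEvent;
public sealed record ClientNameChanged(string Name) : ClientEvent;

// Current state is derived from events rather than stored as an updatable row.
public sealed record ClientState(string Name, string Email)
{
    // Applies a single event to the state and returns the new state.
    public static ClientState Apply(ClientState state, ClientEvent ev) => ev switch
    {
        ClientCreated e => new ClientState(e.Name, e.Email),
        ClientNameChanged e => state with { Name = e.Name },
        _ => state
    };

    // Rebuilds the state by folding over the event history in order.
    public static ClientState Replay(IEnumerable<ClientEvent> events) =>
        events.Aggregate(new ClientState("", ""), (state, ev) => Apply(state, ev));
}
```

Replaying a ClientCreated followed by a ClientNameChanged yields a state with the new name and the original email, without any event having to carry the full record.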
Example of Writing Events and Queries Using Sekiban
Command
To create a Client, the following validations must be passed:
BranchId - Required
ClientName - Required, must be within 30 characters
ClientEmailAddress - Required, must be in a valid email format
The BranchId must be searched, and the Branch must exist and be active
The ClientEmailAddress must not already exist
When the above conditions are met, a ClientCreated event is generated
public record CreateClientWithResult(
    [property: Required]
    Guid BranchId,
    [property: Required, MaxLength(30)]
    string ClientName,
    [property: Required, EmailAddress]
    string ClientEmail) : ICommandWithHandlerAsync<Client, CreateClientWithResult>
{
    public Guid GetAggregateId() => Guid.NewGuid();
    public static async Task<ResultBox<UnitValue>> HandleCommandAsync(CreateClientWithResult command, ICommandContext<Client> context) =>
        await context.GetRequiredService<IQueryExecutor>()
            .Conveyor(queryExecutor => queryExecutor.ExecuteWithResultAsync(new BranchExistsQuery.Parameter(command.BranchId)))
            .Verify(
                value => value.Exists
                    ? ExceptionOrNone.None
                    : new SekibanAggregateNotExistsException(command.BranchId, nameof(Branch), (command as ICommandCommon).GetRootPartitionKey()))
            .Conveyor(_ => context.GetRequiredService<IQueryExecutor>())
            .Conveyor(
                queryExecutor => queryExecutor.ExecuteWithResultAsync(
                    new ClientEmailExistsQuery.Parameter(command.ClientEmail)
                    {
                        RootPartitionKey = (command as ICommandCommon).GetRootPartitionKey()
                    }))
            .Verify(response => response.Exists ? new SekibanEmailAlreadyRegistered() : ExceptionOrNone.None)
            .Conveyor(_ => context.AppendEvent(new ClientCreated(command.BranchId, command.ClientName, command.ClientEmail)));
}
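The Verify steps in the command above act as gates: the value passes through when the check reports no error, and the pipeline switches to the failure track otherwise. The sketch below approximates that behavior with a hypothetical Result<T> type, using null in place of ExceptionOrNone.None and in-memory sets in place of the existence queries; the names NewClient, ActiveBranches and RegisteredEmails are invented for the example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical minimal Result type for illustration; not the ResultBoxes API.
public sealed record Result<T>(T? Value, Exception? Error)
{
    public bool IsSuccess => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);

    // Keeps the value when the check returns null; otherwise switches to the failure track.
    public Result<T> Verify(Func<T, Exception?> check)
    {
        if (!IsSuccess) return this;
        var error = check(Value!);
        return error is null ? this : Fail(error);
    }
}

public sealed record NewClient(string BranchId, string Email);

public static class Example
{
    // In-memory stand-ins for the branch and email existence queries.
    static readonly HashSet<string> ActiveBranches = new() { "tokyo" };
    static readonly HashSet<string> RegisteredEmails = new() { "taken@example.com" };

    public static Result<NewClient> Validate(NewClient client) =>
        Result<NewClient>.Ok(client)
            .Verify(c => ActiveBranches.Contains(c.BranchId)
                ? null
                : new InvalidOperationException("branch does not exist"))
            .Verify(c => RegisteredEmails.Contains(c.Email)
                ? new InvalidOperationException("email already registered")
                : null);
}
```

Once the first Verify fails, the second check is never evaluated, which mirrors how the command stops before querying for the email when the branch is missing.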
Query
A query to return the client list
Only return clients whose names match the NameFilter using a LIKE search
Although not used here, paging functionality can also be added
public record GetClientPayloadQueryNext(string NameFilter) : INextAggregateListQuery<Client, GetClientPayloadQuery_Response>
{
    public ResultBox<IEnumerable<GetClientPayloadQuery_Response>> HandleFilter(IEnumerable<AggregateState<Client>> list, IQueryContext context) =>
        ResultBox.WrapTry(
            () => list.Where(m => m.Payload.ClientName.Contains(NameFilter))
                .Select(m => new GetClientPayloadQuery_Response(m.Payload, m.AggregateId, m.Version)));
    public ResultBox<IEnumerable<GetClientPayloadQuery_Response>> HandleSort(
        IEnumerable<GetClientPayloadQuery_Response> filteredList,
        IQueryContext context) =>
        ResultBox.WrapTry(() => filteredList.OrderBy(m => m.Client.ClientName).AsEnumerable());
}
Sekiban is open source and available for anyone to use from the following repository:
Even if you don’t use C#, we would greatly appreciate it if you could star the repository on GitHub.
Summary
We have been exploring various ways to write Railway Oriented Programming and functional programming, and it seems that we are now solving problems more simply using methods we wouldn't have considered a few years ago. One challenge is proposing this approach to internal team members who are not yet accustomed to functional programming. However, I plan to gradually improve everyone’s understanding by explaining the benefits to capable members first and documenting my current understanding in this way.
Whether we delve further into functional programming or realize its limitations and revert to procedural code, I plan to continue documenting our progress.
Written by
Tomohisa Takaoka
CTO of J-Tech Creations, Inc. Recently working on the development of the event sourcing and CQRS framework Sekiban. Enthusiast of DIY keyboards and trackballs.