Implementing the Outbox Pattern Using Hangfire and MediatR CQRS Handlers

Author: David Roth
Date: May 19, 2021
Reading time: 10 minutes

Applications typically don't operate in an isolated bubble. Almost every application needs to communicate with one or more other services. Here are some common interactions with third-party services:

  • Sending emails or SMS (e.g. registration, TFA, …)
  • Processing money via payment gateways (e.g. Stripe)
  • Notifying and/or calling into other services (e.g. some internal ERP system)
  • Publishing integration events via a message broker technology (e.g. RabbitMQ)
  • Syncing data to a data warehouse for BI analysis
  • Background processing of (compute-intensive) data

What do they have in common? Each call to an external system or service should happen reliably, but it is usually neither required nor technically possible to make that call atomically within the current business transaction. When this is not handled properly, subtle issues can occur that eventually leave the system in an inconsistent state.

To get an idea of what can go wrong even for the most basic operations, the following code snippet demonstrates a badly designed user registration handler. The handler contains the logic for registering a user and also notifying the user via email. I will demonstrate some common issues, which might happen when executing such a simple piece of code:

    public class RegisterUserHandler : IRequestHandler<RegisterUser>
    {
        public RegisterUserHandler(UserContext dbContext, IEmailSender emailSender) { .. }

        public async Task HandleAsync(RegisterUser command)
        {
            var user = new User(command.Forename, command.Surname, command.Email);
            await dbContext.AddAsync(user);
            await dbContext.SaveChangesAsync();

            // Runs after the user was saved and is not covered by the database transaction.
            await emailSender.SendAsync(command.Email, subject: "Welcome", body: "Welcome text");
        }
    }

    public class SmtpEmailSender : IEmailSender
    {
        public SmtpEmailSender() { .. }

        public async Task SendAsync(string email, string subject, string body)
        {
            using var client = new SmtpClient(...);
            var mailMessage = new MailMessage { Subject = subject, Body = body, To = { email } };
            await client.SendMailAsync(mailMessage);
        }
    }

    // Composition root (Startup.cs)
    // Simple Injector:
    container.Register<IEmailSender, SmtpEmailSender>();

What’s wrong with the code above? Well, it has some serious reliability/consistency problems. Namely, there is no guarantee that both the user gets registered and the email is sent. To be more precise, the following issues can occur:

  1. The happy path:
    Both SaveChanges() and emailSender.SendAsync(..) succeed.
    The user gets registered and receives an email. All good.
  2. Consistent error path:
    SaveChanges() fails.
    The user is not registered and no email is sent.
    This is not good for the user experience because an error will appear. At least we return to a consistent state.
  3. Inconsistent/problematic error path:
    SaveChanges() succeeds, but emailSender.SendAsync(..) fails.
    The user gets registered but does not receive the welcome email.
  4. Transaction rollback path:
    SaveChanges() succeeds, but the outer transaction of the request rolls back.
    The user is not registered but still receives a registration email.
  5. Slow/timeout error path:
    The email gateway used by the IEmailSender may be very slow or even down.
    If it is slow, the registration request might take several seconds, leading to a bad user experience.
    If it fails (timeout), we end up in the inconsistent/problematic error path (3).

A simplified graphical overview of the above workflow might look like this:

[Figure: Implementing_the_outbox.1]

Distributed transactions (two-phase commit) are not the solution

We cannot put the entire world into a transaction. Although distributed transactions (e.g. via MSDTC) were quite commonly used in internal enterprise application landscapes, they are usually not available when designing scalable cloud web applications. When interacting with external services, it is important to realize that you have to live with the lack of transactions and design for a resilient, eventually consistent system instead. So if two-phase commit is not an option, how can we do better? Let’s have a look at the outbox pattern.

Introducing the outbox pattern

The outbox pattern, as its name suggests, introduces an outbox: a queue in which messages or tasks are stored for later execution. In a system with a relational database, the outbox is usually modeled as a simple table acting as that queue. This outbox table needs at least the following columns:

  • Message (JSON)
  • Date (DateTime)
  • Executed (Boolean)

There is an important detail of the outbox pattern in the relational context: the outbox table lives next to your normal business tables in the same database. This allows saving both the business data and the outbox messages in one atomic transaction (all or nothing).
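To make the "all or nothing" idea concrete, here is a minimal sketch that uses only the .NET base class library. The OutboxMessage record mirrors the three columns listed above. InMemoryDatabase, its Commit method, SendWelcomeEmail, and Registration are hypothetical names invented for this illustration; they stand in for a real relational database and its transaction and are not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Mirrors the minimal outbox columns: Message (JSON), Date, Executed.
public record OutboxMessage(string Message, DateTime Date, bool Executed);

// Hypothetical payload describing the work to do later.
public record SendWelcomeEmail(string Email);

// Hypothetical stand-in for a database with one business table and one outbox table.
public class InMemoryDatabase
{
    public List<string> Users { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // Applies all staged changes or none of them, imitating a single ACID transaction.
    public void Commit(Action<List<string>, List<OutboxMessage>> changes)
    {
        // Work on copies so a failure cannot leave partial state behind.
        var users = new List<string>(Users);
        var outbox = new List<OutboxMessage>(Outbox);

        changes(users, outbox); // An exception here leaves both "tables" untouched.

        Users.Clear();
        Users.AddRange(users);
        Outbox.Clear();
        Outbox.AddRange(outbox);
    }
}

public static class Registration
{
    // Stores the user and the pending email task in the same commit.
    public static void Register(InMemoryDatabase db, string email)
    {
        db.Commit((users, outbox) =>
        {
            users.Add(email);
            var json = JsonSerializer.Serialize(new SendWelcomeEmail(email));
            outbox.Add(new OutboxMessage(json, DateTime.UtcNow, Executed: false));
        });
    }
}
```

Calling Registration.Register(db, "a@example.com") leaves exactly one user and one pending outbox message behind. If the delegate passed to Commit throws, neither list changes, which is the property a real database transaction would provide.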

[Figure: Implementing_the_outbox.2]

In the graphic above, you can see what the registration process looks like when using an outbox. Instead of sending the email immediately, the task to send an email is enqueued in the outbox. Because the outbox lives in the same database, registering the user and enqueuing the outbox message are stored atomically in the same ACID transaction (1). After step 1 is completed, an outbox processor picks up the enqueued message asynchronously and processes it (2).
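Step 2 can be pictured with a small sketch, again using only the base class library. OutboxRow and OutboxProcessor are hypothetical names for this illustration, and the retry behavior shown (failed rows simply stay pending) is an assumption about how a simple processor might work, not a description of any particular product.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mutable outbox row; Executed flips to true once processing succeeds.
public class OutboxRow
{
    public OutboxRow(string message) => Message = message;

    public string Message { get; }
    public DateTime Date { get; } = DateTime.UtcNow;
    public bool Executed { get; set; }
    public int Attempts { get; set; }
}

public class OutboxProcessor
{
    private readonly Action<string> handler;

    public OutboxProcessor(Action<string> handler) => this.handler = handler;

    // Processes every pending row once and returns how many succeeded.
    // Failed rows stay pending, so the next run picks them up again.
    public int ProcessPending(IEnumerable<OutboxRow> outbox)
    {
        var succeeded = 0;
        foreach (var row in outbox)
        {
            if (row.Executed) continue;

            row.Attempts++;
            try
            {
                handler(row.Message);
                row.Executed = true;
                succeeded++;
            }
            catch (Exception)
            {
                // Leave Executed = false; a real processor would also log and back off.
            }
        }
        return succeeded;
    }
}
```

One consequence of this design is worth keeping in mind: if a processor crashes after handling a message but before marking it as executed, the message is handled again on the next run, so handlers should tolerate being executed more than once.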

Luckily, you don’t need to build an outbox queuing system from scratch. Instead, you can integrate the Hangfire job queue which can handle job processing reliably. Hangfire is an open-source background job processing component, which can be used as our outbox.

 

Implementing the outbox pattern using Hangfire

To use Hangfire as an outbox implementation, we need to call Hangfire's BackgroundJob API to store the email sending method in Hangfire's job queue as a task. In my previous example, I already declared an IEmailSender abstraction, which allows us to substitute the implementation. So the next steps are very easy: we just need to define a custom IEmailSender implementation that delegates the real email sending invocation to Hangfire's job queue.

In the composition root, we need to register the HangfireEmailSender and we also need to register the SmtpEmailSender so that we can call our specific email sender when the outbox is processed.

    public class HangfireEmailSender : IEmailSender
    {
        public HangfireEmailSender(IBackgroundJobClient jobClient) { .. }

        public Task SendAsync(string email, string subject, string body)
        {
            // The Hangfire API creates a job from the following expression and puts it in the Hangfire job queue.
            jobClient.Enqueue<SmtpEmailSender>(x => x.SendAsync(email, subject, body));
            return Task.CompletedTask;
        }
    }

    public class RegisterUserHandler : IRequestHandler<RegisterUser>
    {
        public RegisterUserHandler(UserContext dbContext, IEmailSender emailSender) { .. }

        public async Task HandleAsync(RegisterUser command)
        {
            // Note for the blog reader: Ambient transaction can be applied via middleware, mediator pipeline or handler decorator.
            var user = new User(command.Forename, command.Surname, command.Email);
            await dbContext.AddAsync(user);
            await dbContext.SaveChangesAsync();

            // Only enqueues the job; the email itself is sent later by a Hangfire worker.
            await emailSender.SendAsync(command.Email, subject: "Welcome", body: "Welcome text");
        }
    }

    // Composition root (Startup.cs)
    // Simple Injector:
    container.Register<IEmailSender, HangfireEmailSender>();
    container.Register<SmtpEmailSender, SmtpEmailSender>();

Now, instead of sending our email synchronously in the code flow of the register user command, we put a task in the Hangfire queue. Once the ambient transaction is committed, a Hangfire worker will pick up the task and execute it in the background. This way the email gets sent independently of the registration request. All four error cases listed above are now resolved:

  1. The happy path:
    Both SaveChanges() and emailSender.SendAsync(..) succeed.
    The user gets registered and receives an email as soon as Hangfire picks up the enqueued job. All good!
  2. Consistent error path:
    SaveChanges() fails.
    The outer transaction rolls back. Neither the user nor the email job is stored.
  3. Inconsistent/problematic error path:
    SaveChanges() succeeds, but emailSender.SendAsync(..) (HangfireEmailSender) fails.
    The outer transaction rolls back. Neither the user nor the email job is stored.
  4. Transaction rollback path:
    SaveChanges() succeeds, but the outer transaction of the request rolls back.
    Neither the user nor the email job is stored.
  5. Slow/timeout error path:
    No impact on registration performance. The job queue will process more slowly, but eventually, all emails will get sent.

Notice that we had to define a specific HangfireEmailSender class to delay the email sending logic. This is a legitimate and easy step for a single use case. But as we continue adding calls to external systems, we have to declare such Hangfire-specific intermediates over and over again.
Furthermore, in real-world scenarios, you often need to capture the current culture and principal, so that the background worker executes using the original culture and principal.
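To illustrate what capturing that context involves, here is a minimal sketch using only System.Globalization. JobEnvelope and JobContext are hypothetical names for this example. The principal is reduced to a plain user name for brevity; restoring a full principal on the worker is left out, and how a real library transports this context is not shown here.

```csharp
using System;
using System.Globalization;

// Hypothetical envelope: the payload plus the ambient context captured at enqueue time.
public record JobEnvelope(string Payload, string CultureName, string UserName);

public static class JobContext
{
    // Call while the original request is still running.
    public static JobEnvelope Capture(string payload, string userName) =>
        new JobEnvelope(payload, CultureInfo.CurrentCulture.Name, userName);

    // Call on the background worker: runs the job under the captured culture,
    // then restores whatever culture the worker thread had before.
    public static void Run(JobEnvelope envelope, Action<JobEnvelope> job)
    {
        var previous = CultureInfo.CurrentCulture;
        try
        {
            CultureInfo.CurrentCulture = CultureInfo.GetCultureInfo(envelope.CultureName);
            job(envelope);
        }
        finally
        {
            CultureInfo.CurrentCulture = previous;
        }
    }
}
```

Writing this once per job type is exactly the kind of repetition that a generic mechanism should take care of.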

We can do a lot better. That’s where I will introduce you to the generic “OutOfBand” approach using MediatR CQRS handlers.

The final solution: Using CQRS decorators to implement a reusable outbox

Because we are already using CQRS-style handlers, we can apply additional behavior on top of a handler using decorators.
By introducing an [OutOfBand] attribute, we can mark handlers that should run outside of the current flow. Whenever such a handler gets resolved, the DI container's resolve pipeline checks whether the attribute exists on the handler. If it does, a decorator enqueues the handler in the outbox instead of executing it. The message is stored in the outbox as part of the current ambient ACID transaction and gets scheduled for asynchronous background processing as soon as that transaction completes.
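The decorator idea can be sketched in a few lines of plain C#. This is not the Fusonic.Extensions implementation: OutOfBandAttribute, IHandler<TMessage>, OutOfBandDecorator, and the two sample handlers are all hypothetical types written for this illustration, and an in-memory queue of delegates stands in for the persistent outbox.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;

// Hypothetical marker attribute, named after the one used in this article.
[AttributeUsage(AttributeTargets.Class)]
public sealed class OutOfBandAttribute : Attribute { }

// Hypothetical minimal handler abstraction (not the MediatR interface).
public interface IHandler<TMessage>
{
    Task HandleAsync(TMessage message);
}

// Wraps a handler: marked handlers are enqueued, all others run inline.
public class OutOfBandDecorator<TMessage> : IHandler<TMessage>
{
    private readonly IHandler<TMessage> inner;
    private readonly Queue<Func<Task>> outbox;

    public OutOfBandDecorator(IHandler<TMessage> inner, Queue<Func<Task>> outbox)
    {
        this.inner = inner;
        this.outbox = outbox;
    }

    public Task HandleAsync(TMessage message)
    {
        var isOutOfBand = inner.GetType().GetCustomAttribute<OutOfBandAttribute>() != null;
        if (!isOutOfBand)
            return inner.HandleAsync(message);

        // Defer execution: a real outbox would persist the message instead of a delegate.
        outbox.Enqueue(() => inner.HandleAsync(message));
        return Task.CompletedTask;
    }
}

// Two sample handlers that record what they received.
[OutOfBand]
public class DeferredHandler : IHandler<string>
{
    public List<string> Handled { get; } = new();
    public Task HandleAsync(string message) { Handled.Add(message); return Task.CompletedTask; }
}

public class InlineHandler : IHandler<string>
{
    public List<string> Handled { get; } = new();
    public Task HandleAsync(string message) { Handled.Add(message); return Task.CompletedTask; }
}
```

Wrapping InlineHandler executes the message immediately, while wrapping DeferredHandler only adds an entry to the queue; the handler runs when that entry is dequeued and invoked. The calling code is identical in both cases, which is what makes the attribute-based approach reusable.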

Using this approach, we can atomically perform our business operation and schedule commands/events that will eventually execute asynchronously. Note that from now on, I am using several packages from our open-source Fusonic.Extensions project, which provides stable components for applying this OutOfBand functionality:

    public record SendEmail(string Email, string Subject, string Body) : ICommand;

    [OutOfBand] // ← Mark handler as out of band
    public class SendEmailHandler : ICommandHandler<SendEmail>
    {
        public SendEmailHandler(IEmailSender emailSender) { ... }

        public async Task HandleAsync(SendEmail command)
        {
            await emailSender.SendAsync(command.Email, command.Subject, command.Body);
        }
    }

    public class RegisterUserHandler : IRequestHandler<RegisterUser>
    {
        public RegisterUserHandler(UserContext dbContext, IRequestHandler<SendEmail> sendEmailHandler) { .. }

        public async Task HandleAsync(RegisterUser command)
        {
            var user = new User(command.Forename, command.Surname, command.Email);
            await dbContext.AddAsync(user);
            await dbContext.SaveChangesAsync();

            // The decorator enqueues this command instead of executing the handler right away.
            await sendEmailHandler.HandleAsync(new SendEmail(command.Email, Subject: "Welcome", Body: "Welcome text"));
        }
    }

    // Composition root (Startup.cs)
    // Extension is from Fusonic.Extensions.Hangfire: https://github.com/fusonic/dotnet-extensions/blob/master/docs/Hangfire/README.md
    container.RegisterOutOfBandDecorators();

    // For the ambient transaction:
    container.RegisterSingleton<ITransactionScopeHandler, TransactionScopeHandler>();
    container.RegisterDecorator(typeof(IRequestHandler<,>), typeof(TransactionalRequestHandlerDecorator<,>));
    container.RegisterDecorator(typeof(INotificationHandler<>), typeof(TransactionalNotificationHandlerDecorator<>));

As you can see in the above code, I changed several aspects compared to the previous implementation.

  1. I introduced a new SendEmail command and the corresponding SendEmailHandler command handler.
  2. I annotated the SendEmailHandler with the OutOfBand attribute.
  3. I changed the composition root and used the RegisterOutOfBandDecorators helper method which registers the needed out of band decorators from our extensions library.

What if you don’t want to explicitly send the SendEmail command but instead use a more reactive, event-based mechanism? Of course, that’s also possible:

    public record UserRegistered(int Id, string Forename, string Surname, string Email) : INotification;

    [OutOfBand] // ← Mark handler as out of band
    public class UserRegisteredSendEmail : INotificationHandler<UserRegistered>
    {
        public UserRegisteredSendEmail(IEmailSender emailSender) { .. }

        public async Task HandleAsync(UserRegistered notification)
        {
            await emailSender.SendAsync(notification.Email, "Your registration", "Thanks for registering");
        }
    }

    public class RegisterUserHandler : IRequestHandler<RegisterUser>
    {
        public RegisterUserHandler(UserContext dbContext, IMediator mediator) { .. }

        public async Task HandleAsync(RegisterUser command)
        {
            var user = new User(command.Forename, command.Surname, command.Email);
            await dbContext.AddAsync(user);
            await dbContext.SaveChangesAsync();

            // Argument order must match the record: Id, Forename, Surname, Email.
            await mediator.Publish(new UserRegistered(user.Id, command.Forename, command.Surname, command.Email));
        }
    }

    // Composition root (Startup.cs)
    // Simple Injector:
    // Extension is from Fusonic.Extensions.Hangfire: https://github.com/fusonic/dotnet-extensions/blob/master/docs/Hangfire/README.md
    container.RegisterOutOfBandDecorators();
    // ...

The OutOfBand attribute can be applied to all void-returning request/notification handlers. You cannot apply it to request handlers that return values.

Well, that looks nice and simple, right? :-) Now we have everything in place to easily mark any command or notification handler with the OutOfBand attribute. We can further simplify our code by removing our IEmailSender abstraction altogether. Because MediatR handlers already implement a generic IRequestHandler<> / INotificationHandler<> abstraction, we can already replace/swap/mock them easily in unit tests, without needing additional abstractions.

    public record UserRegistered(int Id, string Forename, string Surname, string Email) : INotification;

    [OutOfBand] // ← Mark handler as out of band
    public class UserRegisteredSendEmail : INotificationHandler<UserRegistered>
    {
        public async Task HandleAsync(UserRegistered notification)
        {
            using var client = new SmtpClient(...);
            // ... stripped for brevity
            await client.SendMailAsync(mailMessage);
        }
    }

    public class RegisterUserHandler : IRequestHandler<RegisterUser>
    {
        public RegisterUserHandler(UserContext dbContext, IMediator mediator) { .. }

        public async Task HandleAsync(RegisterUser command)
        {
            var user = new User(command.Forename, command.Surname, command.Email);
            await dbContext.AddAsync(user);
            await dbContext.SaveChangesAsync();

            // Argument order must match the record: Id, Forename, Surname, Email.
            await mediator.Publish(new UserRegistered(user.Id, command.Forename, command.Surname, command.Email));
        }
    }

    // Composition root (Startup.cs)
    // Simple Injector:
    // Extension is from Fusonic.Extensions.Hangfire: https://github.com/fusonic/dotnet-extensions/blob/master/docs/Hangfire/README.md
    container.RegisterOutOfBandDecorators();
    // ...

Or the same using an explicit command handler:

    public record SendEmail(string Email, string Subject, string Body) : ICommand;

    [OutOfBand] // ← Mark handler as out of band
    public class SendEmailHandler : ICommandHandler<SendEmail>
    {
        public SendEmailHandler() { ... }

        public async Task HandleAsync(SendEmail command)
        {
            using var client = new SmtpClient(...);
            // ... stripped for brevity
            await client.SendMailAsync(mailMessage);
        }
    }

    public class RegisterUserHandler : IRequestHandler<RegisterUser>
    {
        public RegisterUserHandler(UserContext dbContext, IRequestHandler<SendEmail> sendEmailHandler) { .. }

        public async Task HandleAsync(RegisterUser command)
        {
            var user = new User(command.Forename, command.Surname, command.Email);
            await dbContext.AddAsync(user);
            await dbContext.SaveChangesAsync();

            // The handler is injected via the constructor and wrapped by the out-of-band decorator.
            await sendEmailHandler.HandleAsync(new SendEmail(command.Email, Subject: "Welcome", Body: "Welcome text"));
        }
    }

    // Composition root (Startup.cs)
    // Extension is from Fusonic.Extensions.Hangfire: https://github.com/fusonic/dotnet-extensions/blob/master/docs/Hangfire/README.md
    container.RegisterOutOfBandDecorators();
    // ...

When everything is set up properly, the Hangfire dashboard will show the enqueued, succeeded, and failed jobs:

[Figure: Hangfire dashboard]

Wrapping it up

Implementing the outbox pattern with CQRS-style request/notification handler decorators on top of Hangfire is a nice and easy way to solve these common service-to-service communication issues. Hangfire is a reliable and powerful job queue and ships with lots of out-of-the-box features like retries, error queues, topic queues, and so on. Using our open-source Fusonic.Extensions.Hangfire integration package, you can easily plug a reliable outbox solution into your system. If you are interested in learning more, I have published a small sample application on GitHub, which demonstrates all the features described in the previous sections.

If you like the solution or have any feedback, feel free to contact me on Twitter. 😉
