Processing commands with Hangfire and MediatR

In previous post about processing multiple instance aggregates of the same type I suggested to consider using eventual consistency approach. In this post I would like to present one way to do this.

Setup

In the beginning let me introduce stack of technologies/patterns:
1. Command pattern – I am using commands but they do not look like theses described in GoF book. They just simple classes with data and they implement IRequest  marker interface of MediatR.

2. Mediator pattern. I am using this pattern because i want to decouple my clients classes (commands invokers) from commands handlers. Simple but great library created by Jimmy Bogard named MediatR implements this pattern very well. Here is simple usage.

and handler:

3. Hangfire. Great open-source library for processing and scheduling background jobs even with GUI monitoring interface. This is where my commands are scheduled, executed and retried if error occured.

Problem

For some of my uses cases, I would like to schedule processing my commands, execute them parallel with retry option and monitor them. Hangfire gives me all these kind of features but I have to have public method which I have to pass to Hangifre method (for example BackgroundJob.Enqueue). This is a problem – with mediator pattern I cannot (and I do not want) pass public method of handler because I have decoupled it from invoker. So I need special way to integrate MediatR with Hangfire without affecting basic assumptions.

Solution

My solution is to have three additional classes:
1. CommandsScheduler – serializes commands and sends them to Hangfire.

2. CommandsExecutor – responods to Hangfire jobs execution, deserializes commands and sends them to handlers using MediatR.

3. MediatorSerializedObject – wrapper class for serialized/deserialized commands with additional properties – command type and additional description.

Finally with this implementation we can change our client clasess to use CommandsScheduler:

and our commands are scheduled, invoked and monitored by Hangfire. I sketched sequence diagram which shows this interaction:

Processing commands with MediatR and Hanfire

Additionally, we can introduce interface for CommandsSchedulerICommandsScheduler. Second implementation will not use Hangfire at all and only will execute MediatR requests directly – for example in development process when we do not want start Hangfire Server.

Summary

I presented the way of processing commands asynchronously using MediatR and Hangfire. With this approach we have:
1. Decoupled invokers and handlers of commands.
2. Scheduling commands mechanism.
3. Invoker and handler of command may be other processes.
4. Commands execution monitoring.
5. Commands execution retries mechanism.

These benefits are very important during development using eventual consistency approach. We have more control over commands processing and we can react quickly if problem will appear.

Processing multiple aggregates – transactional vs eventual consistency

When we use Domain Driven Design approach in our application, sometimes we have to invoke some method on multiple instances of aggregate of the same type.

For example, in our domain we have customers and when big Black Friday campaign starts we have to recalculate theirs discounts. So in domain model exists Customer aggregate with RecalculateDiscount method and in Application Layer we have DiscountAppService which is responsible for this use case.

There are 2 ways to implement this and similar scenarios:

1. Using transactional consistency

This is the simplest solution, we get all customers aggregates and on every instance the RecalculateDiscount method is invoked. We surrounded our processing with TransactionScope so after that we can be certain that every customer have recalculated discount or none of them. This is transactional consistency – it provides us ACID and sometimes is enough solution, but in many cases (especially while processing multiple aggregates in DDD terms) this solution is very bad approach.

First of all, customers are loaded to memory and we can have performance issue. Of course we can change implementation a little, get only customers identifiers and in foreach loop load customers one by one. But we have worse problem – our transaction holds locks on our aggregates until end of processing and other processes have to wait. For the record – default transaction scope isolation level is Serializable. We can change isolation level but we can’t get rid of locks. In this case application becomes less responsive, we can have timeouts and deadlocks – things we should avoid how we can.

2. Using eventual consistency

In this approach we do not use big transaction. Instead of this, we process every customer aggregate separately. Eventual consistency means that in specified time our system wile be in inconsistent state, but after given time will be consistent. In our example there is a time, that some of customers have discounts recalculated and some of them not. Let’s see the code:

In this case on the beginning we got only customers identifiers and we process customer aggregates one by one asynchronously (and parallel if applicable). We removed problem of locking our aggregates for a long time. The simplest solution is usage of Task.Run() , but using this approach we totally losing control of processing. Better solution is to use some 3rd party library like Hangfire, Quartz.NET or messaging system.

Eventual consistency is a big topic used in distributed computing, encountered together with CQRS. In this article I would like to show only another way of executing batch processing using this approach and its benefits. Sometimes this approach is not a good choice – it can have impact on GUI and users may see stale data for some time. That is why it is important to talk with domain experts because often it is fine for user to wait for update of data but sometimes it is unacceptable.

Summary

Transactional consistency – whole processing is executed in one transaction. It is “all or nothing” approach and sometimes can lead to decrease performance, scalability and availability of our application.

Eventual consistency – processing is divided and not executed in one big transaction. In some time application will be in inconsistent state. It leads to better scalability and availability of application. On the other hand can cause problems with GUI (stale data) and it requires supporting mechanisms which enable parallel processing, retries and sometimes process monitors as well.