Processing commands with Hangfire and MediatR

Introduction

In the previous post, about processing multiple instances of the same aggregate type, I suggested considering an eventual consistency approach. In this post I would like to present one way to do this.

Setup

First, let me introduce the stack of technologies and patterns:

  1. Command pattern - I am using commands, but they do not look like those described in the GoF book. They are just simple classes with data, and they implement the IRequest marker interface of MediatR.
public class RecalculateCustomerDiscountCommand : IRequest
{
    public int CustomerId { get; private set; }

    public RecalculateCustomerDiscountCommand(int customerId)
    {
        this.CustomerId = customerId;
    }
}
  2. Mediator pattern. I am using this pattern because I want to decouple my client classes (command invokers) from command handlers. MediatR, a simple but great library created by Jimmy Bogard, implements this pattern very well. Here is a simple usage example.
// Simple MediatR usage - send requests
public class ClientClass
{
    private readonly IMediator mediator;

    public ClientClass(IMediator mediator)
    {
        this.mediator = mediator;
    }

    public async Task RecalculateCustomerDiscounts(List<int> customerIds)
    {
        foreach (int customerId in customerIds)
        {
            await mediator.Send(new RecalculateCustomerDiscountCommand(customerId));
        }
    }
}

And the handler:

// MediatR simple handler
public class RecalculateCustomerDiscountHandler : IAsyncRequestHandler<RecalculateCustomerDiscountCommand>
{
    public Task Handle(RecalculateCustomerDiscountCommand command)
    {
        // Load the customer aggregate and recalculate its discount here.
        return Task.CompletedTask;
    }
}
  3. Hangfire. A great open-source library for processing and scheduling background jobs, which also comes with a GUI monitoring interface. This is where my commands are scheduled, executed, and retried if an error occurs.

Problem

For some of my use cases, I would like to schedule my commands, execute them in parallel with a retry option, and monitor them. Hangfire gives me all of these features, but I have to pass a public method to a Hangfire method (for example BackgroundJob.Enqueue). This is a problem: with the mediator pattern I cannot (and do not want to) pass a public method of the handler, because I have decoupled the handler from the invoker. So I need a way to integrate MediatR with Hangfire without affecting these basic assumptions.

Solution

My solution is to have three additional classes:

  1. CommandsScheduler - serializes commands and sends them to Hangfire.
// CommandsScheduler
public class CommandsScheduler
{
    private readonly CommandsExecutor commandsExecutor;

    public CommandsScheduler(CommandsExecutor commandsExecutor)
    {
        this.commandsExecutor = commandsExecutor;
    }

    public string SendNow(IRequest request, string description = null)
    {
        var mediatorSerializedObject = this.SerializeObject(request, description);

        return BackgroundJob.Enqueue(() => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject));
    }

    public string SendNow(IRequest request, string parentJobId, JobContinuationOptions continuationOption, string description = null)
    {
        var mediatorSerializedObject = this.SerializeObject(request, description);
        return BackgroundJob.ContinueWith(parentJobId, () => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), continuationOption);
    }

    public void Schedule(IRequest request, DateTimeOffset scheduleAt, string description = null)
    {
        var mediatorSerializedObject = this.SerializeObject(request, description);

        BackgroundJob.Schedule(() => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), scheduleAt);
    }

    public void Schedule(IRequest request, TimeSpan delay, string description = null)
    {
        var mediatorSerializedObject = this.SerializeObject(request, description);

        // Hangfire accepts a TimeSpan delay directly, so no manual date arithmetic is needed.
        BackgroundJob.Schedule(() => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), delay);
    }

    public void ScheduleRecurring(IRequest request, string name, string cronExpression, string description = null)
    {
        var mediatorSerializedObject = this.SerializeObject(request, description);

        RecurringJob.AddOrUpdate(name, () => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), cronExpression, TimeZoneInfo.Local);
    }

    private MediatorSerializedObject SerializeObject(object mediatorObject, string description)
    {
        string fullTypeName = mediatorObject.GetType().FullName;
        string data = JsonConvert.SerializeObject(mediatorObject, new JsonSerializerSettings
        {
            Formatting = Formatting.None,
            ContractResolver = new PrivateJsonDefaultContractResolver()
        });

        return new MediatorSerializedObject(fullTypeName, data, description);
    }
}
  2. CommandsExecutor - responds to Hangfire job executions, deserializes commands, and sends them to handlers using MediatR.
// CommandsExecutor
public class CommandsExecutor
{
    private readonly IMediator mediator;
    
    public CommandsExecutor(IMediator mediator)
    {
        this.mediator = mediator;
    }

    [DisplayName("Processing command {0}")]
    public Task ExecuteCommand(MediatorSerializedObject mediatorSerializedObject)
    {
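        // Commands are assumed to live in the same assembly as RecalculateCustomerDiscountCommand.
        var type = Assembly.GetAssembly(typeof(RecalculateCustomerDiscountCommand)).GetType(mediatorSerializedObject.FullTypeName);

        if (type == null)
        {
            // Fail the job instead of returning a null Task, so that Hangfire records the error.
            throw new InvalidOperationException($"Unknown command type: {mediatorSerializedObject.FullTypeName}");
        }

        var request = (IRequest)JsonConvert.DeserializeObject(mediatorSerializedObject.Data, type);

        return this.mediator.Send(request);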
    }
}
  3. MediatorSerializedObject - a wrapper class for serialized/deserialized commands with additional properties: the command type and an additional description.
// MediatorSerializedObject
public class MediatorSerializedObject
{
    public string FullTypeName { get; private set; }

    public string Data { get; private set; }

    public string AdditionalDescription { get; private set; }

    public MediatorSerializedObject(string fullTypeName, string data, string additionalDescription)
    {
        this.FullTypeName = fullTypeName;
        this.Data = data;
        this.AdditionalDescription = additionalDescription;
    }

    /// <summary>
    /// Override for Hangfire dashboard display.
    /// </summary>
    /// <returns></returns>
    public override string ToString()
    {
        var commandName = this.FullTypeName.Split('.').Last();
        return $"{commandName} {this.AdditionalDescription}";
    }
}
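
CommandsScheduler above references PrivateJsonDefaultContractResolver, which is not shown in this post. A minimal sketch of what such a resolver might look like is below. It assumes the goal is to let Json.NET write to properties with private setters, which is a guess based on the class name. If a command has no constructor matching its properties, the same settings would probably also need to be passed to JsonConvert.DeserializeObject in CommandsExecutor.

```csharp
using System.Reflection;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

// Assumed implementation: the original class is not shown in the post.
public class PrivateJsonDefaultContractResolver : DefaultContractResolver
{
    protected override JsonProperty CreateProperty(MemberInfo member, MemberSerialization memberSerialization)
    {
        JsonProperty property = base.CreateProperty(member, memberSerialization);

        // Mark the property as writable when it has a setter of any visibility.
        if (!property.Writable && member is PropertyInfo propertyInfo)
        {
            property.Writable = propertyInfo.GetSetMethod(nonPublic: true) != null;
        }

        return property;
    }
}
```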

Finally, with this implementation we can change our client classes to use CommandsScheduler:

// ClientClass uses CommandsScheduler
public class ClientClass
{
    private readonly CommandsScheduler commandsScheduler;

    public ClientClass(CommandsScheduler commandsScheduler)
    {
        this.commandsScheduler = commandsScheduler;
    }

    public void RecalculateCustomerDiscounts(List<int> customerIds)
    {
        foreach (int customerId in customerIds)
        {
            commandsScheduler.SendNow(new RecalculateCustomerDiscountCommand(customerId));
        }
    }
}

Now our commands are scheduled, invoked, and monitored by Hangfire. I sketched a sequence diagram that shows this interaction:

Processing commands with MediatR and Hangfire
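
The client class shown earlier only uses the first SendNow overload. As a rough sketch, the other CommandsScheduler methods could be called as below. It relies on the CommandsScheduler and RecalculateCustomerDiscountCommand classes from this post; the class name, job name, descriptions, and customer ids are made up for illustration.

```csharp
using System;
using Hangfire;

// Hypothetical client that exercises the remaining CommandsScheduler methods.
public class SchedulingExamples
{
    private readonly CommandsScheduler commandsScheduler;

    public SchedulingExamples(CommandsScheduler commandsScheduler)
    {
        this.commandsScheduler = commandsScheduler;
    }

    public void Run()
    {
        // Enqueue immediately and keep the Hangfire job id.
        string parentJobId = this.commandsScheduler.SendNow(
            new RecalculateCustomerDiscountCommand(1), "customer 1");

        // Run only after the parent job has succeeded.
        this.commandsScheduler.SendNow(
            new RecalculateCustomerDiscountCommand(2),
            parentJobId,
            JobContinuationOptions.OnlyOnSucceededState,
            "customer 2");

        // Run once, ten minutes from now.
        this.commandsScheduler.Schedule(
            new RecalculateCustomerDiscountCommand(3), TimeSpan.FromMinutes(10), "customer 3");

        // Run daily; "recalculate-customer-4" is a hypothetical recurring job name.
        this.commandsScheduler.ScheduleRecurring(
            new RecalculateCustomerDiscountCommand(4), "recalculate-customer-4", Cron.Daily(), "customer 4");
    }
}
```

The descriptions end up in the Hangfire dashboard through MediatorSerializedObject.ToString(), which makes otherwise identical jobs easier to tell apart.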

Additionally, we can introduce an interface for CommandsScheduler, ICommandsScheduler. A second implementation would not use Hangfire at all and would only execute MediatR requests directly, for example during development when we do not want to start the Hangfire server.
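
One possible shape for that interface and a Hangfire-free implementation is sketched below. Only the simplest SendNow method is included, and the name DirectCommandsScheduler is an assumption, not something from my code base. The sketch blocks until the handler finishes, which is acceptable for local development but can deadlock in environments with a synchronization context.

```csharp
using MediatR;

// Assumed interface: mirrors the simplest CommandsScheduler method.
public interface ICommandsScheduler
{
    string SendNow(IRequest request, string description = null);
}

// Hypothetical development-only implementation that bypasses Hangfire.
public class DirectCommandsScheduler : ICommandsScheduler
{
    private readonly IMediator mediator;

    public DirectCommandsScheduler(IMediator mediator)
    {
        this.mediator = mediator;
    }

    public string SendNow(IRequest request, string description = null)
    {
        // Execute the handler synchronously, in-process, with no retries or monitoring.
        this.mediator.Send(request).GetAwaiter().GetResult();

        // There is no Hangfire job, so there is no job id to return.
        return null;
    }
}
```

The existing CommandsScheduler would implement the same interface, and the dependency injection container would decide which implementation the client classes receive.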

Summary

I presented a way of processing commands asynchronously using MediatR and Hangfire. With this approach we have:

  1. Decoupled invokers and handlers of commands.
  2. A mechanism for scheduling commands.
  3. The option to run the invoker and the handler of a command in different processes.
  4. Monitoring of command execution.
  5. A retry mechanism for command execution.

These benefits are very important when developing with an eventual consistency approach. We have more control over command processing, and we can react quickly if a problem appears.

Image credits: storyset on Freepik.
