Handling concurrency – Aggregate Pattern and EF Core

Introduction

In this post I would like to discuss a frequently overlooked, though in some circumstances very important, topic: concurrency handling in the context of protecting the so-called Domain Invariants.

In other words, the question is: how can we guarantee immediate consistency of our business rules even in a multi-threaded environment?

It’s best to describe this problem with an example…

Problem

Let’s assume that our domain has the concepts of Order and Order Line. The domain expert said that:

One Order can have a maximum of 5 Order Lines.

In addition, he said that this rule must be met at all times (without exception). We can model it in the following way:

Figure: Conceptual Model

The Aggregate

One of the main goals of our system is to enforce business rules. One way to do this is to use a Domain-Driven Design tactical building block: the Aggregate. The Aggregate is a concept created to enforce business rules (invariants). Its implementation may vary depending on the paradigm we use, but in object-oriented programming it is an object graph, as Martin Fowler describes it:

A DDD aggregate is a cluster of domain objects that can be treated as a single unit.

And Eric Evans describes it in the DDD Reference:

Use the same aggregate boundaries to govern transactions and distribution. Within an aggregate boundary, apply consistency rules synchronously

Back to the example. How can we ensure that our Order will never exceed 5 Order Lines? We must have an object that will take care of it. To do this, it must have information about the current number of Order Lines. So it must have state, and based on this state its responsibility is to decide whether the invariant is broken or not.

In this case, the Order seems to be the perfect object for this. It will become the root of our Aggregate, which will contain the Order Lines. It will be its responsibility to add an Order Line and check that the invariant has not been broken.

Figure: Order Aggregate

Let’s see what the implementation of such a construct may look like:
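The original listing is not reproduced here, so what follows is only a minimal sketch of such an Aggregate. The names BusinessRuleValidationException, OrderLine.ProductCode and MaxOrderLines are assumptions made for illustration, not the author's actual code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type used to signal a broken business rule.
public class BusinessRuleValidationException : Exception
{
    public BusinessRuleValidationException(string message) : base(message) { }
}

public class OrderLine
{
    public OrderLine(string productCode) => ProductCode = productCode;

    public Guid Id { get; } = Guid.NewGuid();
    public string ProductCode { get; }
}

// Aggregate Root: the only entry point for changing Order Lines.
public class Order
{
    private const int MaxOrderLines = 5;
    private readonly List<OrderLine> _orderLines = new List<OrderLine>();

    public Guid Id { get; } = Guid.NewGuid();

    // Read-only view, so callers cannot bypass AddOrderLine.
    public IReadOnlyCollection<OrderLine> OrderLines => _orderLines.AsReadOnly();

    public void AddOrderLine(string productCode)
    {
        // The invariant is checked against the state held by the Aggregate itself.
        if (_orderLines.Count >= MaxOrderLines)
        {
            throw new BusinessRuleValidationException(
                $"Order cannot have more than {MaxOrderLines} order lines.");
        }

        _orderLines.Add(new OrderLine(productCode));
    }
}
```

The list of Order Lines is private and exposed only as a read-only collection, so the rule can only be evaluated in one place.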

Everything’s fine right now, but this is only a static view of our model. Let’s see what the typical system flow looks like when the invariant is not broken and when it is broken:

Figure: Process of adding Order Line – success
Figure: Process of adding Order Line – rule broken

Simplified implementation below:
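As a rough sketch of that flow (load the Aggregate, execute the operation, persist), assuming a hypothetical IOrderRepository and an in-memory stand-in for the database; none of these types come from the original post:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Order
{
    private readonly List<string> _orderLines = new List<string>();

    public Order(Guid id) => Id = id;

    public Guid Id { get; }
    public int OrderLinesCount => _orderLines.Count;

    public void AddOrderLine(string productCode)
    {
        if (_orderLines.Count >= 5)
            throw new InvalidOperationException("Order cannot have more than 5 order lines.");
        _orderLines.Add(productCode);
    }
}

// Hypothetical repository abstraction.
public interface IOrderRepository
{
    Task<Order> GetByIdAsync(Guid orderId);
    Task SaveAsync(Order order);
}

// In-memory stand-in for the database, for illustration only.
public class InMemoryOrderRepository : IOrderRepository
{
    private readonly Dictionary<Guid, Order> _orders = new Dictionary<Guid, Order>();

    public Task<Order> GetByIdAsync(Guid orderId) => Task.FromResult(_orders[orderId]);

    public Task SaveAsync(Order order)
    {
        _orders[order.Id] = order;
        return Task.CompletedTask;
    }
}

public class AddOrderLineCommandHandler
{
    private readonly IOrderRepository _orders;

    public AddOrderLineCommandHandler(IOrderRepository orders) => _orders = orders;

    public async Task HandleAsync(Guid orderId, string productCode)
    {
        var order = await _orders.GetByIdAsync(orderId); // 1. load the Aggregate
        order.AddOrderLine(productCode);                 // 2. check the rule, change state
        await _orders.SaveAsync(order);                  // 3. persist the Aggregate
    }
}
```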

Concurrency problem

Everything works nice and clean if we operate in an environment that is not heavily loaded. However, let’s see what can happen when 2 threads want to perform our operation at almost the same time:

Figure: Process of adding Order Line – business rule broken

As you can see in the diagram above, 2 threads load exactly the same aggregate at the same time. Let’s assume that the Order has 4 Order Lines. The aggregate with 4 Order Lines will be loaded in both the first and the second thread. The exception will not be thrown, because 4 < 5. Finally, depending on how we persist the aggregate, the following scenarios are possible:

a) If we have a relational database and a separate table for Order Lines, then 2 Order Lines will be added, giving a total of 6 Order Lines – the business rule is broken.

b) If we store the aggregate in one atomic place (for example in a document database as a JSON object), the second thread will overwrite the result of the first operation and we (and the User) won’t even know about this.

The reason for this behavior is that the second thread read data from the database (point 2.1) before the first one committed it (point 1.4.3).
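Scenario (a) can be reproduced without any database. The sketch below is an illustration only: it replaces real threads with a fixed interleaving, and a plain list plays the role of the Order Lines table.

```csharp
using System.Collections.Generic;

public static class LostInvariantDemo
{
    // Returns the number of rows in the simulated Order Lines table.
    public static int Run()
    {
        // The "table" already holds 4 Order Lines.
        var orderLinesTable = new List<string> { "A", "B", "C", "D" };

        // Both requests load the Aggregate before either of them commits.
        var loadedByFirst = new List<string>(orderLinesTable);
        var loadedBySecond = new List<string>(orderLinesTable);

        // Each request checks the rule against its own stale copy (4 < 5),
        // so both checks pass and both inserts are committed.
        if (loadedByFirst.Count < 5) orderLinesTable.Add("E");
        if (loadedBySecond.Count < 5) orderLinesTable.Add("F");

        return orderLinesTable.Count;
    }
}
```

Run() returns 6: each check was correct for the data it saw, yet the stored state violates the rule.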

Let’s see how we can solve this problem.

Solution

Pessimistic concurrency

The first way to ensure that our business rule will not be broken is to use Pessimistic concurrency. In that approach, we allow only one thread at a time to process a given Aggregate. This means that the processing thread must block reads by other threads by creating a lock. Only when the lock is released can the next thread get the object and process it.

Figure: Pessimistic concurrency

The main difference from the previous approach is that the second thread waits for the previous one to finish (time between points 2.1 and 1.3). This approach causes us to lose performance because we can process transactions only one after the other. Moreover, it can lead to deadlocks.

How can we implement this behavior using Entity Framework Core and SQL Server?

Unfortunately, EF Core does not support Pessimistic Concurrency. However, we can do it easily by ourselves using raw SQL and the query hint mechanism of the SQL Server engine.

First, a database transaction must be started. Then the lock must be acquired. This can be done in two ways: either by reading the data with a query hint (XLOCK, PAGLOCK) or by updating the record at the beginning:

In this way, the transaction on the first thread will acquire an Exclusive Lock, and until it releases it (through the transaction commit) no other thread will be able to read this record. This assumes, of course, that our queries run at least at the read committed transaction isolation level, to avoid so-called dirty reads.
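A minimal sketch of the first variant (reading with a query hint) might look like this. It assumes EF Core 3.0 or later, where ExecuteSqlRawAsync is available, and a hypothetical orders.Orders table with an Id column. The helper name ExecuteWithOrderLockAsync is made up for this example.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

public static class PessimisticLocking
{
    // Runs the given operation while holding an exclusive lock on the Order row's page.
    public static async Task ExecuteWithOrderLockAsync(
        DbContext context, Guid orderId, Func<Task> operation)
    {
        // 1. Start the database transaction.
        using (var transaction = await context.Database.BeginTransactionAsync())
        {
            // 2. Take the lock by reading with the query hint.
            //    Table and column names are assumptions; {0} becomes a SQL parameter.
            await context.Database.ExecuteSqlRawAsync(
                "SELECT 1 FROM orders.Orders WITH (XLOCK, PAGLOCK) WHERE Id = {0}",
                orderId);

            // 3. Load the Aggregate, apply the change and save, all inside the lock.
            await operation();
            await context.SaveChangesAsync();

            // 4. Committing releases the lock; the next waiting request can proceed.
            transaction.Commit();
        }
    }
}
```

If the operation throws, the transaction is disposed without a commit, which rolls it back and also releases the lock.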

Optimistic concurrency

An alternative, and most often preferred, solution is to use Optimistic Concurrency. In this case, the whole process takes place without locking the data. Instead, the data in the database is versioned, and during the update we check whether the version has changed in the meantime.

Figure: Optimistic Concurrency

What does the implementation of this solution look like? Most ORMs support Optimistic Concurrency out of the box. It is enough to indicate which fields should be checked when writing to the database, and they will be added to the WHERE clause. If it turns out that our statement has updated 0 records, it means that the version of the record has changed and we need to do a rollback. Often, however, the existing fields are not used to check the version; instead, a special column called “Version” or “Timestamp” is added.

Back to our example, does adding a column with a version and incrementing it every time the Order entity is changed solve the problem? Well no. The aggregate must be treated as a whole, as a boundary of transaction and consistency.

Therefore, the version must be incremented whenever we change anything in our aggregate. If we have added an Order Line and we keep it in a separate table, ORM support for optimistic concurrency will not help us, because it works on updates of the versioned record, while here we only have inserts and deletes in another table.

How can we know that the state of our Aggregate has changed? If you follow this blog, you already know the answer: by Domain Events. If a Domain Event was published by the Aggregate, it means that the state has changed and we need to increase the Aggregate version.

The implementation can look like this.

First, we add a _version field to each Aggregate Root and the method to increment that version.
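A sketch of that first step is below. The AggregateRoot base class, the IncreaseVersion name and the read-only Version property (added only so the example can be checked) are assumptions for illustration.

```csharp
public abstract class AggregateRoot
{
    // Version of the whole Aggregate, not of a single table row.
    private int _version;

    // Exposed read-only here only to make the sketch easy to verify.
    public int Version => _version;

    public void IncreaseVersion()
    {
        _version++;
    }
}

public class Order : AggregateRoot
{
}
```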

Secondly, we add a mapping for the version attribute and indicate that it is an EF Concurrency Token:
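A possible mapping, assuming the private field is called _version and is stored in a column named Version; the entity shape and table name are placeholders:

```csharp
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

public class Order
{
    private int _version;

    public Guid Id { get; private set; }

    public void IncreaseVersion() => _version++;
}

public class OrderEntityTypeConfiguration : IEntityTypeConfiguration<Order>
{
    public void Configure(EntityTypeBuilder<Order> builder)
    {
        builder.ToTable("Orders");
        builder.HasKey(o => o.Id);

        // Map the private field and mark it as a concurrency token, so EF Core
        // adds "AND Version = @originalVersion" to the WHERE clause of updates.
        builder.Property<int>("_version")
            .HasColumnName("Version")
            .IsConcurrencyToken();
    }
}
```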

The last thing to do is to increment the version if any Domain Events have been published:

With this setup, every update of the Order executes an UPDATE statement whose WHERE clause matches both the Id and the previously loaded Version, and which sets Version to the incremented value.

In our example, no record will be updated for the second thread (@@ROWCOUNT = 0), so Entity Framework will throw the following exception:
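One place to do this is just before saving. The sketch below overrides SaveChangesAsync, which is only one possible hook; the AggregateRoot shape and the OrdersContext name are assumptions.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

public abstract class AggregateRoot
{
    private int _version;
    private readonly List<object> _domainEvents = new List<object>();

    public IReadOnlyCollection<object> DomainEvents => _domainEvents.AsReadOnly();

    protected void AddDomainEvent(object domainEvent) => _domainEvents.Add(domainEvent);

    public void IncreaseVersion() => _version++;
}

public class OrdersContext : DbContext
{
    public OrdersContext(DbContextOptions<OrdersContext> options) : base(options) { }

    public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
        // Every tracked Aggregate Root that published at least one Domain Event
        // has changed its state, so its version must change as well.
        var changedAggregates = ChangeTracker.Entries<AggregateRoot>()
            .Select(entry => entry.Entity)
            .Where(aggregate => aggregate.DomainEvents.Any())
            .ToList();

        foreach (var aggregate in changedAggregates)
        {
            aggregate.IncreaseVersion();
        }

        return base.SaveChangesAsync(cancellationToken);
    }
}
```

Because the root row is now modified even when only an Order Line was inserted, the UPDATE with the version check always runs for a changed Aggregate.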

Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: Database operation expected to affect 1 row(s) but actually affected 0 row(s). Data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.

and our Aggregate will be consistent – the 6th Order Line will not be added. The business rule is not broken, mission accomplished.

Summary

In summary, the most important issues here are:

  • The Aggregate’s main task is to protect invariants (business rules, the boundary of immediate consistency)
  • In a multi-threaded environment, when multiple threads are running simultaneously on the same Aggregate, a business rule may be broken
  • A way to solve concurrency conflicts is to use Pessimistic or Optimistic concurrency techniques
  • Pessimistic Concurrency involves the use of a database transaction and a locking mechanism. In this way, requests are processed one after the other, so concurrency is effectively lost and deadlocks become possible.
  • The Optimistic Concurrency technique is based on versioning database records and checking whether the previously loaded version has been changed by another thread.
  • Entity Framework Core supports Optimistic Concurrency. Pessimistic Concurrency is not supported.
  • The Aggregate must always be treated and versioned as a single unit
  • Domain Events are an indicator that the state was changed, so the Aggregate version should be changed as well
GitHub sample repository

Especially for the needs of this article, I created a repository that shows the implementation of 3 scenarios:

– without concurrency handling
– with Pessimistic Concurrency handling
– with Optimistic Concurrency handling

Link: https://github.com/kgrzybek/efcore-concurrency-handling

Related Posts

Handling Domain Events: Missing Part
Domain Model Encapsulation and PI with Entity Framework 2.2
Attributes of Clean Domain Model

The Outbox Pattern

Introduction

Sometimes, when processing a business operation, you need to communicate with an external component in the Fire-and-forget mode. That component can be, for example:
– external service
– message bus
– mail server
– same database but different database transaction
– another database

Examples of this type of integration with external components:
– sending an e-mail message after placing an order
– sending an event about new client registration to the messaging system
– processing another DDD Aggregate in different database transaction – for example after placing an order to decrease number of products in stock

The question that arises is whether we are able to guarantee the atomicity of our business operation from a technical point of view. Unfortunately, not always, and even when we can (using the 2PC protocol), it limits our system in terms of latency, throughput, scalability and availability. For details about these limitations, I invite you to read the article titled It’s Time to Move on from Two Phase Commit.

The problem I am writing about is presented below:

After line 24 is executed, the transaction is committed. In line 28 we want to send an event to the event bus, but unfortunately 2 bad things can happen:
– our system can crash just after transaction commit and before sending the event
– event bus can be unavailable at this moment so the event cannot be sent

Outbox pattern

If we cannot provide atomicity or we don’t want to do that for the reasons mentioned above, what could we do to increase the reliability of our system? We should implement the Outbox Pattern.

The Outbox Pattern is based on the Guaranteed Delivery pattern and looks as follows:

Figure: Outbox pattern

When you save data as part of one transaction, you also save messages that you later want to process as part of the same transaction. The list of messages to be processed is called an Outbox, just like in e-mail clients.

The second element of the puzzle is a separate process that periodically checks the contents of the Outbox and processes the messages. After processing each message, the message should be marked as processed to avoid resending. However, it is possible that we will not be able to mark the message as processed because of a communication error with the Outbox:

Figure: Outbox messages processing

In this case, when the connection with the Outbox is recovered, the same message will be sent again. What does all this mean for us? The Outbox Pattern gives At-Least-Once delivery. We can be sure that a message will be sent at least once, but it can be sent multiple times too! That’s why another name for this approach is Once-Or-More delivery. We should remember this and try to design the receivers of our messages as Idempotent Receivers, which means:

In Messaging this concepts translates into a message that has the same effect whether it is received once or multiple times. This means that a message can safely be resent without causing any problems even if the receiver receives duplicates of the same message.
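One common way to get this property is to remember the identifiers of messages that were already handled. The sketch below keeps them in memory, which is an assumption made for brevity; a real receiver would normally store them durably, together with the effect of the message.

```csharp
using System;
using System.Collections.Generic;

public class IdempotentReceiver
{
    private readonly HashSet<Guid> _processedMessageIds = new HashSet<Guid>();

    // Number of messages whose side effect was actually applied.
    public int HandledCount { get; private set; }

    // Returns true if the message was handled, false if it was a duplicate.
    public bool Handle(Guid messageId, string payload)
    {
        // HashSet.Add returns false when the id is already present.
        if (!_processedMessageIds.Add(messageId))
        {
            return false; // duplicate delivery: no effect
        }

        HandledCount++; // stands in for the real side effect
        return true;
    }
}
```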

OK, enough theory, let’s see how we can implement this pattern in the .NET world.

Implementation

Outbox message

First, we need to define the structure of our OutboxMessage:

Importantly, the OutboxMessage class is part of the Infrastructure and not the Domain Model! Try to talk with the business about an Outbox, and they will think of the Outlook application instead of the messaging pattern. 🙂 I didn’t include a ProcessedDate property because this class is only needed to save a message as part of the transaction, so this property would always be NULL in this context.
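A possible shape of such a class is sketched below. The property names (Id, OccurredOn, Type, Data) are assumptions based on what a stored message needs: an identifier, a timestamp, the type to deserialize to, and the serialized payload.

```csharp
using System;

public class OutboxMessage
{
    // Parameterless constructor for the ORM.
    private OutboxMessage()
    {
    }

    public OutboxMessage(DateTime occurredOn, string type, string data)
    {
        Id = Guid.NewGuid();
        OccurredOn = occurredOn;
        Type = type;
        Data = data;
    }

    public Guid Id { get; private set; }

    // When the message was created.
    public DateTime OccurredOn { get; private set; }

    // Full type name of the Notification Object, needed for deserialization.
    public string Type { get; private set; }

    // The Notification Object serialized to JSON.
    public string Data { get; private set; }
}
```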

Saving the message

I certainly do not want to write messages to the Outbox by hand in every Command Handler; that is against the DRY principle. For this reason, the Notification Object described in the post about publishing Domain Events can be used. The following solution is based on the linked article with a small modification: instead of processing the notifications immediately, it serializes them and writes them to the database.

As a reminder, all Domain Events resulting from an action are processed as part of the same transaction. If the Domain Event should be processed outside of the ongoing transaction, you should define a Notification Object for it. This is the object which should be written to the Outbox. The code looks like:

Example of Domain Event:

And Notification Object:

The first thing to note is the use of the Json.NET library. The second thing to note is the 2 constructors of the CustomerRegisteredNotification class. The first one creates the notification from a Domain Event. The second one is used to deserialize the message from a JSON string, which is presented in the following section on processing.
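The two-constructor idea can be sketched as follows, using Json.NET (Newtonsoft.Json). The CustomerRegisteredEvent class and the CustomerId property are assumptions made for illustration; the original classes may carry more data.

```csharp
using System;
using Newtonsoft.Json;

// Domain Event, processed inside the ongoing transaction.
public class CustomerRegisteredEvent
{
    public CustomerRegisteredEvent(Guid customerId) => CustomerId = customerId;

    public Guid CustomerId { get; }
}

// Notification Object, serialized to the Outbox and processed later.
public class CustomerRegisteredNotification
{
    // Used when the notification is created from the Domain Event.
    public CustomerRegisteredNotification(CustomerRegisteredEvent domainEvent)
    {
        CustomerId = domainEvent.CustomerId;
    }

    // Used by Json.NET when the message is read back from the Outbox.
    [JsonConstructor]
    public CustomerRegisteredNotification(Guid customerId)
    {
        CustomerId = customerId;
    }

    public Guid CustomerId { get; }
}
```

Serializing with JsonConvert.SerializeObject and reading back with JsonConvert.DeserializeObject<CustomerRegisteredNotification> should round-trip the CustomerId through the second constructor.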

Processing the message

The processing of Outbox messages should take place in a separate process. However, instead of a separate process, we can also use the same process but another thread depending on the needs. Solution which is presented below can be used in both cases.

First, we need a scheduler that will periodically run Outbox processing. I do not want to create the scheduler myself (it is a known and solved problem), so I will use one of the mature solutions in .NET: Quartz.NET. Configuration of the Quartz scheduler is very simple:

First, the scheduler is created using a factory. Then, a new instance of the IoC container for resolving dependencies is created. The last thing to do is to configure our job execution schedule. In the case above it will be executed every 15 seconds, but the right configuration really depends on how many messages you will have in your system.

This is what ProcessOutboxJob looks like:
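A minimal sketch of such a configuration is below, assuming Quartz.NET 3.x, where the scheduler API is Task-based. The IoC container setup is left out, the job body is a placeholder, and a simple 15-second interval trigger is used as one way to express the schedule.

```csharp
using System.Threading.Tasks;
using Quartz;
using Quartz.Impl;

// Prevents a new run from starting while the previous one is still executing.
[DisallowConcurrentExecution]
public class ProcessOutboxJob : IJob
{
    public Task Execute(IJobExecutionContext context)
    {
        // Outbox processing would go here.
        return Task.CompletedTask;
    }
}

public static class OutboxScheduler
{
    public static async Task<IScheduler> StartAsync()
    {
        // Create and start the scheduler using the standard factory.
        var schedulerFactory = new StdSchedulerFactory();
        IScheduler scheduler = await schedulerFactory.GetScheduler();
        await scheduler.Start();

        IJobDetail job = JobBuilder.Create<ProcessOutboxJob>().Build();

        // Fire immediately, then every 15 seconds.
        ITrigger trigger = TriggerBuilder.Create()
            .StartNow()
            .WithSimpleSchedule(schedule => schedule
                .WithIntervalInSeconds(15)
                .RepeatForever())
            .Build();

        await scheduler.ScheduleJob(job, trigger);
        return scheduler;
    }
}
```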

The most important parts are:
Line 1 – the [DisallowConcurrentExecution] attribute means that the scheduler will not start a new instance of the job if another instance of that job is running. This is important because we don’t want to process the Outbox concurrently.
Line 25 – Get all messages to process
Line 30 – Deserialize message to Notification Object
Line 32 – Processing the Notification Object (for example sending event to bus)
Line 38 – Set message as processed

As I wrote earlier, if there is an error between processing the message (line 32) and setting it as processed (line 38), the job will try to process it again in the next iteration.
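The core of that loop, and the reason a message can be dispatched twice, can be shown with a storage-agnostic sketch. Database access and the message bus are replaced by delegates here, which is an assumption made so that the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class OutboxMessage
{
    public OutboxMessage(Guid id, string type, string data)
    {
        Id = id;
        Type = type;
        Data = data;
    }

    public Guid Id { get; }
    public string Type { get; }
    public string Data { get; }
}

public static class OutboxProcessor
{
    // Returns the number of messages dispatched in this run.
    public static async Task<int> ProcessAsync(
        IEnumerable<OutboxMessage> unprocessedMessages,
        Func<OutboxMessage, Task> dispatch,
        Func<Guid, Task> markAsProcessed)
    {
        var dispatched = 0;

        foreach (var message in unprocessedMessages)
        {
            // Step 1: process the message (for example, send it to the bus).
            await dispatch(message);
            dispatched++;

            // Step 2: mark it as processed. If this step fails, the message
            // stays unprocessed and will be dispatched again in the next run.
            await markAsProcessed(message.Id);
        }

        return dispatched;
    }
}
```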

Notification handler template looks like this:

Finally, this is a view of our Outbox:

Figure: Outbox view

Summary

In this post I described the problems with ensuring the atomicity of a transaction during business operation processing. I raised the topic of the 2PC protocol and the motivation not to use it. I presented what the Outbox Pattern is and how we can implement it. Thanks to this, our system can be much more reliable.

Source code

If you would like to see a full, working example, check my GitHub repository.

Additional Resources

Refactoring Towards Resilience: Evaluating Coupling – Jimmy Bogard
Asynchronous message-based communication – Microsoft

Related posts

Domain Model Encapsulation and PI with Entity Framework 2.2
Simple CQRS implementation with raw SQL and DDD
How to publish and handle Domain Events

Domain Model Encapsulation and PI with Entity Framework 2.2

Introduction

In the previous post I presented how to implement a simple CQRS pattern using raw SQL (Read Model) and Domain-Driven Design (Write Model). I would like to continue with the presented example, focusing mainly on the DDD implementation. In this post I will describe how to get the most out of the newest version of Entity Framework Core (v2.2) to support pure domain modeling as much as possible.

I decided that I will constantly develop my sample on GitHub. I will try to gradually add new functionalities and technical solutions. I will also try to extend domain so that the application will become similar to the real ones. It is difficult to explain some DDD aspects on trivial domains. Nevertheless, I highly encourage you to follow my codebase.

Goals

When we create our Domain Model we have to take many things into account. At this point I would like to focus on 2 of them: Encapsulation and Persistence Ignorance.

Encapsulation

Encapsulation has two major definitions (source – Wikipedia):

A language mechanism for restricting direct access to some of the object’s components

and

A language construct that facilitates the bundling of data with the methods (or other functions) operating on that data

What does it mean for DDD Aggregates? It simply means that we should hide all internals of our Aggregate from the outside world. Ideally, we should expose only the public methods which are required to fulfill our business requirements. This assumption is presented below:

Persistence Ignorance

The Persistence Ignorance (PI) principle says that the Domain Model should be ignorant of how its data is saved or retrieved. It is very good and important advice to follow. However, we should follow it with caution. I agree with the opinion presented in the Microsoft documentation:

Even when it is important to follow the Persistence Ignorance principle for your Domain model, you should not ignore persistence concerns. It is still very important to understand the physical data model and how it maps to your entity object model. Otherwise you can create impossible designs.

As described, we unfortunately can’t forget about persistence. Nevertheless, we should aim to decouple the Domain Model from the rest of our system as much as possible.

Example Domain

For a better understanding of the created Domain Model I prepared the following diagram:

It is a simple e-commerce domain. A Customer can place one or more Orders. An Order is a set of Products with quantity information (OrderProduct). Each Product has many prices defined (ProductPrice), depending on the Currency.

Ok, we know the problem, now we can go to the solution…

Solution

1. Create supporting architecture

The first and most important thing to do is to create an application architecture which supports both Encapsulation and Persistence Ignorance of our Domain Model. The most common examples are:
Clean Architecture
Onion Architecture
Ports And Adapters / Hexagonal Architecture

All of these architectures are good and used in production systems. For me, Clean Architecture and Onion Architecture are almost the same. Ports And Adapters / Hexagonal Architecture is a little bit different when it comes to naming, but the general principles are the same. The most important thing in the context of domain modeling is that in each architecture the Business Logic/Business Layer/Entities/Domain Layer 1) is in the center and 2) has no dependency on other components/layers/modules. It is the same in my example:

What does this mean in practice for our code in the Domain Model?
1. No data access code.
2. No data annotations for our entities.
3. No inheritance from any framework classes; entities should be Plain Old CLR Objects (POCOs).

2. Use Entity Framework in Infrastructure Layer only

Any interaction with the database should be implemented in the Infrastructure Layer. This means that the Entity Framework context, the entity mappings and the implementations of repositories belong there. Only the interfaces of repositories can be kept in the Domain Model.

3. Use Shadow Properties

Shadow Properties are a great way to decouple our entities from the database schema. They are properties which are defined only in the Entity Framework model. With them, we often don’t need to include foreign keys in our Domain Model, which is a great thing.

Let’s see the Order entity and its mapping, which is defined in the CustomerEntityTypeConfiguration class:

As you can see, on line 15 we define a property which doesn’t exist in the Order entity. It is defined only to configure the relationship between Customer and Order. The same applies to the Order and OrderProduct relationship (see lines 23, 24).
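The idea of a shadow foreign key can be sketched in a simplified form. Unlike the mapping described above, this example configures the relationship from the Order side in its own configuration class, and the entity shapes are placeholders; only the technique is the same.

```csharp
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

public class Customer
{
    public Guid Id { get; private set; }
}

// Note: there is no CustomerId property on the entity.
public class Order
{
    public Guid Id { get; private set; }
}

public class OrderEntityTypeConfiguration : IEntityTypeConfiguration<Order>
{
    public void Configure(EntityTypeBuilder<Order> builder)
    {
        builder.HasKey(o => o.Id);

        // Shadow property: exists only in the EF model and the database schema.
        builder.Property<Guid>("CustomerId");

        // The relationship uses the shadow property as its foreign key.
        builder.HasOne<Customer>()
            .WithMany()
            .HasForeignKey("CustomerId");
    }
}
```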

4. Use Owned Entity Types

With Owned Entity Types we can achieve better encapsulation, because we can map directly to private or internal fields:

Owned types are a great solution for creating our Value Objects too. This is what MoneyValue looks like:
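Both ideas, a Value Object and its mapping as an owned type held in a private field, can be sketched together. The member names, the value-based equality and the column names below are assumptions for illustration.

```csharp
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

// Value Object: immutable from the outside and compared by value.
public class MoneyValue
{
    private MoneyValue() { } // for EF Core

    public MoneyValue(decimal value, string currency)
    {
        Value = value;
        Currency = currency;
    }

    public decimal Value { get; private set; }
    public string Currency { get; private set; }

    public override bool Equals(object obj) =>
        obj is MoneyValue other && Value == other.Value && Currency == other.Currency;

    public override int GetHashCode() => (Value, Currency).GetHashCode();
}

public class OrderProduct
{
    private MoneyValue _value; // not exposed outside the entity

    private OrderProduct() { } // for EF Core

    public OrderProduct(MoneyValue value)
    {
        Id = Guid.NewGuid();
        _value = value;
    }

    public Guid Id { get; private set; }
}

public class OrderProductEntityTypeConfiguration : IEntityTypeConfiguration<OrderProduct>
{
    public void Configure(EntityTypeBuilder<OrderProduct> builder)
    {
        builder.HasKey(p => p.Id);

        // Owned type mapped directly to the private field by its name.
        builder.OwnsOne<MoneyValue>("_value", value =>
        {
            value.Property(m => m.Value).HasColumnName("Value");
            value.Property(m => m.Currency).HasColumnName("Currency");
        });
    }
}
```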

5. Map to private fields

We can map to private fields not only using EF owned types, we can map to built-in types too. All we have to do is give the name of the field and column:

6. Use Value Conversions

Value Conversions are the “bridge” between entity attributes and table column values. If the types are incompatible, we should use them. Entity Framework has a lot of value converters implemented out of the box. Additionally, we can implement a custom converter if we need to.

This converter simply converts the “StatusId” column of type byte to the private field _status of type OrderStatus.
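The private-field mapping from the previous step and the value conversion can be combined in one configuration class, as in the sketch below. It assumes OrderStatus is an enum and uses the built-in EnumToNumberConverter; the enum members and the _orderDate field are made up for the example.

```csharp
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;

// Hypothetical statuses, for illustration only.
public enum OrderStatus : byte
{
    Placed = 0,
    InRealization = 1,
    Canceled = 2
}

public class Order
{
    private DateTime _orderDate;
    private OrderStatus _status;

    public Guid Id { get; private set; }

    public void Place(DateTime now)
    {
        _orderDate = now;
        _status = OrderStatus.Placed;
    }
}

public class OrderEntityTypeConfiguration : IEntityTypeConfiguration<Order>
{
    public void Configure(EntityTypeBuilder<Order> builder)
    {
        builder.HasKey(o => o.Id);

        // Private field of a built-in type: field name plus column name.
        builder.Property<DateTime>("_orderDate").HasColumnName("OrderDate");

        // Private enum field stored as a byte in the "StatusId" column.
        builder.Property<OrderStatus>("_status")
            .HasColumnName("StatusId")
            .HasConversion(new EnumToNumberConverter<OrderStatus, byte>());
    }
}
```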

Summary

In this post I briefly described what Encapsulation and Persistence Ignorance are (in the context of domain modeling) and how we can achieve them by:
– creating supporting architecture
– putting all data access code outside our domain model implementation
– using Entity Framework Core features: Shadow Properties, Owned Entity Types, private fields mapping, Value Conversions

Related posts

Simple CQRS implementation with raw SQL and DDD
How to publish and handle Domain Events
REST API Data Validation