Handling Domain Events: Missing Part

Introduction

Some time ago I wrote a post about publishing and handling domain events. In another post I described the Outbox Pattern, which gives us At-Least-Once delivery when integrating with external components / services without using the 2PC protocol.

This time I want to combine both approaches and complete the previous posts. I will present a complete solution that enables reliable, structured data processing in the system while respecting the transaction boundary.

Depth of the system

First, I would like to describe what a Shallow System is and what a Deep System is.

Shallow System

A system is considered shallow when, in most cases, not much happens after an action is performed on it.

A typical and most popular example of this type of system is one with many CRUD operations. Most of the operations involve managing the data shown on the screen and there is little business logic underneath. Sometimes such systems are called database browsers. 😉

Another heuristic that can point to a Shallow System is the ability to specify the requirements for such a system practically through the GUI prototype. The prototype of the system (possibly with the addition of comments) shows us how this system should work and it is enough – if nothing underneath is happening then there is nothing to define and describe.

From the Domain-Driven Design point of view, it will most often look like this: the execution of the Command on the Aggregate publishes exactly one Domain Event and… nothing happens. No subscribers to this event, no processors / handlers, no workflows defined. There is no communication with other contexts or 3rd party systems either. It looks like this:

Shallow system in context of DDD.

Execute action, process request, save data – end of story. Often, in such cases, we do not even need DDD. Transaction Script or Active Record will be enough.

Deep System

The Deep System is (as one could easily guess) the complete opposite of the Shallow System.

A Deep System is one that is designed to solve problems in a non-trivial and complicated domain. If the domain is complicated then the Domain Model will be complicated as well. Of course, the Domain Model should be as simple as possible without losing the aspects that are most important in a given context (in terms of DDD: the Bounded Context). Nevertheless, it contains a lot of business logic that needs to be handled.

We do not specify a Deep System by the GUI prototype because too much is happening underneath. Saving or reading data is just one of the actions that our system does. Other activities are communication with other systems, complicated data processing or calling other parts of our system.

This time, much more is happening in the context of Domain-Driven Design implementation. Aggregates can publish multiple Domain Events, and for each Domain Event there can be many handlers responsible for different behavior. This behavior can be communication with an external system or executing a Command on another Aggregate, which will again publish its events, to which another part of our system will subscribe. This scheme repeats itself and our Domain Model behaves in a reactive manner:

Deep system in context of DDD.

Problem

The post about publishing and handling domain events presented a very simple case, and the solution did not support publishing (and handling) further events by another Aggregate whose processing was triggered by a previous Domain Event. In other words, there was no support for complex flows and reactive data processing. Only a single Command -> Aggregate -> Domain Event -> handlers scenario was possible.

It is best to consider this with a specific example. Let’s assume the following requirements after the Customer places an Order:
a) A confirmation email about the placed Order should be sent to the Customer
b) A new Payment should be created
c) An email about the new Payment should be sent to the Customer

These requirements are illustrated in the following picture:

Let’s assume that in this particular case both Order placement and Payment creation should take place in the same transaction. If the transaction is successful, we need to send 2 emails: one about the Order and one about the Payment. Let’s see how we can implement this type of scenario.

Solution

The most important thing we have to keep in mind is the transaction boundary. To make our life easier, we make the following assumptions:

1. The Command Handler defines the transaction boundary. The transaction is started when the Command Handler is invoked and committed at the end.
2. Each Domain Event handler is invoked in the context of the same transaction boundary.
3. If we want to process something outside the transaction, we need to create a public event based on the Domain Event. I call it a Domain Event Notification, some people call it a public event, but the concept is the same.

The second most important question is when to publish and process Domain Events. Events may be created after each action on the Aggregate, so we must publish them:
– after handling each Command (but BEFORE committing the transaction)
– after handling each Domain Event (but WITHOUT committing the transaction)

The last thing to consider is the processing of Domain Event Notifications (public events). We need to find a way to process them outside the transaction, and this is where the Outbox Pattern comes into play.

The first thing that comes to mind is to publish events and commit the transaction at the end of each Command handler, and to only publish events at the end of each Domain Event handler. We can, however, try a much more elegant solution here and use the Decorator Pattern. The Decorator Pattern allows us to wrap our handling logic in infrastructural code, similar to how Aspect-Oriented Programming and .NET Core middleware work.

We need two decorators. The first one will be for command handlers:

The decorator first invokes the real Command handler and then commits the Unit of Work. The UoW commit publishes Domain Events and commits the existing transaction:
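A minimal sketch of such a decorator, written in Python for brevity rather than C#. All names here (PlaceOrder, FakeUnitOfWork, UnitOfWorkCommandHandlerDecorator) are hypothetical, and the Unit of Work is a stand-in that only records the order of calls:

```python
from dataclasses import dataclass


@dataclass
class PlaceOrder:
    """Hypothetical command."""
    customer_id: int


class FakeUnitOfWork:
    """Stand-in for a real Unit of Work; it only records what it was asked to do."""

    def __init__(self):
        self.log = []

    def commit(self):
        # A real implementation would dispatch pending Domain Events inside
        # the still-open transaction and only then commit that transaction.
        self.log.append("publish domain events")
        self.log.append("commit transaction")


class PlaceOrderHandler:
    """The 'real' handler: contains only application logic."""

    def __init__(self, log):
        self.log = log

    def handle(self, command):
        self.log.append(f"place order for customer {command.customer_id}")


class UnitOfWorkCommandHandlerDecorator:
    """Wraps any command handler and commits the Unit of Work afterwards."""

    def __init__(self, decorated, unit_of_work):
        self._decorated = decorated
        self._unit_of_work = unit_of_work

    def handle(self, command):
        self._decorated.handle(command)   # invoke the real handler
        self._unit_of_work.commit()       # publish events, then commit


uow = FakeUnitOfWork()
handler = UnitOfWorkCommandHandlerDecorator(PlaceOrderHandler(uow.log), uow)
handler.handle(PlaceOrder(customer_id=7))
print(uow.log)
```

If the decorated handler raises, the commit is never reached, so nothing is published or committed.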

In accordance with the previously described assumptions, we also need a second decorator for the Domain Event handler, which will only publish Domain Events at the very end without committing the database transaction:

The last thing to do is to configure our decorators in the IoC container (Autofac example):
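The idea can be sketched as follows, again in Python with hypothetical names. The dispatcher below is an in-memory assumption that simply drains a list of pending events; the point is that the decorator dispatches follow-up events after the wrapped handler runs and never commits anything:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderPlaced:
    order_id: int


@dataclass(frozen=True)
class PaymentCreated:
    order_id: int


class DomainEventsDispatcher:
    """Hypothetical dispatcher: drains pending events and calls their handlers."""

    def __init__(self):
        self.pending = []    # events raised by aggregates, not yet handled
        self.handlers = {}   # event type -> list of handler objects

    def dispatch_events(self):
        events, self.pending = self.pending, []
        for event in events:
            for handler in self.handlers.get(type(event), []):
                handler.handle(event)


class DispatchingEventHandlerDecorator:
    """Runs the wrapped handler, then publishes new events. No commit here."""

    def __init__(self, decorated, dispatcher):
        self._decorated = decorated
        self._dispatcher = dispatcher

    def handle(self, event):
        self._decorated.handle(event)
        self._dispatcher.dispatch_events()


log = []
dispatcher = DomainEventsDispatcher()


class CreatePaymentWhenOrderPlaced:
    def handle(self, event):
        log.append(f"payment created for order {event.order_id}")
        # Creating the Payment raises a new Domain Event.
        dispatcher.pending.append(PaymentCreated(event.order_id))


class RecordPaymentCreated:
    def handle(self, event):
        log.append(f"payment event handled for order {event.order_id}")


dispatcher.handlers[OrderPlaced] = [
    DispatchingEventHandlerDecorator(CreatePaymentWhenOrderPlaced(), dispatcher)]
dispatcher.handlers[PaymentCreated] = [
    DispatchingEventHandlerDecorator(RecordPaymentCreated(), dispatcher)]

dispatcher.pending.append(OrderPlaced(order_id=1))
dispatcher.dispatch_events()
print(log)
```

Because every handler is decorated, an event raised while handling another event is picked up immediately, which is what makes chained flows possible within one transaction.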

Add Domain Event Notifications to Outbox

The second thing we have to do is to save notifications about Domain Events that we want to process outside of the transaction. To do this, we use the implementation of the Outbox Pattern:

As a reminder – the data for our Outbox is saved in the same transaction, which is why At-Least-Once delivery is guaranteed.
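To illustrate why the same transaction matters, here is a small sketch using Python's built-in sqlite3 module. The table and column names are assumptions made for this example only. The business row and the Outbox row are either both committed or both rolled back:

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER NOT NULL)")
conn.execute(
    "CREATE TABLE outbox_messages (id TEXT PRIMARY KEY, type TEXT NOT NULL, "
    "data TEXT NOT NULL, processed_date TEXT)")


def place_order(customer_id, fail=False):
    # 'with conn' commits on success and rolls back if an exception escapes.
    with conn:
        cur = conn.execute(
            "INSERT INTO orders (customer_id) VALUES (?)", (customer_id,))
        payload = json.dumps({"order_id": cur.lastrowid})
        conn.execute(
            "INSERT INTO outbox_messages (id, type, data) VALUES (?, ?, ?)",
            (str(uuid.uuid4()), "OrderPlacedNotification", payload))
        if fail:
            raise RuntimeError("simulated failure before commit")


place_order(customer_id=1)
try:
    place_order(customer_id=2, fail=True)
except RuntimeError:
    pass

orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
messages = conn.execute("SELECT COUNT(*) FROM outbox_messages").fetchone()[0]
print(orders, messages)
```

The failed call leaves neither an order nor a message behind, so there is never a message without its business change or the other way round.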

Implementing flow steps

At this point, we can focus only on the application logic and do not need to worry about infrastructural concerns. Now we only implement the particular flow steps:

a) When the Order is placed then create Payment:

b) When the Order is placed then send an email:

c) When the Payment is created then send an email:
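The three steps can be sketched together in Python. This is a simplified in-memory model with hypothetical names: the Outbox is a plain list, the Domain Event objects double as their own notifications, and the mail server is a list of strings:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderPlaced:
    order_id: int


@dataclass(frozen=True)
class PaymentCreated:
    payment_id: int
    order_id: int


pending_events = []   # Domain Events raised inside the transaction
outbox = []           # notifications stored for out-of-transaction processing
sent_emails = []      # stand-in for a real mail server
payments = []


def when_order_placed_create_payment(event):         # a) inside the transaction
    payment_id = len(payments) + 1
    payments.append((payment_id, event.order_id))
    pending_events.append(PaymentCreated(payment_id, event.order_id))


def when_order_placed_send_email(notification):      # b) outside the transaction
    sent_emails.append(f"Order {notification.order_id} confirmed")


def when_payment_created_send_email(notification):   # c) outside the transaction
    sent_emails.append(f"Payment {notification.payment_id} created")


domain_event_handlers = {
    OrderPlaced: [when_order_placed_create_payment],
    PaymentCreated: [],
}
notification_handlers = {
    OrderPlaced: [when_order_placed_send_email],
    PaymentCreated: [when_payment_created_send_email],
}


def handle_place_order(order_id):
    pending_events.append(OrderPlaced(order_id))
    while pending_events:              # keep dispatching until the flow settles
        event = pending_events.pop(0)
        for handler in domain_event_handlers[type(event)]:
            handler(event)
        outbox.append(event)           # saved as part of the same transaction
    # the transaction would be committed here


def process_outbox():
    while outbox:
        notification = outbox.pop(0)
        for handler in notification_handlers[type(notification)]:
            handler(notification)


handle_place_order(order_id=10)
emails_before = list(sent_emails)   # nothing is sent inside the transaction
process_outbox()
print(payments, sent_emails)
```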

The following picture presents the whole flow:

Flow of processing

Summary

In this post I described how it is possible to process Commands and Domain Events in a Deep System in a reactive way.

To summarize, the following concepts have been used for this purpose:

– Decorator Pattern for event dispatching and transaction boundary management
– Outbox Pattern for processing events in a separate transaction
– Unit of Work Pattern
– Domain Events Notifications (public events) saved to the Outbox
– Basic DDD Building Blocks – Aggregates and Domain Events
– Eventual Consistency

Source code

If you would like to see a full, working example, check my GitHub repository.

Additional Resources

The Outbox: An EIP Pattern – John Heintz
Domain events: design and implementation – Microsoft

Related posts

How to publish and handle Domain Events
Simple CQRS implementation with raw SQL and DDD
The Outbox Pattern

The Outbox Pattern

Introduction

Sometimes, when processing a business operation, you need to communicate with an external component in fire-and-forget mode. That component can be, for example:
– external service
– message bus
– mail server
– same database but different database transaction
– another database

Examples of this type of integration with external components:
– sending an e-mail message after placing an order
– sending an event about new client registration to the messaging system
– processing another DDD Aggregate in a different database transaction, for example decreasing the number of products in stock after an order is placed

The question that arises is whether we are able to guarantee the atomicity of our business operation from a technical point of view. Unfortunately, not always. Even when we can (using the 2PC protocol), it limits our system in terms of latency, throughput, scalability and availability. For details about these limitations, I invite you to read the article titled It’s Time to Move on from Two Phase Commit.

The problem I am writing about is presented below:

Once the transaction is committed, we want to send an event to the event bus, but unfortunately 2 bad things can happen:
– our system can crash just after transaction commit and before sending the event
– event bus can be unavailable at this moment so the event cannot be sent

Outbox pattern

If we cannot provide atomicity or we don’t want to do that for the reasons mentioned above, what could we do to increase the reliability of our system? We should implement the Outbox Pattern.

The Outbox Pattern is based on the Guaranteed Delivery pattern and looks as follows:

Outbox pattern

When you save data as part of one transaction, you also save messages that you later want to process as part of the same transaction. The list of messages to be processed is called an Outbox, just like in e-mail clients.

The second element of the puzzle is a separate process that periodically checks the contents of the Outbox and processes the messages. After processing each message, the message should be marked as processed to avoid resending. However, it is possible that we will not be able to mark the message as processed due to a communication error with the Outbox:

Outbox messages processing

In this case, when the connection with the Outbox is recovered, the same message will be sent again. What does all this mean for us? The Outbox Pattern gives At-Least-Once delivery. We can be sure that a message will be sent at least once, but it can be sent multiple times too! That’s why another name for this approach is Once-Or-More delivery. We should remember this and try to design the receivers of our messages as Idempotent Receivers, which means:

In Messaging this concept translates into a message that has the same effect whether it is received once or multiple times. This means that a message can safely be resent without causing any problems even if the receiver receives duplicates of the same message.
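One common way to get this behavior is to remember the identifiers of messages that were already handled. A minimal Python sketch with hypothetical names follows; in a real receiver the set of seen identifiers would have to be stored durably, ideally in the same transaction as the effect itself:

```python
class IdempotentStockReceiver:
    """Applies each message at most once, keyed by message id."""

    def __init__(self, stock):
        self.stock = stock
        self._seen_ids = set()   # in-memory only; an assumption of this sketch

    def receive(self, message_id, quantity):
        if message_id in self._seen_ids:
            return False         # duplicate: ignore, the effect already happened
        self._seen_ids.add(message_id)
        self.stock -= quantity
        return True


receiver = IdempotentStockReceiver(stock=5)
first = receiver.receive("msg-1", 2)
duplicate = receiver.receive("msg-1", 2)   # the same message delivered again
print(receiver.stock)
```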

OK, enough theory, let’s see how we can implement this pattern in the .NET world.

Implementation

Outbox message

At the beginning, we need to define the structure of our OutboxMessage:

Importantly, the OutboxMessage class is part of the Infrastructure and not the Domain Model! Try to talk with the business about an Outbox and they will think about the Outlook application instead of the messaging pattern. 🙂 I didn’t include a ProcessedDate property because this class is only needed to save the message as part of the transaction, so this property would always be NULL in this context.
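As a rough illustration of such a structure, here is a Python sketch. The field names (id, occurred_on, type, data) are my assumption for this example; the essential parts are a unique identifier, a type discriminator and the serialized payload, and there is deliberately no processed-date field:

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class OutboxMessage:
    """Infrastructure-level record written together with the business data."""
    type: str   # which notification this is, used later for deserialization
    data: str   # serialized notification payload (JSON in this sketch)
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    occurred_on: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc))


message = OutboxMessage(
    type="CustomerRegisteredNotification",
    data='{"customer_id": 1}',
)
print(message.type, message.id)
```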

Saving the message

I certainly do not want to write the code that saves messages to the Outbox again in each Command Handler, because that is against the DRY principle. For this reason, the Notification Object described in the post about publishing Domain Events can be used. The following solution is based on the linked article with a small modification: instead of processing the notifications immediately, it serializes them and writes them to the database.

As a reminder, all Domain Events resulting from an action are processed as part of the same transaction. If the Domain Event should be processed outside of the ongoing transaction, you should define a Notification Object for it. This is the object which should be written to the Outbox. The code looks like:

Example of Domain Event:

And Notification Object:

The first thing to note is the use of the Json.NET library. The second thing to note is the 2 constructors of the CustomerRegisteredNotification class. The first one creates the notification based on a Domain Event. The second one is used to deserialize the message from a JSON string, which is presented in the following section about processing.
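The two construction paths can be sketched in Python with the standard json module standing in for Json.NET. The class and method names are hypothetical; Python has no constructor overloading, so the sketch uses two class methods instead:

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class CustomerRegisteredEvent:
    """Hypothetical Domain Event."""
    customer_id: int


class CustomerRegisteredNotification:
    def __init__(self, customer_id):
        self.customer_id = customer_id

    @classmethod
    def from_domain_event(cls, event):
        # Path 1: built from the Domain Event before saving to the Outbox.
        return cls(event.customer_id)

    @classmethod
    def from_json(cls, payload):
        # Path 2: rebuilt from the stored JSON when the Outbox is processed.
        return cls(**json.loads(payload))

    def to_json(self):
        return json.dumps({"customer_id": self.customer_id})


notification = CustomerRegisteredNotification.from_domain_event(
    CustomerRegisteredEvent(customer_id=42))
payload = notification.to_json()
restored = CustomerRegisteredNotification.from_json(payload)
print(payload)
```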

Processing the message

The processing of Outbox messages should take place in a separate process. However, instead of a separate process, we can also use the same process but another thread depending on the needs. Solution which is presented below can be used in both cases.

First, we need a scheduler that will periodically run Outbox processing. I do not want to create the scheduler myself (it is a known and solved problem), so I will use one of the mature solutions in .NET: Quartz.NET. Configuration of the Quartz scheduler is very simple:

First, the scheduler is created using a factory. Then, a new instance of the IoC container for resolving dependencies is created. The last thing to do is to configure our job execution schedule. In the case above it will be executed every 15 seconds, but the right interval really depends on how many messages you will have in your system.

This is what ProcessOutboxJob looks like:

The most important parts are:
– The [DisallowConcurrentExecution] attribute on the job class means that the scheduler will not start a new instance of the job while another instance is still running. This is important because we don’t want to process the Outbox concurrently.
– Get all messages to process
– Deserialize each message to a Notification Object
– Process the Notification Object (for example by sending an event to the bus)
– Set the message as processed

As I wrote earlier, if there is an error between processing the message and setting it as processed, the job will try to process it again in the next iteration.
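The core loop of such a job can be sketched with Python's sqlite3 module. This is not the Quartz.NET job itself: the table layout and the publish function are assumptions, scheduling is left out, and the sketch is single-threaded, so the concurrency guard is not shown:

```python
import json
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox_messages (id INTEGER PRIMARY KEY, type TEXT NOT NULL, "
    "data TEXT NOT NULL, processed_date TEXT)")
with conn:
    conn.executemany(
        "INSERT INTO outbox_messages (type, data) VALUES (?, ?)",
        [
            ("CustomerRegisteredNotification", json.dumps({"customer_id": 1})),
            ("CustomerRegisteredNotification", json.dumps({"customer_id": 2})),
        ])

published = []   # stand-in for an event bus


def publish(message_type, payload):
    published.append((message_type, payload["customer_id"]))


def process_outbox():
    # Get all messages that have not been processed yet.
    rows = conn.execute(
        "SELECT id, type, data FROM outbox_messages "
        "WHERE processed_date IS NULL ORDER BY id").fetchall()
    for message_id, message_type, data in rows:
        publish(message_type, json.loads(data))   # deserialize and process
        # A crash at this point leaves processed_date NULL, so the next run
        # publishes the message again: At-Least-Once, not exactly once.
        with conn:
            conn.execute(
                "UPDATE outbox_messages SET processed_date = ? WHERE id = ?",
                (datetime.now(timezone.utc).isoformat(), message_id))


process_outbox()
process_outbox()   # second run finds nothing left to do
remaining = conn.execute(
    "SELECT COUNT(*) FROM outbox_messages "
    "WHERE processed_date IS NULL").fetchone()[0]
print(published, remaining)
```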

Notification handler template looks like this:

Finally, this is a view of our Outbox:

Outbox view

Summary

In this post I described the problems with ensuring the atomicity of a transaction during business operation processing. I raised the topic of the 2PC protocol and the motivation for not using it. I presented what the Outbox Pattern is and how we can implement it. Thanks to this, our system can be much more reliable.

Source code

If you would like to see a full, working example, check my GitHub repository.

Additional Resources

Refactoring Towards Resilience: Evaluating Coupling – Jimmy Bogard
The Outbox: An EIP Pattern – John Heintz
Asynchronous message-based communication – Microsoft

Related posts

Domain Model Encapsulation and PI with Entity Framework 2.2
Simple CQRS implementation with raw SQL and DDD
How to publish and handle Domain Events

How to publish and handle Domain Events

2019-06-19 UPDATE: Please check Handling Domain Events: Missing Part post which is a continuation of this article

Introduction

A Domain Event is one of the building blocks of Domain Driven Design. It is something that happened in a particular domain and it captures the memory of it. We create Domain Events to notify other parts of the same domain that something interesting happened, so that these other parts can potentially react to it.

A Domain Event is usually an immutable data-container class named in the past tense. For example:
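As an illustration in Python (the event name and fields are hypothetical), a frozen dataclass gives exactly this kind of immutable, past-tense data container:

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class CustomerRegistered:
    """Named in the past tense: it describes something that already happened."""
    customer_id: int
    email: str


event = CustomerRegistered(customer_id=1, email="jane@example.com")

try:
    event.email = "other@example.com"   # history cannot be rewritten
    mutated = True
except FrozenInstanceError:
    mutated = False

print(event, mutated)
```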

Three ways of publishing domain events

I have seen mainly three ways of publishing domain events.

1. Using static DomainEvents class

This approach was presented by Udi Dahan in his Domain Events Salvation post. In short, there is a static class named DomainEvents with a Raise method, which is invoked immediately when something interesting occurs during aggregate method processing. The word immediately is worth emphasizing, because all domain event handlers start processing immediately too (even if the aggregate method has not finished processing).
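The "immediately" part is easiest to see in a small sketch. The Python below uses hypothetical names and a class-level registry to imitate a static class; the method is called raise_event because raise is a reserved word in Python:

```python
class DomainEvents:
    """Imitation of a static class: state and methods live on the class itself."""
    _handlers = {}

    @classmethod
    def register(cls, event_type, handler):
        cls._handlers.setdefault(event_type, []).append(handler)

    @classmethod
    def raise_event(cls, event):
        # Handlers run right now, in the middle of the caller's method.
        for handler in cls._handlers.get(type(event), []):
            handler(event)


class OrderPlaced:
    def __init__(self, order_id):
        self.order_id = order_id


log = []


class Order:
    def __init__(self, order_id):
        self.order_id = order_id

    def place(self):
        log.append("place: start")
        DomainEvents.raise_event(OrderPlaced(self.order_id))
        log.append("place: end")


DomainEvents.register(
    OrderPlaced, lambda e: log.append(f"handler: order {e.order_id}"))
Order(1).place()
print(log)
```

The handler entry appears between the start and end entries, which shows that it ran before the aggregate method finished.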

2. Raise event returned from aggregate method

In this approach the aggregate method returns the Domain Event directly to the ApplicationService. The ApplicationService decides when and how to raise the event. You can become familiar with this way of raising events by reading Jan Kronquist’s Don’t publish Domain Events, return them! post.
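A minimal Python sketch of this style, with hypothetical names: the aggregate method returns its events and the application service decides what to do with them:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderPlaced:
    order_id: int


class Order:
    def __init__(self, order_id):
        self.order_id = order_id
        self.placed = False

    def place(self):
        self.placed = True
        # Nothing is published here; the caller receives the events.
        return [OrderPlaced(self.order_id)]


class OrderApplicationService:
    def __init__(self, publish):
        self._publish = publish   # any callable that accepts an event

    def place_order(self, order):
        events = order.place()
        # The service controls when and how the events are raised.
        for event in events:
            self._publish(event)


published = []
OrderApplicationService(published.append).place_order(Order(5))
print(published)
```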

3. Add event to Events Entity collection

In this approach every entity that creates domain events has an Events collection. Every Domain Event instance is added to this collection during aggregate method execution. After execution, the ApplicationService (or another component) reads the Events collections from all entities and publishes the events. This approach is well described in Jimmy Bogard’s post A better domain events pattern.
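Sketched in Python with hypothetical names, the collection-based style might look like this. Events are plain tuples here only to keep the example short:

```python
class Entity:
    """Base class that collects the Domain Events raised by an entity."""

    def __init__(self):
        self._events = []

    @property
    def events(self):
        return tuple(self._events)   # read-only view for the publisher

    def _add_event(self, event):
        self._events.append(event)

    def clear_events(self):
        self._events.clear()


class Customer(Entity):
    def __init__(self, customer_id):
        super().__init__()
        self.customer_id = customer_id

    def register(self):
        # The aggregate only records the event; nothing is handled yet.
        self._add_event(("CustomerRegistered", self.customer_id))


def publish_events(entities, publish):
    """Called by the application service (or similar) after the method ran."""
    for entity in entities:
        for event in entity.events:
            publish(event)
        entity.clear_events()


customers = [Customer(1), Customer(2)]
for customer in customers:
    customer.register()

published = []
publish_events(customers, published.append)
print(published)
```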

Handling domain events

The way of handling domain events depends indirectly on the publishing method. If you use the DomainEvents static class, you have to handle the event immediately. In the other two cases you control when events are published as well as when handlers are executed: inside or outside the existing transaction.

In my opinion it is a good approach to always handle domain events in the existing transaction and to treat aggregate method execution and handler processing as an atomic operation. This is good because if you have a lot of events and handlers, you do not have to think about initializing connections and transactions, or about what should be treated in an “all-or-nothing” way and what should not.

Sometimes, however, it is necessary to communicate with a 3rd party service (for example e-mail or a web service) based on a Domain Event. As we know, communication with 3rd party services is usually not transactional, so we need an additional generic mechanism to handle these types of scenarios. This is why I created Domain Events Notifications.

Domain Events Notifications

There is no such thing as domain events notifications in DDD terms. I chose that name because I think it fits best: it is a notification that a domain event was published.

The mechanism is pretty simple. If I want to inform my application that a domain event was published, I create a notification class for it and as many handlers for this notification as I want. I always publish my notifications after the transaction is committed. The complete process looks like this:

1. Create database transaction.
2. Get aggregate(s).
3. Invoke aggregate method.
4. Add domain events to Events collections.
5. Publish domain events and handle them.
6. Save changes to DB and commit transaction.
7. Publish domain events notifications and handle them.
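The ordering of these steps can be sketched in Python. Everything here is a hypothetical stand-in: the transaction only logs begin, commit and rollback, and events and notifications are plain strings. The step numbers in the comments refer to the list above:

```python
class FakeTransaction:
    """Logs begin/commit/rollback instead of talking to a database."""

    def __init__(self, log):
        self.log = log

    def __enter__(self):
        self.log.append("begin transaction")
        return self

    def __exit__(self, exc_type, exc, tb):
        self.log.append("rollback" if exc_type else "commit")
        return False   # never swallow the exception


class Customer:
    def __init__(self):
        self.events = []

    def register(self):
        self.events.append("CustomerRegistered")


def register_customer(log, fail_in_handler=False):
    notifications = []
    with FakeTransaction(log):                   # step 1
        customer = Customer()                    # step 2 (created, not loaded)
        customer.register()                      # steps 3 and 4
        for event in customer.events:            # step 5
            log.append(f"handle {event}")
            if fail_in_handler:
                raise RuntimeError("handler failed")
            notifications.append(f"{event}Notification")
    # Step 6 happened when the 'with' block exited without an exception.
    for notification in notifications:           # step 7
        log.append(f"publish {notification}")


ok_log = []
register_customer(ok_log)

failed_log = []
try:
    register_customer(failed_log, fail_in_handler=True)
except RuntimeError:
    pass

print(ok_log)
print(failed_log)
```

When a handler fails, the transaction is rolled back and no notification is published at all.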

How do I know that a particular domain event was published?

First of all, I have to define a notification for the domain event using generics:

All notifications are registered in the IoC container:

In EventsPublisher we resolve defined notifications using IoC container and after our unit of work is completed, all notifications are published:

This is what the whole process looks like on a UML sequence diagram:

You may think that there are a lot of things to remember and you are right! :) But as you can see, the whole process is pretty straightforward, and we can simplify this solution using IoC interceptors, which I will try to describe in another post.

Summary

1. A domain event is information about something that happened in the past in the modeled domain, and it is an important part of the DDD approach.
2. There are many ways of publishing and handling domain events: by a static class, by returning them, or by exposing them through collections.
3. Domain events should be handled within the existing transaction (my recommendation).
4. For non-transactional operations Domain Events Notifications were introduced.

Related posts

Handling Domain Events: Missing Part
The Outbox Pattern

Processing multiple aggregates – transactional vs eventual consistency

When we use the Domain Driven Design approach in our application, sometimes we have to invoke a method on multiple instances of an aggregate of the same type.

For example, in our domain we have customers, and when a big Black Friday campaign starts we have to recalculate their discounts. So in the domain model there is a Customer aggregate with a RecalculateDiscount method, and in the Application Layer we have DiscountAppService, which is responsible for this use case.

There are 2 ways to implement this and similar scenarios:

1. Using transactional consistency

This is the simplest solution: we get all customer aggregates and invoke the RecalculateDiscount method on every instance. We surround our processing with a TransactionScope, so afterwards we can be certain that either every customer has a recalculated discount or none of them does. This is transactional consistency. It gives us ACID guarantees and is sometimes a good enough solution, but in many cases (especially when processing multiple aggregates in DDD terms) it is a very bad approach.

First of all, all customers are loaded into memory, so we can have performance issues. Of course we can change the implementation a little: get only customer identifiers and load customers one by one in a foreach loop. But we have a worse problem: our transaction holds locks on our aggregates until the end of processing and other processes have to wait. For the record, the default TransactionScope isolation level is Serializable. We can change the isolation level but we can’t get rid of locks. In this case the application becomes less responsive and we can get timeouts and deadlocks, which we should avoid whenever we can.
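The all-or-nothing behavior can be sketched with Python's sqlite3 module in place of TransactionScope. The table layout and the discount rule are assumptions invented for this example; one corrupted customer makes the whole batch roll back:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, "
    "orders_count INTEGER NOT NULL, discount INTEGER NOT NULL)")
with conn:
    conn.executemany(
        "INSERT INTO customers VALUES (?, ?, 0)",
        [(1, 12), (2, 15), (3, -1)])   # customer 3 has corrupted data


def recalculate_discount(orders_count):
    """Hypothetical business rule: 10% discount from 10 orders upwards."""
    if orders_count < 0:
        raise ValueError("corrupted customer data")
    return 10 if orders_count >= 10 else 0


def recalculate_all_in_one_transaction():
    with conn:   # a single transaction around every customer
        rows = conn.execute(
            "SELECT id, orders_count FROM customers ORDER BY id").fetchall()
        for customer_id, orders_count in rows:
            conn.execute(
                "UPDATE customers SET discount = ? WHERE id = ?",
                (recalculate_discount(orders_count), customer_id))


try:
    recalculate_all_in_one_transaction()
except ValueError:
    pass   # the failure on customer 3 rolled back customers 1 and 2 as well

discounts = [row[0] for row in
             conn.execute("SELECT discount FROM customers ORDER BY id")]
print(discounts)
```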

2. Using eventual consistency

In this approach we do not use one big transaction. Instead, we process every customer aggregate separately. Eventual consistency means that for some period of time our system will be in an inconsistent state, but after that time it will be consistent. In our example there is a period when some customers have recalculated discounts and some do not. Let’s see the code:

In this case we first get only the customer identifiers and then process customer aggregates one by one, asynchronously (and in parallel if applicable). We removed the problem of locking our aggregates for a long time. The simplest solution is to use Task.Run(), but with this approach we totally lose control of the processing. A better solution is to use a 3rd party library like Hangfire or Quartz.NET, or a messaging system.
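A simplified Python sketch of the same idea, using sqlite3 with an assumed table layout and discount rule. Each customer gets its own short transaction, so a failure affects only that customer. The loop is sequential here; a real system would hand each identifier to a job library or a message queue:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, "
    "orders_count INTEGER NOT NULL, discount INTEGER NOT NULL)")
with conn:
    conn.executemany(
        "INSERT INTO customers VALUES (?, ?, 0)",
        [(1, 12), (2, 15), (3, -1)])   # customer 3 has corrupted data


def recalculate_discount(orders_count):
    """Hypothetical business rule: 10% discount from 10 orders upwards."""
    if orders_count < 0:
        raise ValueError("corrupted customer data")
    return 10 if orders_count >= 10 else 0


def recalculate_one(customer_id):
    with conn:   # one short transaction per customer
        orders_count = conn.execute(
            "SELECT orders_count FROM customers WHERE id = ?",
            (customer_id,)).fetchone()[0]
        conn.execute(
            "UPDATE customers SET discount = ? WHERE id = ?",
            (recalculate_discount(orders_count), customer_id))


# Load identifiers only, then process every customer separately.
customer_ids = [row[0] for row in
                conn.execute("SELECT id FROM customers ORDER BY id")]
failed = []
for customer_id in customer_ids:
    try:
        recalculate_one(customer_id)
    except ValueError:
        failed.append(customer_id)   # a candidate for a retry or an alert

discounts = [row[0] for row in
             conn.execute("SELECT discount FROM customers ORDER BY id")]
print(discounts, failed)
```

Between iterations the data is temporarily inconsistent (some discounts are recalculated, some are not), which is the trade-off this approach accepts.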

Eventual consistency is a big topic in distributed computing and is often encountered together with CQRS. In this article I only want to show another way of executing batch processing using this approach, together with its benefits. Sometimes this approach is not a good choice: it can have an impact on the GUI, and users may see stale data for some time. That is why it is important to talk with domain experts, because often it is fine for a user to wait for the data to be updated, but sometimes it is unacceptable.

Summary

Transactional consistency – the whole processing is executed in one transaction. It is an “all or nothing” approach and can sometimes decrease the performance, scalability and availability of our application.

Eventual consistency – processing is divided and not executed in one big transaction. For some time the application will be in an inconsistent state. It leads to better scalability and availability of the application. On the other hand, it can cause problems with the GUI (stale data) and it requires supporting mechanisms that enable parallel processing, retries and sometimes process monitors as well.