Event Sourcing Examined Part 3 Of 3

Standard

In this 3 part series we will look at what event sourcing is and why enterprise software for many established industries uses this pattern.

Index

1. Part One

  • Introduction to Event Sourcing
  • Why Use Event Sourcing?
  • Some Common Pitfalls

2. Part Two 

  • Getting Familiar With Aggregates
  • Event Sourcing Workflow
  • Commands
  • Domain Event
  • Internal Event Handler
  • Repository
  • Storage & Snapshots
  • Event Publisher

3. Part Three (This one)

  • CQRS
  • Read Side of CQRS
  • Using NEventLite

Event Sourcing as demonstrated in the previous blog posts is a GREAT way to implement the command side of CQRS. In this post, we will look at how to implement the read side of CQRS using my event sourcing “framework” NEventLite.

CQRS

Command Query Responsibility Segregation is an idea where we model our solution to have separate write and read models. The idea was championed by Greg Young in the mid 2000s and has since been adopted by many. It has its roots in CQS (Command Query Separation).

CQRS is hard to implement properly but has a lot of advantages, like scalability for performance critical applications. Most information systems have orders of magnitude more reads than writes, so it makes sense to have separate models catering to the specific concerns of reads and writes.
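
As a rough sketch of that split (hypothetical names, not tied to any framework), commands and queries can live behind separate interfaces backed by different models:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: the write side exposes only intent, the read side only data.
public interface IOrderCommands
{
    void AddOrderItem(Guid orderId, string item, int qty); // mutates, returns nothing
}

public interface IOrderQueries
{
    int GetItemCount(Guid orderId); // reads, never mutates
}

// A toy implementation; in a real CQRS system the two sides would be backed
// by entirely separate models (e.g. an event-sourced aggregate and a
// denormalized view), not one dictionary.
public class OrderService : IOrderCommands, IOrderQueries
{
    private readonly Dictionary<Guid, int> _readModel = new Dictionary<Guid, int>();

    public void AddOrderItem(Guid orderId, string item, int qty)
    {
        _readModel.TryGetValue(orderId, out var count);
        _readModel[orderId] = count + qty;
    }

    public int GetItemCount(Guid orderId) =>
        _readModel.TryGetValue(orderId, out var count) ? count : 0;
}
```

The point is purely the shape: callers issuing commands never get data back, and callers issuing queries can never change state.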

Have a look at Martin Fowler’s explanation here.

Read Side of CQRS

In the event sourcing methodology all we persist are domain events. If we don’t have a separate read model/view we would have to read all the past events to do an aggregation or to find a count. This is bad. Remember how CQRS dictates different models for write and read? Good. You can probably see where I’m going with this now. What we need is an optimised read-only model/view.

In the example of the Purchase Order we might want to show the customer previous purchase orders and the totals. Also, a warehouse manager might want to see the products and orders pending. Part of DDD is to identify these needs. One approach I’ve used is to create a separate read model for each page/grid/widget. This allows us to reuse views more often.

These read side event handlers subscribe to domain events and update read models as necessary. The update logic is self-contained and isolated from other views. This means we can have as many views as we like listening to domain event broadcasts, and we can scale different views as required. (When I use the word view here I mean the read model + its event handler.)

Let’s look at the sequence of events.

At start up,

  • PurchaseOrderSummaryView subscribes to PurchaseOrderCreated, Updated events
  • ProductOrderSummaryView subscribes to PurchaseOrderItemAdded, Updated events

When customer places an order,

  1. Domain event created
    • Persisted to the event store (Artefact of the command side)
    • Broadcasted to the message queue
  2. External Event handler receives an event (message) it subscribed to (I use the term internal event handler to describe the event handler inside the AggregateRoot)
  3. External Event handler reads the message and updates the read model to reflect the changes.
    • Optionally: Persist the last event version to the view as we can later use this to determine the “freshness” of the view.
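
The flow above can be sketched as a tiny in-memory projection. The types below (PurchaseOrderCreated, PurchaseOrderSummaryView) are hypothetical illustrations, not NEventLite types:

```csharp
using System;

// Hypothetical domain event, broadcast by the command side (steps 1a-1b).
public class PurchaseOrderCreated
{
    public Guid AggregateId { get; set; }
    public int Version { get; set; }
    public decimal Total { get; set; }
}

// Hypothetical read model, optimised for display.
public class PurchaseOrderSummary
{
    public Guid Id { get; set; }
    public decimal Total { get; set; }
    public int LastSeenVersion { get; set; } // the "freshness" marker from step 3
}

public class PurchaseOrderSummaryView
{
    public PurchaseOrderSummary Summary { get; private set; }

    // Steps 2-3: the external handler receives the subscribed event
    // and updates the read model to reflect the change.
    public void Handle(PurchaseOrderCreated @event)
    {
        Summary = new PurchaseOrderSummary
        {
            Id = @event.AggregateId,
            Total = @event.Total,
            // Persisting the event version lets consumers judge how
            // up to date this view is (eventual consistency).
            LastSeenVersion = @event.Version
        };
    }
}
```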

Using NEventLite

NEventLite is a lightweight framework I created to do the mundane tasks associated with setting up an event sourced system in .net. Think of it as event sourcing on rails.

We are going to look at the examples provided in my GitHub repo. It contains a very small project that demonstrates the use of ES with snapshots and a read model.

Look at the sample Note domain model.

    public class Note : AggregateRoot, ISnapshottable
    {
        public DateTime CreatedDate { get; private set; }

        public string Title { get; private set; }

        public string Description { get; private set; }

        public string Category { get; private set; }

        #region "Constructor and Methods"

        public Note()
        {
            //Important: Aggregate roots must have a parameterless constructor
            //to make it easier to construct from scratch.

            //The very first event in an aggregate is the creation event
            //which will be applied to an empty object created via this constructor
        }

        //The following methods are how external commands interact with our aggregate.
        //A command will result in one of these methods being executed and the resulting events being fired.

        public Note(Guid id, string title, string desc, string cat):this()
        {
            //Pattern: Create the event and call ApplyEvent(Event)
            ApplyEvent(new NoteCreatedEvent(id, CurrentVersion, title, desc, cat, DateTime.Now));
        }

        public void ChangeTitle(string newTitle)
        {
            //Pattern: Create the event and call ApplyEvent(Event)
            if (this.Title != newTitle)
            {
                ApplyEvent(new NoteTitleChangedEvent(Id, CurrentVersion, newTitle));
            }
        }

//... see the GitHub repo for the full source code

As you can see we are following the pattern of creating an event and applying it. The internal event handlers are marked with a custom attribute and some “reflection magic” by the framework knows to call the right one when the ApplyEvent method is called.

        [InternalEventHandler]
        public void OnNoteCreated(NoteCreatedEvent @event)
        {
            CreatedDate = @event.CreatedTime;
            Title = @event.Title;
            Description = @event.Desc;
            Category = @event.Cat;
        }

        [InternalEventHandler]
        public void OnTitleChanged(NoteTitleChangedEvent @event)
        {
            Title = @event.Title;
        }

The Note class also implements ISnapshottable, which adds these two methods allowing it to re-hydrate itself faster. (See part 2 of the blog series.)

        public NEventLite.Snapshot.Snapshot TakeSnapshot()
        {
            //This method returns a snapshot which will be used to reconstruct the state

            return new NoteSnapshot(Guid.NewGuid(),
                                    Id,
                                    CurrentVersion,
                                    CreatedDate,
                                    Title,
                                    Description,
                                    Category);
        }

        public void ApplySnapshot(NEventLite.Snapshot.Snapshot snapshot)
        {
            //Important: State changes are done here.
            //Make sure you set the CurrentVersion and LastCommittedVersions here too

            NoteSnapshot item = (NoteSnapshot)snapshot;

            Id = item.AggregateId;
            CurrentVersion = item.Version;
            LastCommittedVersion = item.Version;
            CreatedDate = item.CreatedDate;
            Title = item.Title;
            Description = item.Description;
            Category = item.Category;
        }

Once the framework applies the event, it publishes the event using the IEventPublisher. In the example I’m using an in-memory subscriber and injecting it in. (This isn’t how you would use it in production)

    public class MyEventPublisher : IEventPublisher
    {
        private readonly MyEventSubscriber _subscriber;

        public MyEventPublisher(MyEventSubscriber subscriber)
        {
            LogManager.Log("EventPublisher Started...", LogSeverity.Information);
            _subscriber = subscriber;
        }

        public async Task PublishAsync(IEvent @event)
        {
            LogManager.Log(
                $"Event #{@event.TargetVersion + 1} Published: {@event.GetType().Name} @ {DateTime.Now.ToLongTimeString()}",
                LogSeverity.Information);

            await Task.Run(() =>
            {
                InvokeSubscriber(@event);
            }).ConfigureAwait(false);
        }

        private void InvokeSubscriber<T>(T @event) where T : IEvent
        {
            var o = _subscriber.GetType().GetMethodsBySig(typeof(Task), null, true, @event.GetType()).First();
            o.Invoke(_subscriber, new object[] { @event });
        }
    }

Then the in-memory subscriber listens for the events and handles them. The class implements IEventHandler for multiple events. You can easily have multiple subscribers handling the same events if you want to; just make the required code changes to allow the publisher to find them via reflection.

In a production scenario you would publish to a message broker and the subscribers would consume the events from there. The message broker approach gives you capabilities like delivery guarantees and exactly-once delivery where required.

    public class MyEventSubscriber : IEventHandler<NoteCreatedEvent>,
                                     IEventHandler<NoteTitleChangedEvent>,
                                     IEventHandler<NoteDescriptionChangedEvent>,
                                     IEventHandler<NoteCategoryChangedEvent>
    {
        private readonly MyReadRepository _repository;

        public MyEventSubscriber(MyReadRepository repository)
        {
            _repository = repository;
        }

        public async Task HandleEventAsync(NoteCreatedEvent @event)
        {
            LogEvent(@event);

            _repository.AddNote(new NoteReadModel(@event.AggregateId, @event.CreatedTime, @event.Title, @event.Desc, @event.Cat));
        }

        public async Task HandleEventAsync(NoteTitleChangedEvent @event)
        {
            LogEvent(@event);

            var note = _repository.GetNote(@event.AggregateId);
            note.CurrentVersion = @event.TargetVersion + 1;
            note.Title = @event.Title;

            _repository.SaveNote(note);
        }

//See github repo for full source code

There is a very simple read model in the example.


    public class NoteReadModel
    {

        public Guid Id { get; private set; }
        public int CurrentVersion { get; set; }
        public DateTime CreatedDate { get; set; }
        public string Title { get; set; }
        public string Description { get; set; }
        public string Category { get; set; }

        public NoteReadModel(Guid id, DateTime createdDate, string title, string description, string category)
        {
            Id = id;
            CreatedDate = createdDate;
            Title = title;
            Description = description;
            Category = category;
        }
    }

Finally the read model has its own repository to allow consuming pages/widgets to access it. This is optional.

    public class MyReadRepository
    {
        private readonly MyInMemoryReadModelStorage _storage;

        public MyReadRepository(MyInMemoryReadModelStorage storage)
        {
            _storage = storage;
        }

        public void AddNote(NoteReadModel note)
        {
            _storage.AddOrUpdate(note);
        }

        public void SaveNote(NoteReadModel note)
        {
            _storage.AddOrUpdate(note);
        }

        //... see the GitHub repo for the full source code
    }

The read models are persisted to a file in this example but you can use any form of persistence that best suits your needs. I recommend a NOSQL document store as most read models will end up being queried by their ID. You can use a relational model if you intend to have some more querying abilities. Do as you see fit.


    public class MyInMemoryReadModelStorage
    {
        private readonly string _memoryDumpFile;
        private readonly List<NoteReadModel> _allNotes = new List<NoteReadModel>();

        public MyInMemoryReadModelStorage(string memoryDumpFile)
        {
            _memoryDumpFile = memoryDumpFile;

            if (File.Exists(_memoryDumpFile))
            {
                _allNotes = SerializerHelper.LoadListFromFile<NoteReadModel>(_memoryDumpFile);
            }
        }

        public void AddOrUpdate(NoteReadModel note)
        {
            if (_allNotes.Any(o => o.Id == note.Id))
            {
                _allNotes.Remove(_allNotes.Single(o => o.Id == note.Id));
            }

            _allNotes.Add(note);

            SerializerHelper.SaveListToFile<NoteReadModel>(_memoryDumpFile, _allNotes);
        }

        public NoteReadModel GetByID(Guid Id)
        {
            return _allNotes.FirstOrDefault(o => o.Id == Id);
        }

        public IEnumerable<NoteReadModel> GetAll()
        {
            return _allNotes.ToList();
        }
    }

Run the example (Full source code is at https://github.com/dasiths/NEventLite/tree/master/src/Examples/NEventLite%20Example)

You should see something like this, confirming that the read model and the aggregate constructed by replaying events have the same info.


Wrapping Up

In this series we looked at Event Sourcing with Command Query Responsibility Segregation in depth and walked through an example of how to implement it using NEventLite. For further reading and learning I suggest following Greg Young’s blog and the CQRS google group.

Thanks for reading and please leave your comments and feedback below.

REST + AngularJS Basics With .Net Core WebAPI CRUD Example

Standard

Yes, I have been MIA for a while. I have been on holidays for an extended period of time and started working for a software consultancy a few weeks ago. So my busy schedule hasn’t given me an opportunity to blog.

Why Front End, Dasith?

I’m getting my hands dirty with AngularJS for a brownfield project and wanted to write some material to help other team members who might not have been exposed to Angular before. So I took that opportunity to write it in the form of a blog post. I know front end tech is not my focus, but as a full stack developer it is important you know at least one major JavaScript mv* framework. It might even come in handy when you need to whip up a quick prototype to demonstrate a back end feature to a customer. The bottom line is that we back end developers can’t ignore front end technologies.

I’m using Angular 1.6 for this because it was a brownfield project and the existing code base was in 1.*. But if you are starting a new project I highly recommend you familiarize yourself with Typescript and Angular 2. (Strongly typed master race is a thing right?)


So let’s jump right in. It is a prerequisite of this post to have an understanding of the building blocks of an Angular app. You should have a basic understanding of modules, controllers, services, data binding, filtering and directives. A good start is the Angular docs https://docs.angularjs.org/guide.

I have uploaded the code covered in the article into a github repo at https://github.com/dasiths/AngularJsWebApiTest and will be doing any feature additions and bug fixes there. Consider this post as a guide only.

Module.js

// Define the `booksApp` module
var app = angular.module('booksApp', []);

This is where we define the name of the module and have it assigned to a global variable for later use. Nothing fancy here.

Service.js

app.service('crudService', function ($http) {

 var baseUrl = 'http://localhost:50829';

 //Create new record
 this.post = function (Book) {
 var request = $http({
 method: "post",
 url: baseUrl + "/api/books",
 data: Book
 });
 return request;
 }

 //Get Single Records
 this.get = function (Id) {
 return $http.get(baseUrl + "/api/books/" + Id);
 }

 //Get All Books
 this.getAllBooks = function () {
 return $http.get(baseUrl + "/api/books");
 }

 //Update the Record
 this.put = function (Id, Book) {
 var request = $http({
 method: "put",
 url: baseUrl + "/api/books/" + Id,
 data: Book
 });
 return request;
 }

 //Delete the Record
 this.delete = function (Id) {
 var request = $http({
 method: "delete",
 url: baseUrl + "/api/books/" + Id
 });
 return request;
 }
});

We are defining the data access functions here. Each function wraps an $http call (which returns a promise), handles one CRUD operation and targets an endpoint. By putting these functions inside the service we separate those concerns from the controller, and we can later inject the service into the controller. Have a read here https://www.sitepoint.com/tidy-angular-controllers-factories-services/.

Controller.js

app.controller('BookListController', function BookListController($scope, $http, $log, crudService) {
 $scope.statusClass = 'label-info';
 $scope.status = 'Loading books...';
 $scope.IsNewRecord = 1; //The flag for the new record

 loadAllBooks();

 function loadAllBooks() {

 var promise = crudService.getAllBooks();

 promise.then(
 function (response) {
 $scope.Books = response.data;
 $scope.status = 'Loaded. Code: ' + response.status;
 $scope.statusClass = 'label-success';
 },
 function (error) {
 $scope.Books = null;
 $scope.status = 'Error: ' + error;
 $scope.statusClass = 'label-warning';
 $log.error('failure loading Books', error);
 });
 }

 //Method to save and add
 $scope.save = function () {
 var Book = {
 Id: $scope.BookId,
 Name: $scope.BookName,
 Author: $scope.BookAuthor
 };

 //If the flag is 1 then it is a new record
 if ($scope.IsNewRecord === 1) {
 var promisePost = crudService.post(Book);
 promisePost.then(function (result) {
 $scope.BookId = result.data;
 loadAllBooks();
 }, function (error) {
 $scope.status = 'Error: ' + error;
 console.log("Err" + error);
 });
 } else { //Else Edit the record
 var promisePut = crudService.put($scope.BookId, Book);
 promisePut.then(function (result) {
 $scope.status = "Updated Successfully";
 $scope.statusClass = 'label-success';
 loadAllBooks();
 }, function (error) {
 $scope.status = 'Error: ' + error;
 $scope.statusClass = 'label-warning';
 console.log("Err" + error);
 });
 }

 }

 //Method to Delete
 $scope.delete = function () {
 var promiseDelete = crudService.delete($scope.BookId);
 promiseDelete.then(function (result) {
 $scope.status = "Deleted Successfully";
 $scope.statusClass = 'label-success';
 $scope.BookId = 0;
 $scope.BookName = "";
 $scope.BookAuthor = "";

 loadAllBooks();
 }, function (error) {
 $scope.status = 'Error: ' + error;
 $scope.statusClass = 'label-warning';
 console.log("Err" + error);
 });
 }

 //Method to Get Single Book
 $scope.get = function (Book) {
 var promiseGetSingle = crudService.get(Book.id);

 promiseGetSingle.then(function (result) {
 var res = result.data;
 $scope.BookId = res.id;
 $scope.BookName = res.name;
 $scope.BookAuthor = res.author;

 $scope.IsNewRecord = 0;
 },
 function (errorPl) {
 console.log('failure loading Book', errorPl);
 });
 }

 //Clear the Scope models
 $scope.clear = function () {
 $scope.statusClass = 'label-info';
 $scope.status = "";
 $scope.IsNewRecord = 1;
 $scope.BookId = 0;
 $scope.BookName = "";
 $scope.BookAuthor = "";
 }

}).directive('myBooktableheader', function () {
 return {
 templateUrl: 'http://localhost:51836/BookTemplate.html'
 };
});

Here we have the controller that coordinates everything. As discussed earlier, notice how the crudService is injected in. Dependency Injection is a major part of how Angular works. Read more here https://docs.angularjs.org/guide/di.

Everything here is pretty straightforward. We just created methods to load and save books. We have some fields in the scope that we use to bind the input fields to ($scope.BookId, BookName, BookAuthor). Read more here https://www.w3schools.com/angular/angular_databinding.asp.

To demonstrate directives, I have also added a custom directive definition right at the end of the controller which has an external template.

BookTemplate.html

<th>Id</th>
<th>Name</th>
<th>Author</th>
<th></th>

Directives are a big topic onto itself. We are covering the very basic use here for a trivial example. Read more about directives here https://docs.angularjs.org/guide/directive.

Book.cshtml (partial code example)

See full code at https://github.com/dasiths/AngularJsWebApiTest/blob/master/AngularWebApplication/Views/Book/Index.cshtml.

<body ng-controller="BookListController">
<div class="body-content">
<h3>@ViewData["Message"]</h3>
<table>
<tr>
<td>
<table id="searchObjResults" class="table-striped table-hover" width="600">
<tr my-booktableheader></tr>
<tr ng-repeat="book in Books | filter:search:strict">
<td>{{book.id}}</td>
<td>{{book.name}}</td>
<td>{{book.author}}</td>
<td>
 <button type="button" class="btn" ng-click="get(book)">
 <span class="glyphicon glyphicon-edit"></span>
 </button></td>
</tr>
... see github for repo for full code

 <label ng-class="statusClass">Status: {{status}}</label>

Use this area to filter results.

 <label>Any: <input ng-model="search.$"></label>

 <label>Name only <input ng-model="search.name"></label>

 <label>Author only <input ng-model="search.author"></label>

 <label>Equality <input type="checkbox" ng-model="strict"></label>

This will be our template for the main web page. Take note of how we use data binding using one-way and two-way data binding directives. More here https://www.w3schools.com/angular/angular_databinding.asp.

The other thing to note here is the filtering. We have some typical examples of filter use covered in the example. Read more about filters here https://www.w3schools.com/angular/angular_filters.asp.

Well that’s it for the Client side of the application. Now let’s quickly look at the WebAPI end point.

I’m using Visual Studio 2017 to create my project. I created a project using the ASP.NET CORE web api template.

BookRepository.cs

 public class BookRepository
 {
 private List<Book> _books = new List<Book>();
 public BookRepository()
 {
 _books.Add(new Book() { Id = 1, Name = "P=NP in .NET", Author = "Jon Skeet" });
 _books.Add(new Book() { Id = 2, Name = "Dank Memes", Author = "Dasith Wijes" });
 _books.Add(new Book() { Id = 3, Name = "50 'Shards' of Azure SQL", Author = "Actor Model" });
 _books.Add(new Book() { Id = 4, Name = "Jailbreaking iOS to Set a Default Web Browser", Author = "Cult Of Fandroid" });
 _books.Add(new Book() { Id = 5, Name = "OCD Olympics", Author = "Also Me" });
 }

 public IEnumerable<Book> GetAllBooks()
 {
 return _books.ToArray();
 }

 public void UpdateBook(Book b)
 {
 _books[_books.IndexOf(_books.Single(o => o.Id == b.Id))] = b;
 }

 public int AddBook(Book b)
 {
 b.Id = _books.Max(o=> o.Id) + 1;
 _books.Add(b);

 return b.Id;
 }

 public void DeleteBook(int id)
 {
 _books.Remove(_books.Single(o => o.Id == id));
 }
 }

Startup.cs

I enabled CORS as follows in startup.cs because our Angular app will be communicating from a different “host”. (The angular app runs as a separate application on a separate port to the webapi.) Also I added the repository as a singleton to the IOC container.

public class Startup
{
public Startup(IHostingEnvironment env)
{
var builder = new ConfigurationBuilder()
.SetBasePath(env.ContentRootPath)
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
.AddEnvironmentVariables();
Configuration = builder.Build();
}

public IConfigurationRoot Configuration { get; }

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
// Add framework services.
services.AddCors();
services.AddMvc();

//Add our repository as a singleton
services.AddSingleton<Repository.BookRepository>();
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole(Configuration.GetSection("Logging"));
loggerFactory.AddDebug();

app.UseCors(builder => builder.AllowAnyOrigin()
.AllowAnyMethod()
.AllowAnyHeader());

app.UseMvc();
}
}

BookController.cs

This is your stock standard web api controller. I’ve put some effort into wrapping my results with the proper http status codes and handling some errors. This is by no means perfect but just note how the web api communicates back via both the status code and content.

[Route("api/[controller]")]
public class BooksController : Controller
{

BookRepository _repo;

public BooksController(BookRepository repo)
{
_repo = repo;
}

// GET api/books
[HttpGet]
public IActionResult Get()
{
var result = _repo.GetAllBooks();

if (result.Count() > 0)
return Ok(result);
else
return NoContent();
}

// GET api/books/id
[HttpGet("{id}")]
public IActionResult Get(int id)
{
var result = _repo.GetAllBooks().SingleOrDefault(o => o.Id == id);

if (result == null)
return NoContent();
else
return Ok(result);
}

// GET api/books/q=query
[HttpGet("q={name}")]
public IActionResult Get(string name)
{
var results = _repo.GetAllBooks().Where(
o => o.Name.IndexOf(name, 0, StringComparison.OrdinalIgnoreCase) >= 0);

if (results.Count() > 0)
{
return Ok(results);
}
else
{
return NoContent();
}

}

// POST api/books
[HttpPost]
public IActionResult Post([FromBody] Book b)
{
if (ModelState.IsValid == false)
return BadRequest();

try
{
return Ok(_repo.AddBook(b));
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}

}

// PUT api/books/id
[HttpPut("{id}")]
public IActionResult Put(int id, [FromBody] Book b)
{

if (ModelState.IsValid == false || id != b.Id)
return BadRequest();

var result = _repo.GetAllBooks().SingleOrDefault(o => o.Id == id);
if (result == null)
return NotFound();

try
{
_repo.UpdateBook(b);
return Ok(id);
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}

// DELETE api/books/id
[HttpDelete("{id}")]
public IActionResult Delete(int id)
{
try
{
_repo.DeleteBook(id);
return Ok(id);
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}

}

That’s it. We have covered the major components in the project. Run it and see if you can have a play with it.


Good luck with your learning. I recommend you learn more about a topic like token authentication using Angular + webapi because it will be something you will end up doing often. Another important area we didn’t cover here is routing.

Please post your comments and ideas here as they help me improve my knowledge and blogging skills. Thank you.

Event Sourcing Examined Part 2 Of 3

Standard

In this 3 part series we will look at what event sourcing is and why enterprise software for many established industries uses this pattern.

Index

1. Part One

  • Introduction to Event Sourcing
  • Why Use Event Sourcing?
  • Some Common Pitfalls

2. Part Two (This one)

  • Getting Familiar With Aggregates
  • Event Sourcing Workflow
  • Commands
  • Domain Event
  • Internal Event Handler
  • Repository
  • Storage & Snapshots
  • Event Publisher

3. Part Three

  • CQRS
  • Read Side of CQRS
  • Using NEventLite

In the last post we looked at what Event Sourcing (with CQRS) is and discussed some benefits and pitfalls. We also established that ES + CQRS is used for business components within a bounded context. Keep this in mind as we progress towards more technical aspects of ES and CQRS.

Getting Familiar With Aggregates

An aggregate is one of the building blocks of Domain Driven Design. Simply put, an aggregate is a collection of domain objects that can be grouped into one unit. It is different from a collection (Array, List, Map) because aggregates are domain concepts.

A good example is a purchase order. The purchase order has the order document, a collection of order line items, and may also have value objects for totals and tax. All of these can be grouped into one unit. This unit is an aggregate.

The purchase order document in this example is the Aggregate Root. The aggregate root is the element which is visible to other contexts. The aggregate root also ensures the integrity of the entire aggregate. All changes or references to the aggregate must happen via the aggregate root and this ensures a natural transactional boundary.
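
To make the boundary concrete, here is a minimal sketch (hypothetical types, no event sourcing yet) of a root that guards its internals. Outside code never touches the line items directly; every change flows through the root, which keeps the total consistent:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// An entity inside the aggregate; immutable once created here for simplicity.
public class OrderLine
{
    public string Item { get; }
    public decimal Price { get; }
    public OrderLine(string item, decimal price) { Item = item; Price = price; }
}

// The aggregate root. The line collection is private: all changes and
// references go via the root, which maintains the Total invariant,
// giving us a natural transactional boundary.
public class PurchaseOrder
{
    private readonly List<OrderLine> _lines = new List<OrderLine>();
    public decimal Total { get; private set; }

    public void AddLine(string item, decimal price)
    {
        _lines.Add(new OrderLine(item, price));
        Total = _lines.Sum(l => l.Price); // invariant maintained in one place
    }
}
```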

Event Sourcing Workflow (with CQRS)

Event Sourcing with CQRS

Event Sourcing + CQRS workflow for NEventLite

  • Commands are instructions to do something and are in the “imperative form”.
  • Some possible commands are CreateOrder, AddOrderItem, UpdateShippingAddress, AcceptOrder, ConvertToInvoice etc.
  • Events are in the past tense. They indicate an action that happened in the system which resulted in a state change. OrderCreated, OrderCourierMethodPicked, OrderShippingAddressUpdated etc.
  • The execution of a command can result in many events.
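
A minimal sketch of this naming convention, with hypothetical types (real commands and events would carry version information as well):

```csharp
using System;
using System.Collections.Generic;

// A command is an instruction (imperative): it may be rejected.
public class AddOrderItemCommand
{
    public Guid OrderId; public Guid ItemId; public int Qty;
}

// Events are facts (past tense): they have already happened.
public class OrderItemAdded { public Guid OrderId; public Guid ItemId; public int Qty; }
public class OrderTotalRecalculated { public Guid OrderId; public decimal NewTotal; }

public static class OrderBehaviour
{
    // Executing one command can produce several events describing what happened.
    public static List<object> Execute(AddOrderItemCommand cmd, decimal newTotal)
    {
        return new List<object>
        {
            new OrderItemAdded { OrderId = cmd.OrderId, ItemId = cmd.ItemId, Qty = cmd.Qty },
            new OrderTotalRecalculated { OrderId = cmd.OrderId, NewTotal = newTotal }
        };
    }
}
```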

Now let’s walk through the steps. We will take the example of adding a new item to an existing purchase order.

(1) The user selects an item “ItemABC” with a Quantity of “X” from inventory and adds it to an existing order “OrderXYZ”. The request will also have a version number “N” to indicate the version of the current Order. The UI sends this information to a web service. For example WebServiceProxy.AddItemToOrder(Order, Item, Qty, OrderVersion)

The web service then creates an AddOrderItemCommand with the information received and puts it on a Command Bus (Message Queue). The web service then returns an ack saying the command was forwarded.

This doesn’t always mean the command will be processed or successful. The service can reject it or it can fail. Even when the command is successful, there is no guarantee the read model will be updated “immediately” (remember eventual consistency?). So the UI will have to periodically check the read model to see if it was updated (see if the Order version is now >= N+1) or have a method to listen for the resulting events. There are many ways of handling it. Handling errors is also another big topic by itself.

(2) The command bus‘s purpose is to decouple the command creating service from the Command Consumers. The command bus will handle the delivery responsibility.

(3) The command handler registers its interest in the AddOrderItemCommand with the command bus (subscribes).

So the command bus knows to inform the handler when it receives that particular command. It’s important to note that there can only be one command handler per command. This ensures commands don’t get double handled.
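
A sketch of how that registration rule might be enforced (a hypothetical bus, not the NEventLite implementation): the bus keeps one handler per command type and rejects a second subscription.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical command bus: exactly one handler per command type,
// so commands can never be double handled.
public class SimpleCommandBus
{
    private readonly Dictionary<Type, Action<object>> _handlers =
        new Dictionary<Type, Action<object>>();

    public void Subscribe<T>(Action<T> handler)
    {
        if (_handlers.ContainsKey(typeof(T)))
            throw new InvalidOperationException(
                $"A handler for {typeof(T).Name} is already registered.");

        _handlers[typeof(T)] = cmd => handler((T)cmd);
    }

    public void Send<T>(T command) => _handlers[typeof(T)](command);
}
```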

(4) The command handler will then load an instance of the AggregateRoot using the unique ID. In this case our order # “OrderXYZ”.

The command handler will now check if the version of the aggregate matches the version in the command. This checks for concurrency and makes sure our command gets executed against the right version of the aggregate. Once this is done, the command handler calls the appropriate method in the aggregate to do the operation and passes the information. For example, PurchaseOrderAggregate.AddItem(Item, Qty).

Hint: A command must only affect one Aggregate; a command that changes multiple aggregates is an anti-pattern.

This is where it gets interesting. The AddItem() method doesn’t directly update the state.

(5) We treat state change as a first class concept. To facilitate this we use the concept of “Domain Events” which are immutable records of state change.

  • So to capture state change, the AddItem() method will create an event and Apply it to itself (ApplyChange(@event, true)). The event is the catalyst for state change. In our example the AddItem() method will create an event (OrderItemAdded) and apply it.
  • The Applying of the event will call the “Internal Event Handler” of the Order (AggregateRoot) which in turn will update the state.
  • It will also add the event which triggered the state change to an “Uncommitted Changes” collection.

This is how the source code might look.
NEventLite full source code for AggregateRoot is here. An example is here.

public void AddItem(Guid ItemID, int Qty) {
  //Create the event and call the internal event handler.
  //Important: No state changes are done here.
  //We also put the current version of the aggregate in the event for consistency
  var @event = new OrderItemAddedEvent(this.OrderID, CurrentVersion, ItemID, Qty);
  ApplyChange(@event, true);
}

public void ApplyChange(IEvent @event, bool isNew) {
 //Call the right Apply method for this event
 this.AsDynamic().Apply(@event);

 //Only add new events to the uncommitted changes collection
 if (isNew) {
  UncommittedChanges.Add(@event);
 }
}

//This is the "InternalEventHandler"
public void Apply(OrderItemAddedEvent @event) {
  //Do state changes here like updating Total and Tax.
  //Also do the tasks required to add the order item.
}

The reason we use the internal event handler is to ensure the event gets applied through the same ApplyChange() method when we load (replay) events from storage. This keeps state changes consistent. (We call ApplyChange() with isNew set to false when loading past events, so they won’t register as uncommitted changes.)

So remember, all state changes must happen inside an internal event handler.

(6) The repository handles the saving and loading of an AggregateRoot. Here is an example of a simple repository interface for an AggregateRoot.

public interface IRepository {
    T GetById<T>(Guid id) where T : AggregateRoot, new();
    void Save<T>(T aggregate) where T : AggregateRoot;
}

(7) The events get stored in an “Event Storage”. This can be a relational database or a dedicated event storage engine like EventStore.

(8) As you will have noticed, we have to load all the events for an AggregateRoot to reconstitute (load) it. To make this faster we can save “snapshots” every N events.

Rather than loading the entire event history to construct an aggregate, we load the last snapshot and ONLY apply events that are newer than the snapshot. We use the version property saved within each event to identify the event sequence number.

The code for saving and loading aggregates will look something like this. (See how the Repository is implemented here for a comprehensive example).


//In our repository implementation

public virtual T GetById<T>(Guid id) where T : AggregateRoot, new()
{
    T item;

    var snapshot = SnapshotStorageProvider.GetSnapshot(typeof(T), id);
    if (snapshot != null)
    {
        item = new T();

        //Apply snapshot. this updates the aggregate state to the snapshot version
        item.ApplySnapshot(snapshot);

        //load events from snapshot version to max
        var events = EventStorageProvider.GetEvents(
            typeof(T), id, snapshot.Version + 1, int.MaxValue);

        //"replay" events
        item.LoadsFromHistory(events);
    }
    else
    {
        //load events from 0 to max. Basically all events.
        var events = (EventStorageProvider.GetEvents(
          typeof(T), id, 0, int.MaxValue)).ToList();

        item = new T();

        //"replay" events
        item.LoadsFromHistory(events);
    }

    return item;
}

public virtual void Save<T>(T aggregate) where T : AggregateRoot {
    if (aggregate.HasUncommittedChanges())
    {
        //Every N events we save a snapshot
        if (aggregate.CurrentVersion - aggregate.LastSnapshotVersion >=
        SnapshotStorageProvider.SnapshotFrequency)
        {
            SnapshotStorageProvider.SaveSnapshot(aggregate.TakeSnapshot());
        }

        //Save events to event storage and publish to event bus. See step 9
        ....

(9) When we save those events in Save<T>(T aggregate), we also publish them to an event bus / event publisher, which can be a message queue.

...
//After saving snapshots in step 8
//Commit Changes to EventStorage

        CommitChanges(aggregate);

        //Publish to the event publisher asynchronously
        foreach (var e in changesToCommit)
        {
            EventPublisher.Publish(e);
        }
    }
}
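The publish step can be sketched as a minimal in-memory publisher (TypeScript, illustrative names; a production system would put a message queue here):

```typescript
// Minimal in-memory event publisher: handlers subscribe by event type,
// and every committed event is fanned out to all interested subscribers.
type DomainEvent = { type: string; aggregateId: string };
type Subscriber = (e: DomainEvent) => void;

class EventPublisher {
  private subscribers = new Map<string, Subscriber[]>();

  subscribe(eventType: string, handler: Subscriber): void {
    const list = this.subscribers.get(eventType) ?? [];
    list.push(handler);
    this.subscribers.set(eventType, list);
  }

  publish(e: DomainEvent): void {
    // Unlike commands, an event may have many handlers (or none at all).
    for (const handler of this.subscribers.get(e.type) ?? []) handler(e);
  }
}

// Usage: two independent read-model updaters receive the same event.
const publisher = new EventPublisher();
let reportingUpdates = 0;
let inventoryUpdates = 0;
publisher.subscribe("OrderItemAdded", () => { reportingUpdates++; });
publisher.subscribe("OrderItemAdded", () => { inventoryUpdates++; });
publisher.publish({ type: "OrderItemAdded", aggregateId: "OrderXYZ" });
```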

Technically this is where the command side of CQRS ends, but let’s look at a couple more things that need to happen for the cycle to complete.

(10) There will be event handlers listening for the OrderItemAddedEvent. These event handlers express their interest in the types of events they want from the event bus by subscribing.

The purpose of these event handlers is to listen for certain types of events and update the read model of our Order. Because we can have multiple event handlers for one type of event (unlike commands), we can have any number of read models for the Order. For example, one handler might update a customer order total in a reporting database while another updates product totals.

You can probably see the advantages of this now. Our write model (event stream) is totally separate from our read model. This allows us to scale different parts of our system as we wish.

(11) We typically use an in-memory database like Redis or a relational database to store our read models. The idea is that the read side of our application can quickly load a model using a simple where clause, without having to join anything.

A typical CQRS setup will have a read model for each view. An event handler for each view will listen to events (OrderItemAdded, OrderItemRemoved, OrderItemQtyUpdated) and update the view.
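As an illustration of such a per-view read model, here is a TypeScript sketch of a projection that folds order events into a flat totals view (the event shapes and names are invented for the example):

```typescript
// A hypothetical projection: one event handler keeps one denormalized
// read model (order totals) up to date from the event stream.
type OrderItemAdded = { type: "OrderItemAdded"; orderId: string; qty: number; unitPrice: number };
type OrderItemRemoved = { type: "OrderItemRemoved"; orderId: string; qty: number; unitPrice: number };
type OrderEvent = OrderItemAdded | OrderItemRemoved;

// The "view" is a flat lookup: no joins needed at read time.
const orderTotals = new Map<string, number>();

function project(e: OrderEvent): void {
  const current = orderTotals.get(e.orderId) ?? 0;
  const delta = e.qty * e.unitPrice;
  orderTotals.set(e.orderId, e.type === "OrderItemAdded" ? current + delta : current - delta);
}

// Replaying the stream builds (or rebuilds) the view from scratch.
const stream: OrderEvent[] = [
  { type: "OrderItemAdded", orderId: "OrderXYZ", qty: 2, unitPrice: 10 },
  { type: "OrderItemAdded", orderId: "OrderXYZ", qty: 1, unitPrice: 5 },
  { type: "OrderItemRemoved", orderId: "OrderXYZ", qty: 1, unitPrice: 10 },
];
stream.forEach(project);
```

Because the view is derived entirely from events, it can be dropped and rebuilt at any time by replaying the stream.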


So, to recap:

  • We sent a command to add a new item to the order.
  • The AggregateRoot created event(s) to capture the state change. (Even though our example created only one event, a single command can generate many events.)
  • The AggregateRoot applied the event(s) to itself to do the state change.
  • The repository saved the changes to event storage. (And snapshot as needed)
  • The saving of the event(s) also published them to the event bus.
  • Event handlers listening for the event received it and updated their “world views” (read models).

What’s Next

That was a very brief explanation of what each step does and how you could go about implementing it. I’ve used very simple code examples. For working examples I highly recommend you have a look at my Event Sourcing and CQRS framework NEventLite at GitHub.

Now that we have looked at the steps in the Command Side of CQRS the next step is to implement the read side. In the next post of the series we will look at how this can be done. I’ll also demonstrate how to quickly get an ES + CQRS application up and running using NEventLite.

I hope this helps you understand how CQRS works. If you have any questions or suggestions please post them here.

Wish you a happy new year and see you soon.

Event Sourcing Examined Part 1 of 3

Standard

In this 3 part series we will look at what event sourcing is and why enterprise software for many established industries use this pattern.

Index

1. Part One (This one)

  • Introduction to Event Sourcing
  • Why Use Event Sourcing?
  • Some Common Pitfalls

2. Part Two

  • Getting Familiar With Aggregates
  • Event Sourcing Workflow
  • Commands
  • Domain Event
  • Internal Event Handler
  • Repository
  • Storage & Snapshots
  • Event Publisher

3. Part Three

  • CQRS
  • Read Side of CQRS
  • Using NEventLite

Introduction

A name that’s almost synonymous with Event Sourcing is Greg Young. I suggest spending some time watching his many YouTube videos to get a grasp of what event sourcing is. He is the main developer of EventStore, an event storage engine that has fast become my first choice for event sourcing.

Watch this YouTube video if you can spare a moment 

What is Event Sourcing?

I’m going to give you a generic definition first.

Event Sourcing Pattern. Use an append-only store to record the full series of events that describe actions taken on data in a domain, rather than storing just the current state.

But Why?

Good question. Consider this scenario. You look at your bank balance and don’t agree with what’s shown. You ring them up and complain the balance isn’t correct. If banks didn’t use event sourcing all they would have is the current state (balance) of your account. So it would be your word against theirs.

In its simplest form, event sourcing is nothing but storing all the domain events, so your state changes are represented as a series of actions. We treat state change as a first class concept.
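The core idea fits in a few lines: current state is just a fold over the stored events. A TypeScript sketch, using invented event types for the bank account example:

```typescript
// State is never stored; it is computed as the left-fold of all past
// events. The event names here are illustrative.
type BankEvent =
  | { type: "Deposited"; amount: number }
  | { type: "Withdrawn"; amount: number };

function currentBalance(events: BankEvent[]): number {
  return events.reduce(
    (balance, e) => e.type === "Deposited" ? balance + e.amount : balance - e.amount,
    0);
}

// The append-only history doubles as the audit trail: the bank can show
// exactly which actions produced the balance you see.
const history: BankEvent[] = [
  { type: "Deposited", amount: 100 },
  { type: "Withdrawn", amount: 30 },
  { type: "Deposited", amount: 10 },
];
```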

The reasons for using the event sourcing pattern are legion. Some of them are…

  • Ability to track all changes done to the domain

This is perhaps the core foundation on which an event sourcing system is built. Since we store all domain events applied to Aggregates (I’ll cover Aggregates in part 2), there is an audit trail of how the current state of an Aggregate came to be. This is the same reason your company accountant uses what’s called a ledger.

  • Rebuild your state to a point in time

With the stored series of events we can quickly replay them to a point in time to construct a “World View” of how the system was at a specific time. Why is this useful?

Ever tried to report on a newly introduced metric but the customer wanted this information for the previous 12 months too? Ex. What percentage of customers removed an item from the shopping cart before they checked out? If you are using a relational database you will pull your hair out at this point. With event sourcing you would already have this information. You would just have to replay events for the past 12 months and report based on a “shopping cart item removed” domain event. This ability to go back and construct the state gives a huge advantage to the business because they can analyze the past better and you get an audit trail for free.
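A small TypeScript sketch of the idea (timestamps and event names are invented for the example):

```typescript
// Each stored event carries a timestamp, so state or a metric can be
// computed "as of" any moment by replaying only the earlier events.
type TimedEvent = { type: string; at: number }; // at = epoch millis

function eventsAsOf(events: TimedEvent[], pointInTime: number): TimedEvent[] {
  return events.filter(e => e.at <= pointInTime);
}

// The retroactive metric from the example: count "shopping cart item
// removed" events, even though nobody reported on them at the time.
function countCartRemovals(events: TimedEvent[]): number {
  return events.filter(e => e.type === "CartItemRemoved").length;
}

const stream: TimedEvent[] = [
  { type: "CartItemAdded", at: 100 },
  { type: "CartItemRemoved", at: 200 },
  { type: "CheckedOut", at: 300 },
  { type: "CartItemRemoved", at: 400 },
];
```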

  • Less coupling between the representation of current state in the domain and storage (Ideal for CQRS)

When you store state, the natural inclination is to map your state to columns or fields in storage. This increases the coupling between your representation of state and how you decide to store them.

With event sourcing you are only concerned about storing events. For the read side we often use a thin data layer which is highly optimized for reads. We can even use a snapshot strategy to recreate the read model faster but that’s outside the context of this post.

This is why Event Sourcing plays so nicely with Command Query Responsibility Segregation (CQRS).

  • Ability to have many world views

We can have one or many read models for use-case-specific interpretations of current state. This is powerful because it gives us the ability to present the same information in multiple ways depending on how the information is consumed. Imagine World View ABC and World View XYZ subscribing to the same events (from the store / publisher) and updating their state/schema accordingly. ABC and XYZ might have different ideas of what each event means to them, but the important concept here is that they both consume the SAME event.

  • Impedance mismatch is reduced between your domain and a relational database

In Object Oriented development we are faced with a challenge when persisting objects / aggregates to storage. We often go from a “graph” type relationship to a “tabular” form of storage. It’s even harder to convert a tabular form of data back to a graph form.

This is what’s called Impedance Mismatch. ORMs have been trying to minimize it for decades, but ORMs are a leaky abstraction. Don’t get me wrong. I love EF and NHibernate, but you have to account for impedance mismatch when you choose to use an ORM. An ORM is not a magic bullet.

Event Sourcing reduces this by not storing state. An event is immutable and easily serializable to a tabular form. Since we are only storing events most concerns of Impedance Mismatch go away.

  • Easier to debug (sometimes)

You can copy the events from the Production environment to a Dev environment and replay them to build the current state. This gives you a chance to add extra logging or allows you to step through certain parts of the code to dig deeper when something isn’t working as expected. This is basically like using the remote to pause/play/rewind to look at a sports highlight. You can go one step further by storing commands (when using CQRS) and replaying them.

So Why Not Use Event Sourcing All The Time?


Only a Sith deals in absolutes

Short answer: there is a time and place. Implementing event sourcing is hard and time consuming. I wouldn’t recommend it unless the advantages described really add business value.

Be careful when you select the event sourcing pattern. Read up on the pitfalls and anti-patterns like “Command Sourcing”.

An entire system built just on event sourcing is an anti-pattern too. Architecting is hard. There are pros and cons to every choice you make. You make your life easier when you base those decisions on requirements rather than on “resume driven development”.

Something to keep in mind

ES + CQRS (Command Query Responsibility Segregation) is not a top-level architecture pattern. CQRS complements a top-level design pattern like SOA. CQRS works well for a bounded context in DDD; some go further and say CQRS fits a business component within a bounded context. If you aren’t sure whether your bounded context requires ES + CQRS, the chances are that it doesn’t. CQRS is only needed for very specific use cases (scalability, complex business logic, large teams, etc.), so choose wisely, as a lot of complexity comes with it. You have been warned.

Some Common Pitfalls

  • It’s still an unfamiliar concept to many

Although event sourcing as a concept has been around for a long time in software development (and since ancient times in many other fields), it isn’t used often enough. Therefore you aren’t going to find many people on your team familiar with the concept. Even though the pattern isn’t hard to understand, it forces people to change their way of thinking, and you may find it hard to grasp at first, especially while implementing it. But once set up, adding new functionality is fairly straightforward because it becomes a process of repetition.

  • External systems or dependencies

If our system depends on external systems, there is a risk that, when replaying events, we can’t replicate the information an external system returned at a specific point in time. One way around this is to store the information gathered from the external system inside the event itself. But be very wary of this when dealing with external systems.

  • Async?

Just because we are building a CQRS system doesn’t mean the commands need to be asynchronous. There are times when this isn’t required. (For example, your source control system uses event sourcing, but a COMMIT is not asynchronous.) Keep in mind that while asynchronous processing scales better, it isn’t a must for implementing event sourcing.

  • Eventual Consistency (Not really a con)

Event sourcing introduces eventual consistency to the system. While this is not a bad thing (hint: it’s how the real world works), it makes things a lot harder to implement properly, especially the UI.

  • Event schema changes

Over time your event schema will change. Events are immutable, so you need to handle versioning of your events. There are many strategies for this; pick one that’s right for your context.
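One widely used strategy (shown here as an illustrative TypeScript sketch, not something prescribed by this post) is “upcasting”: translating old event versions to the latest schema as they are read back from storage:

```typescript
// Events on disk are immutable, so instead of rewriting them we upcast
// on read. New fields introduced in V2 get a documented default.
type OrderItemAddedV1 = { version: 1; itemId: string; qty: number };
type OrderItemAddedV2 = { version: 2; itemId: string; qty: number; currency: string };

function upcast(e: OrderItemAddedV1 | OrderItemAddedV2): OrderItemAddedV2 {
  return e.version === 2
    ? e
    : { version: 2, itemId: e.itemId, qty: e.qty, currency: "USD" };
}
```

With this in place, the aggregate’s internal event handlers only ever need to understand the latest event version.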

  • ID generation

If your internal event handlers generate IDs, they can’t be random. The idea is that you can replay your events many times and the resulting state must be the same. If the system generates a random identifier while replaying, a later event yet to be replayed might fail because it can no longer correlate to the new ID.
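One workaround, sketched below in TypeScript with invented names, is to derive identifiers deterministically from data already in the stream (here, aggregate ID plus event version), so every replay produces exactly the same IDs:

```typescript
// Deterministic ID derivation: the same aggregate at the same event
// version always yields the same identifier, replay after replay.
function deterministicId(aggregateId: string, version: number): string {
  return `${aggregateId}-${version}`;
}

// Replaying twice yields identical IDs, so later events still correlate.
const firstReplay = deterministicId("OrderXYZ", 7);
const secondReplay = deterministicId("OrderXYZ", 7);
```

Another common option is to generate the random ID once, when the event is first created, and store it inside the event so replays simply reuse it.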

So What’s Next?

In the next post I will look at some of the core concepts when trying to implement Event Sourcing. I will be using .NET and C# for code demonstrations and will introduce you to my Event Sourcing framework NEventLite.

DISCLAIMER: You generally want to avoid “frameworks” when implementing CQRS or Event Sourcing but NEventLite will help get you on the rails. You can swap out every aspect of it if need be. Think of it as training wheels.