What Does Durable Azure Functions Solve?

Azure Functions is how serverless compute is implemented in Azure; the Amazon Web Services equivalent is AWS Lambda. I won’t go into detail about what serverless is and what advantages it provides in this post. You can find my learnings from a project where I implemented my web API using Azure Functions here.

I will be discussing Azure Functions V2 (still in preview at the time of writing this blog) and in particular the newly introduced durable functions. For a quick comparison of the V1 and V2 runtimes, have a look here.

What are “Durable Functions” in Azure?

Durable functions are an extension to Azure Functions which allows you to write stateful functions. The state management is abstracted away through async/await, letting you write orchestration logic in code. The runtime makes sure the state is durable even if the VM running the function is restarted or the function gets recycled.

Before going into detail, let’s see what problems durable functions promise to solve. After that, you will have a good understanding of where they fit in solution design.

Take this simple example.

[Image: function chaining, four functions each passing output to the next]

We have 4 functions, each taking the output of the previous one as its input. This type of orchestration is common in function design.

Before durable functions, there were a couple of ways of solving this.

  1. Have a separate orchestrator function and let it handle the workflow.
    • The orchestrator function needs to be running for the entire duration of the process. This ends up costing more because we have two functions running at the same time.
    • If the orchestrator function gets recycled or the VM restarts, the current state is lost.
  2. Have queues in between the functions and have each function be triggered by a queue message (a sketch of this approach follows below).
    • Requires a lot of queues and managing the connections/triggers between functions.
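
To make approach 2 concrete, here is a minimal sketch of one link in such a chain. The queue names and the processing step are assumptions for illustration, not from the original post.

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class Function2
{
    // Triggered by the previous function's output queue; writes its result
    // to the queue that triggers the next function in the chain
    [FunctionName("Function2")]
    public static void Run(
        [QueueTrigger("function1-output")] string input,
        [Queue("function2-output")] out string output,
        ILogger log)
    {
        log.LogInformation($"Function2 received: {input}");
        output = input + " -> processed by Function2"; // stand-in for the real work
    }
}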

When faced with the function chaining problem, I’ve generally gone with the approach of using queues in the past. Whilst it was painful to do it that way, I got the benefit of the runtime handling situations where a function was recycled and the queue message had to be replayed.

Where do durable functions fit in this scenario?

Durable functions take away the problems we had in solution 1. The orchestrator function sleeps while waiting for an activity (child) function’s output and wakes up automatically once the output is received. The runtime also manages state, so we don’t have to worry about function recycling or VM restarts.

What does the code look like?


public static async Task<string> Run([OrchestrationTrigger] DurableOrchestrationContext ctx)
{
    /*
    Inputs - Orchestrator functions support only DurableOrchestrationContext as a
    parameter type. Deserialization of inputs directly in the function signature is
    not supported. Code must use the GetInput<T>() method to fetch orchestrator
    function inputs. These inputs must be JSON-serializable types.

    Outputs - Orchestration triggers support output values as well as inputs. The
    return value of the function is used to assign the output value and must be
    JSON-serializable. If a function returns Task or void, a null value will be
    saved as the output.
    */
    try
    {
        // Assuming string payloads here for simplicity
        var someInput = ctx.GetInput<string>();

        var outputF1 = await ctx.CallActivityAsync<string>("Function1", someInput);
        var outputF2 = await ctx.CallActivityAsync<string>("Function2", outputF1);
        var outputF3 = await ctx.CallActivityAsync<string>("Function3", outputF2);
        var outputF4 = await ctx.CallActivityAsync<string>("Function4", outputF3);

        // Log output
        return outputF4;
    }
    catch (Exception)
    {
        // Error handling/compensation goes here
        throw;
    }
}
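
The activity functions themselves aren’t shown in the post. For completeness, here is a minimal sketch of what one might look like; the name and the string payload are assumptions for illustration.

[FunctionName("Function1")]
public static string Function1([ActivityTrigger] string input)
{
    // Unlike the orchestrator, activity functions are free to perform I/O
    // and other non-deterministic work
    return input + " -> handled by Function1";
}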

How does it do it? Magic?

Not really. 🙂 It uses a cloud design pattern called Event Sourcing. I’ve got a series of blog posts about event sourcing if you’re interested.

Durable Functions is built on top of the Durable Task Framework. When you await a call on the DurableOrchestrationContext, the framework writes a checkpoint to a ‘history table’ and exits the function. When the output is ready, it re-runs the function up to the point of the await (the checkpoint) and injects the value in. Underneath it uses queues to trigger the awakening, but that’s all abstracted away from you. If you’re really interested, you can have a peek inside the linked storage account at the runtime-generated tables and queues.

You can read more about the technology powering durable functions here.

Any Constraints?

Yes. Because the orchestrator function runs multiple times (replay/checkpoint as mentioned above), it is important that the orchestrator function is deterministic. To put it simply, the code must return the same value for the same input.

  • This means you can’t generate random numbers or GUIDs inside the orchestrator.
  • If you’re calling remote endpoints, those will have to be idempotent.
  • No async/await unless you’re awaiting on the DurableOrchestrationContext.
  • The code must be non-blocking (no I/O or Thread.Sleep).
  • The current date/time should be accessed through CurrentUtcDateTime. (A sketch of how to work within these constraints follows below.)
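
As a rough sketch of how to work within these constraints (the "GenerateReference" activity is a hypothetical example, not from the original post):

public static async Task Run([OrchestrationTrigger] DurableOrchestrationContext ctx)
{
    // Deterministic: the runtime records this value, so every replay
    // sees the same timestamp (unlike DateTime.UtcNow)
    var startedAt = ctx.CurrentUtcDateTime;

    // Anything non-deterministic (GUIDs, random numbers, I/O) is pushed
    // into an activity function; its result is checkpointed and replayed
    var reference = await ctx.CallActivityAsync<string>("GenerateReference", startedAt);
}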

The full list of constraints can be found here.

Closing Remarks

With durable functions, Microsoft has given us the ability to easily orchestrate our serverless functions. This will hopefully encourage more people to use Azure Functions to build their compute logic. I highly recommend you have a look at the Microsoft documentation and the other use cases (e.g. fan-out/fan-in and async HTTP) for durable functions.

If you have any thoughts or comments please leave them here as they help me improve.

Event Sourcing Examined Part 3 Of 3

In this 3 part series we will look at what event sourcing is and why enterprise software for many established industries uses this pattern.

Index

1. Part One

  • Introduction to Event Sourcing
  • Why Use Event Sourcing?
  • Some Common Pitfalls

2. Part Two 

  • Getting Familiar With Aggregates
  • Event Sourcing Workflow
  • Commands
  • Domain Event
  • Internal Event Handler
  • Repository
  • Storage & Snapshots
  • Event Publisher

3. Part Three (This one)

  • CQRS
  • Read Side of CQRS
  • Using NEventLite

Event Sourcing, as demonstrated in the previous blog posts, is a GREAT way to implement the command side of CQRS. In this post, we will be looking at how to implement the read side of CQRS using my event sourcing “framework” NEventLite.

CQRS

Command Query Responsibility Segregation is an idea where we model our solution to have separate write and read models. The idea was championed by Greg Young in the mid 2000s and has since been adopted by many. It has its roots in CQS (Command Query Separation).

CQRS is hard to implement properly but has a lot of advantages, like scalability for performance-critical applications. As most information systems have orders of magnitude more reads than writes, it makes sense to have separate models catering to the specific concerns of reading and writing.

Have a look at Martin Fowler’s explanation here.

Read Side of CQRS

In the event sourcing methodology all we persist are domain events. If we don’t have a separate read model/view we would have to read all the past events to do an aggregation or to find a count. This is bad. Remember how CQRS dictates different models for write and read? Good. You can probably see where I’m going with this now. What we need is an optimised read-only model/view.

In the example of the Purchase Order, we might want to show the customer their previous purchase orders and the totals. A warehouse manager might want to see the products and pending orders. Part of DDD is to identify these needs. One approach I’ve used is to create a separate read model for each page/grid/widget, which allows views to be reused more often.

These read-side event handlers subscribe to domain events and update read models as necessary. The update logic is self-contained and isolated from other views. This means we can have as many views as we like listening to domain event broadcasts, which gives us the ability to scale different views as required. (When I use the word view here, I am talking about the read model + the event handler.)

Let’s look at the sequence of events.

At startup,

  • PurchaseOrderSummaryView subscribes to PurchaseOrderCreated, Updated events
  • ProductOrderSummaryView subscribes to PurchaseOrderItemAdded, Updated events

When a customer places an order,

  1. Domain event created
    • Persisted to the event store (artefact of the command side)
    • Broadcast to the message queue
  2. External event handler receives an event (message) it subscribed to (I use the term internal event handler to describe the event handler inside the AggregateRoot)
  3. External event handler reads the message and updates the read model to reflect the changes.
    • Optionally: persist the last event version to the view, as we can later use this to determine the “freshness” of the view (see the sketch below).
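
For example, a consumer could use that stored version for a simple freshness check. This is a minimal sketch, assuming the NoteReadModel shown later in this post; the method itself is illustrative, not part of NEventLite.

    public bool IsViewFresh(NoteReadModel view, int expectedVersion)
    {
        //The subscriber stamps the last applied event version on the view,
        //so "freshness" becomes a simple version comparison
        return view != null && view.CurrentVersion >= expectedVersion;
    }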

Using NEventLite

NEventLite is a lightweight framework I created to do the mundane tasks associated with setting up an event-sourced system in .NET. Think of it as event sourcing on rails.

We are going to look at the examples provided in my GitHub repo. It is a very small project that demonstrates the use of ES with snapshots and a read model.

Look at the sample Note domain model.

    public class Note : AggregateRoot, ISnapshottable
    {
        public DateTime CreatedDate { get; private set; }

        public string Title { get; private set; }

        public string Description { get; private set; }

        public string Category { get; private set; }

        #region "Constructor and Methods"

        public Note()
        {
            //Important: Aggregate roots must have a parameterless constructor
            //to make it easier to construct from scratch.

            //The very first event in an aggregate is the creation event
            //which will be applied to an empty object created via this constructor
        }

        //The following methods are how external commands interact with our aggregate.
        //A command will result in these methods being executed, and the resulting events will be fired

        public Note(Guid id, string title, string desc, string cat):this()
        {
            //Pattern: Create the event and call ApplyEvent(Event)
            ApplyEvent(new NoteCreatedEvent(id, CurrentVersion, title, desc, cat, DateTime.Now));
        }

        public void ChangeTitle(string newTitle)
        {
            //Pattern: Create the event and call ApplyEvent(Event)
            if (this.Title != newTitle)
            {
                ApplyEvent(new NoteTitleChangedEvent(Id, CurrentVersion, newTitle));
            }
        }

//... see the GitHub repo for the full source code

As you can see, we are following the pattern of creating an event and applying it. The internal event handlers are marked with a custom attribute, and some “reflection magic” in the framework knows to call the right one when the ApplyEvent method is called.

        [InternalEventHandler]
        public void OnNoteCreated(NoteCreatedEvent @event)
        {
            CreatedDate = @event.CreatedTime;
            Title = @event.Title;
            Description = @event.Desc;
            Category = @event.Cat;
        }

        [InternalEventHandler]
        public void OnTitleChanged(NoteTitleChangedEvent @event)
        {
            Title = @event.Title;
        }

The Note class also implements ISnapshottable, which adds these two methods allowing it to re-hydrate itself faster (see part 2 of the blog series).

        public NEventLite.Snapshot.Snapshot TakeSnapshot()
        {
            //This method returns a snapshot which will be used to reconstruct the state

            return new NoteSnapshot(Guid.NewGuid(),
                                    Id,
                                    CurrentVersion,
                                    CreatedDate,
                                    Title,
                                    Description,
                                    Category);
        }

        public void ApplySnapshot(NEventLite.Snapshot.Snapshot snapshot)
        {
            //Important: State changes are done here.
            //Make sure you set the CurrentVersion and LastCommittedVersions here too

            NoteSnapshot item = (NoteSnapshot)snapshot;

            Id = item.AggregateId;
            CurrentVersion = item.Version;
            LastCommittedVersion = item.Version;
            CreatedDate = item.CreatedDate;
            Title = item.Title;
            Description = item.Description;
            Category = item.Category;
        }

Once the framework applies the event, it publishes the event using the IEventPublisher. In the example I’m using an in-memory subscriber and injecting it in. (This isn’t how you would use it in production)

    public class MyEventPublisher : IEventPublisher
    {
        private readonly MyEventSubscriber _subscriber;

        public MyEventPublisher(MyEventSubscriber subscriber)
        {
            LogManager.Log("EventPublisher Started...", LogSeverity.Information);
            _subscriber = subscriber;
        }

        public async Task PublishAsync(IEvent @event)
        {
            LogManager.Log(
                $"Event #{@event.TargetVersion + 1} Published: {@event.GetType().Name} @ {DateTime.Now.ToLongTimeString()}",
                LogSeverity.Information);

            await Task.Run(() =>
            {
                InvokeSubscriber(@event);

            }).ConfigureAwait(false);
        }

        private void InvokeSubscriber<T>(T @event) where T : IEvent
        {
            var o = _subscriber.GetType().GetMethodsBySig(typeof(Task), null, true, @event.GetType()).First();
            o.Invoke(_subscriber, new object[] { @event });
        }
    }

Then the in-memory subscriber listens to the events and handles them. The class implements IEventHandler for multiple events. You can have multiple subscribers handling the same events pretty easily if you want to; just make the required code changes to allow the publisher to find them via reflection.

In a production scenario you would publish to a message broker, and the subscribers would get the events from there. The message broker approach gives you capabilities like delivery guarantees when required.
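
As a rough sketch of what a broker-backed publisher might look like (IMessageBrokerClient and its SendAsync method are hypothetical placeholders for your broker client of choice; serialization here uses Newtonsoft.Json):

    public class BrokerEventPublisher : IEventPublisher
    {
        private readonly IMessageBrokerClient _broker; //hypothetical broker client

        public BrokerEventPublisher(IMessageBrokerClient broker)
        {
            _broker = broker;
        }

        public async Task PublishAsync(IEvent @event)
        {
            //Serialize the event and hand it to the broker. Subscribers consume
            //it from their own queues and get the broker's delivery guarantees.
            var body = JsonConvert.SerializeObject(@event);
            await _broker.SendAsync(@event.GetType().Name, body);
        }
    }

Back to the example’s in-memory subscriber: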

public class MyEventSubscriber : IEventHandler<NoteCreatedEvent>,
                                 IEventHandler<NoteTitleChangedEvent>,
                                 IEventHandler<NoteDescriptionChangedEvent>,
                                 IEventHandler<NoteCategoryChangedEvent>
{
    private readonly MyReadRepository _repository;

    public MyEventSubscriber(MyReadRepository repository)
    {
        _repository = repository;
    }

    public async Task HandleEventAsync(NoteCreatedEvent @event)
    {
        LogEvent(@event);

        _repository.AddNote(new NoteReadModel(@event.AggregateId, @event.CreatedTime, @event.Title, @event.Desc, @event.Cat));
    }

    public async Task HandleEventAsync(NoteTitleChangedEvent @event)
    {
        LogEvent(@event);

        var note = _repository.GetNote(@event.AggregateId);
        note.CurrentVersion = @event.TargetVersion + 1;
        note.Title = @event.Title;

        _repository.SaveNote(note);
    }

//See the GitHub repo for the full source code

There is a very simple read model in the example.


    public class NoteReadModel
    {

        public Guid Id { get; private set; }
        public int CurrentVersion { get; set; }
        public DateTime CreatedDate { get; set; }
        public string Title { get; set; }
        public string Description { get; set; }
        public string Category { get; set; }

        public NoteReadModel(Guid id, DateTime createdDate, string title, string description, string category)
        {
            Id = id;
            CreatedDate = createdDate;
            Title = title;
            Description = description;
            Category = category;
        }
    }

Finally the read model has its own repository to allow consuming pages/widgets to access it. This is optional.

public class MyReadRepository
{
    private readonly MyInMemoryReadModelStorage _storage;

    public MyReadRepository(MyInMemoryReadModelStorage storage)
    {
        _storage = storage;
    }

    public void AddNote(NoteReadModel note)
    {
        _storage.AddOrUpdate(note);
    }

    public void SaveNote(NoteReadModel note)
    {
        _storage.AddOrUpdate(note);
    }

    public NoteReadModel GetNote(Guid noteId)
    {
        //Used by the subscriber above to load a note before updating it
        return _storage.GetByID(noteId);
    }
}

The read models are persisted to a file in this example, but you can use any form of persistence that best suits your needs. I recommend a NoSQL document store, as most read models will end up being queried by their ID. You can use a relational model if you intend to have richer querying abilities. Do as you see fit.


    public class MyInMemoryReadModelStorage
    {
        private readonly string _memoryDumpFile;
        private readonly List<NoteReadModel> _allNotes = new List<NoteReadModel>();

        public MyInMemoryReadModelStorage(string memoryDumpFile)
        {
            _memoryDumpFile = memoryDumpFile;

            if (File.Exists(_memoryDumpFile))
            {
                _allNotes = SerializerHelper.LoadListFromFile<NoteReadModel>(_memoryDumpFile);
            }
        }

        public void AddOrUpdate(NoteReadModel note)
        {
            if (_allNotes.Any(o => o.Id == note.Id))
            {
                _allNotes.Remove(_allNotes.Single(o => o.Id == note.Id));
                _allNotes.Add(note);
            }
            else
            {
                _allNotes.Add(note);
            }

            SerializerHelper.SaveListToFile<NoteReadModel>(_memoryDumpFile, _allNotes);
        }

        public NoteReadModel GetByID(Guid Id)
        {
            return _allNotes.FirstOrDefault(o => o.Id == Id);
        }

        public IEnumerable<NoteReadModel> GetAll()
        {
            return _allNotes.ToList();
        }
    }

Run the example (Full source code is at https://github.com/dasiths/NEventLite/tree/master/src/Examples/NEventLite%20Example)

You should see something like this, confirming that the read model and the one constructed by replaying events have the same info.

[Image: example console output]

Wrapping Up

In this series we looked at Event Sourcing with Command Query Responsibility Segregation in depth, and at an example of how to implement it using NEventLite. For further reading and learning, I suggest following Greg Young’s blog and the CQRS Google group.

Thanks for reading and please leave your comments and feedback below.

REST + AngularJS Basics With .Net Core WebAPI CRUD Example

Yes, I have been MIA for a while. I have been on holidays for an extended period of time and started working for a software consultancy a few weeks ago. So my busy schedule hasn’t given me an opportunity to blog.

Why Front End, Dasith?

I’m getting my hands dirty with AngularJS for a brownfield project and wanted to write some material to help other team members who might not have been exposed to Angular before. So I took that opportunity to write it in the form of a blog post. I know front-end tech is not my focus, but as a full stack developer it is important you know at least one major JavaScript MV* framework. It might even come in handy when you need to whip up a quick prototype to demonstrate a back-end feature to a customer. The bottom line is that we back-end developers can’t ignore front-end technologies.

I’m using Angular 1.6 for this because it was a brownfield project and the existing code base was in 1.*. But if you are starting a new project, I highly recommend you familiarize yourself with TypeScript and Angular 2. (Strongly typed master race is a thing, right?)


So let’s jump right in. It is a prerequisite of this post to have an understanding of the building blocks of an Angular app. You should have a basic understanding of modules, controllers, services, data binding, filtering and directives. A good start is the Angular docs: https://docs.angularjs.org/guide.

I have uploaded the code covered in the article to a GitHub repo at https://github.com/dasiths/AngularJsWebApiTest and will be doing any feature additions and bug fixes there. Consider this post a guide only.

Module.js

// Define the `booksApp` module
var app = angular.module('booksApp', []);

This is where we define the name of the module and assign it to a global variable for later use. Nothing fancy here.

Service.js

app.service('crudService', function ($http) {

    var baseUrl = 'http://localhost:50829';

    //Create new record
    this.post = function (Book) {
        var request = $http({
            method: "post",
            url: baseUrl + "/api/books",
            data: Book
        });
        return request;
    }

    //Get Single Records
    this.get = function (Id) {
        return $http.get(baseUrl + "/api/books/" + Id);
    }

    //Get All Books
    this.getAllBooks = function () {
        return $http.get(baseUrl + "/api/books");
    }

    //Update the Record
    this.put = function (Id, Book) {
        var request = $http({
            method: "put",
            url: baseUrl + "/api/books/" + Id,
            data: Book
        });
        return request;
    }

    //Delete the Record
    this.delete = function (Id) {
        var request = $http({
            method: "delete",
            url: baseUrl + "/api/books/" + Id
        });
        return request;
    }
});

We are defining the service functions here; each wraps an $http call and returns a promise. Each function handles a CRUD operation and points to an endpoint. By putting these functions inside the service, we separate those concerns from the controller, and we can later inject the service into the controller. Have a read here: https://www.sitepoint.com/tidy-angular-controllers-factories-services/.

Controller.js

app.controller('BookListController', function BookListController($scope, $http, $log, crudService) {
    $scope.statusClass = 'label-info';
    $scope.status = 'Loading books...';
    $scope.IsNewRecord = 1; //The flag for the new record

    loadAllBooks();

    function loadAllBooks() {

        var promise = crudService.getAllBooks();

        promise.then(
            function (response) {
                $scope.Books = response.data;
                $scope.status = 'Loaded. Code: ' + response.status;
                $scope.statusClass = 'label-success';
            },
            function (error) {
                $scope.Books = null;
                $scope.status = 'Error: ' + error;
                $scope.statusClass = 'label-warning';
                $log.error('failure loading Books', error);
            });
    }

    //Method to save and add
    $scope.save = function () {
        var Book = {
            Id: $scope.BookId,
            Name: $scope.BookName,
            Author: $scope.BookAuthor
        };

        //If the flag is 1 then it is a new record
        if ($scope.IsNewRecord === 1) {
            var promisePost = crudService.post(Book);
            promisePost.then(function (result) {
                $scope.BookId = result.data;
                loadAllBooks();
            }, function (error) {
                $scope.status = 'Error: ' + error;
                console.log("Err" + error);
            });
        } else { //Else edit the record
            var promisePut = crudService.put($scope.BookId, Book);
            promisePut.then(function (result) {
                $scope.status = "Updated Successfully";
                $scope.statusClass = 'label-success';
                loadAllBooks();
            }, function (error) {
                $scope.status = 'Error: ' + error;
                $scope.statusClass = 'label-warning';
                console.log("Err" + error);
            });
        }
    }

    //Method to Delete
    $scope.delete = function () {
        var promiseDelete = crudService.delete($scope.BookId);
        promiseDelete.then(function (result) {
            $scope.status = "Deleted Successfully";
            $scope.statusClass = 'label-success';
            $scope.BookId = 0;
            $scope.BookName = "";
            $scope.BookAuthor = "";

            loadAllBooks();
        }, function (error) {
            $scope.status = 'Error: ' + error;
            $scope.statusClass = 'label-warning';
            console.log("Err" + error);
        });
    }

    //Method to Get Single Book
    $scope.get = function (Book) {
        var promiseGetSingle = crudService.get(Book.id);

        promiseGetSingle.then(function (result) {
            var res = result.data;
            $scope.BookId = res.id;
            $scope.BookName = res.name;
            $scope.BookAuthor = res.author;

            $scope.IsNewRecord = 0;
        },
        function (errorPl) {
            console.log('failure loading Book', errorPl);
        });
    }

    //Clear the Scope models
    $scope.clear = function () {
        $scope.statusClass = 'label-info';
        $scope.status = "";
        $scope.IsNewRecord = 1;
        $scope.BookId = 0;
        $scope.BookName = "";
        $scope.BookAuthor = "";
    }

}).directive('myBooktableheader', function () {
    return {
        templateUrl: 'http://localhost:51836/BookTemplate.html'
    };
});

Here we have the controller that coordinates everything. As we discussed earlier, notice how the crudService is injected in. Dependency Injection is a major part of how Angular works. Read more here: https://docs.angularjs.org/guide/di.

Everything here is pretty straightforward. We just created methods to load and save books. We have some fields in the scope that we use to bind the input fields to ($scope.BookId, BookName, BookAuthor). Read more here: https://www.w3schools.com/angular/angular_databinding.asp.

To demonstrate directives, I have also added a custom directive definition right at the end of the controller which has an external template.

BookTemplate.html

<th>Id</th>
<th>Name</th>
<th>Author</th>
<th></th>

Directives are a big topic unto themselves. We are covering only their most basic use here with a trivial example. Read more about directives here: https://docs.angularjs.org/guide/directive.

Book.cshtml (partial code example)

See full code at https://github.com/dasiths/AngularJsWebApiTest/blob/master/AngularWebApplication/Views/Book/Index.cshtml.

<body ng-controller="BookListController">
<div class="body-content">
<h3>@ViewData["Message"]</h3>
<table>
<tr>
<td>
<table id="searchObjResults" class="table-striped table-hover" width="600">
<tr my-booktableheader></tr>
<tr ng-repeat="book in Books | filter:search:strict">
<td>{{book.id}}</td>
<td>{{book.name}}</td>
<td>{{book.author}}</td>
<td>
 <button type="button" class="btn" ng-click="get(book)">
 <span class="glyphicon glyphicon-edit"></span>
 </button></td>
</tr>
... see github for repo for full code

 <label ng-class="statusClass">Status: {{status}}</label>

Use this area to filter results.

 <label>Any: <input ng-model="search.$"></label>

 <label>Name only <input ng-model="search.name"></label>

 <label>Author only <input ng-model="search.author"></label>

 <label>Equality <input type="checkbox" ng-model="strict"></label>

This will be our template for the main web page. Take note of how we use one-way and two-way data binding directives. More here: https://www.w3schools.com/angular/angular_databinding.asp.

The other thing to note here is the filtering. We have some typical examples of filter use covered in the example. Read more about filters here https://www.w3schools.com/angular/angular_filters.asp.

Well, that’s it for the client side of the application. Now let’s quickly look at the WebAPI endpoint.

I’m using Visual Studio 2017 to create my project. I created the project using the ASP.NET Core Web API template.

BookRepository.cs

public class BookRepository
{
    private List<Book> _books = new List<Book>();

    public BookRepository()
    {
        _books.Add(new Book() { Id = 1, Name = "P=NP in .NET", Author = "Jon Skeet" });
        _books.Add(new Book() { Id = 2, Name = "Dank Memes", Author = "Dasith Wijes" });
        _books.Add(new Book() { Id = 3, Name = "50 'Shards' of Azure SQL", Author = "Actor Model" });
        _books.Add(new Book() { Id = 4, Name = "Jailbreaking iOS to Set a Default Web Browser", Author = "Cult Of Fandroid" });
        _books.Add(new Book() { Id = 5, Name = "OCD Olympics", Author = "Also Me" });
    }

    public IEnumerable<Book> GetAllBooks()
    {
        return _books.ToArray();
    }

    public void UpdateBook(Book b)
    {
        _books[_books.IndexOf(_books.Single(o => o.Id == b.Id))] = b;
    }

    public int AddBook(Book b)
    {
        b.Id = _books.Max(o => o.Id) + 1;
        _books.Add(b);

        return b.Id;
    }

    public void DeleteBook(int id)
    {
        _books.Remove(_books.Single(o => o.Id == id));
    }
}
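
The repository works against a simple Book model which isn’t shown in the post; inferred from its usage, it would look something like this:

public class Book
{
    public int Id { get; set; }
    public string Name { get; set; }
    public string Author { get; set; }
}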

Startup.cs

I enabled CORS as follows in Startup.cs because our Angular app will be communicating from a different “host”. (The Angular app runs as a separate application on a separate port to the Web API.) I also added the repository as a singleton to the IoC container.

public class Startup
{
    public Startup(IHostingEnvironment env)
    {
        var builder = new ConfigurationBuilder()
            .SetBasePath(env.ContentRootPath)
            .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
            .AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
            .AddEnvironmentVariables();
        Configuration = builder.Build();
    }

    public IConfigurationRoot Configuration { get; }

    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        // Add framework services.
        services.AddCors();
        services.AddMvc();

        //Add our repository as a singleton
        services.AddSingleton<Repository.BookRepository>();
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
    {
        loggerFactory.AddConsole(Configuration.GetSection("Logging"));
        loggerFactory.AddDebug();

        app.UseCors(builder => builder.WithOrigins("*")
            .AllowAnyOrigin()
            .AllowAnyMethod()
            .AllowAnyHeader());

        app.UseMvc();
    }
}

BookController.cs

This is your stock-standard Web API controller. I’ve put some effort into wrapping my results with the proper HTTP status codes and handling some errors. This is by no means perfect, but just note how the Web API communicates back via both the status code and content.

[Route("api/[controller]")]
public class BooksController : Controller
{

BookRepository _repo;

public BooksController(BookRepository repo)
{
_repo = repo;
}

// GET api/books
[HttpGet]
public IActionResult Get()
{
var result = _repo.GetAllBooks();

if (result.Count() > 0)
return Ok(result);
else
return NoContent();
}

// GET api/books/id
[HttpGet("{id}")]
public IActionResult Get(int id)
{
var result = _repo.GetAllBooks().SingleOrDefault(o => o.Id == id);

if (result == null)
return NoContent();
else
return Ok(result);
}

// GET api/books/q=query
[HttpGet("q={name}")]
public IActionResult Get(string name)
{
var results = _repo.GetAllBooks().Where(
o => o.Name.IndexOf(name, 0, StringComparison.OrdinalIgnoreCase) >= 0);

if (results.Count() > 0)
{
return Ok(results);
}
else
{
return NoContent();
}

}

// POST api/books
[HttpPost]
public IActionResult Post([FromBody] Book b)
{
if (ModelState.IsValid == false)
return BadRequest();

try
{
return Ok(_repo.AddBook(b));
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}

}

// PUT api/books/id
[HttpPut("{id}")]
public IActionResult Put(int id, [FromBody] Book b)
{

if (ModelState.IsValid == false || id != b.Id)
return BadRequest();

var result = _repo.GetAllBooks().SingleOrDefault(o => o.Id == id);
if (result == null)
return NotFound();

try
{
_repo.UpdateBook(b);
return Ok(id);
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}

// DELETE api/books/id
[HttpDelete("{id}")]
public IActionResult Delete(int id)
{
try
{
_repo.DeleteBook(id);
return Ok(id);
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}

}

That’s it. We have covered the major components of the project. Run it and have a play with it.

[Image: the example application running in the browser]

Good luck with your learning. I recommend you learn more about a topic like token authentication using Angular + Web API, because it will be something you end up doing often. Another important area we didn’t cover here is routing.

Please post your comments and ideas here as they help me improve my knowledge and blogging skills. Thank you.

Event Sourcing Examined Part 2 Of 3

In this 3 part series we will look at what event sourcing is and why enterprise software for many established industries uses this pattern.

Index

1. Part One

  • Introduction to Event Sourcing
  • Why Use Event Sourcing?
  • Some Common Pitfalls

2. Part Two (This one)

  • Getting Familiar With Aggregates
  • Event Sourcing Workflow
  • Commands
  • Domain Event
  • Internal Event Handler
  • Repository
  • Storage & Snapshots
  • Event Publisher

3. Part Three

  • CQRS
  • Read Side of CQRS
  • Using NEventLite

In the last post we looked at what Event Sourcing (ES) with Command Query Responsibility Segregation (CQRS) is, and discussed some benefits and pitfalls. We also established that ES + CQRS is used for business components within a bounded context. Keep this in mind as we progress towards the more technical aspects of these patterns.

Getting Familiar With Aggregates

An aggregate is one of the building blocks of Domain Driven Design. Simply put, an aggregate is a collection of domain objects that can be grouped into one unit. It is different from a plain collection (array, list, map) because aggregates are domain concepts.

A good example is a purchase order. The purchase order has the order document, a collection of order line items, and may also have value objects for totals and tax. All of these can be grouped into one unit. This unit is an aggregate.

The purchase order document in this example is the Aggregate Root. The aggregate root is the element which is visible to other contexts, and it ensures the integrity of the entire aggregate. All changes or references to the aggregate must happen via the aggregate root, and this ensures a natural transactional boundary. A skeleton of such an aggregate is sketched below.
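
This is only a minimal sketch to make the idea concrete; the class and member names are assumptions for illustration, not NEventLite types.

using System;
using System.Collections.Generic;

//The aggregate root guards the integrity of everything inside the boundary
public class PurchaseOrder
{
    private readonly List<OrderLineItem> _lineItems = new List<OrderLineItem>();

    public Guid Id { get; private set; }
    public decimal Total { get; private set; }

    //Outside code can read, but not modify, the line items directly...
    public IReadOnlyList<OrderLineItem> LineItems => _lineItems;

    //...all changes go through the root, which enforces the invariants
    public void AddItem(Guid itemId, int qty, decimal unitPrice)
    {
        if (qty <= 0) throw new ArgumentException("Quantity must be positive");

        _lineItems.Add(new OrderLineItem(itemId, qty, unitPrice));
        Total += qty * unitPrice;
    }
}

public class OrderLineItem
{
    public OrderLineItem(Guid itemId, int qty, decimal unitPrice)
    {
        ItemId = itemId;
        Qty = qty;
        UnitPrice = unitPrice;
    }

    public Guid ItemId { get; }
    public int Qty { get; }
    public decimal UnitPrice { get; }
}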

Event Sourcing Workflow (with CQRS)

[Image: Event Sourcing + CQRS workflow for NEventLite]

  • Commands are instructions to do something and are in “imperative form”.
  • Some possible commands are CreateOrder, AddOrderItem, UpdateShippingAddress, AcceptOrder, ConvertToInvoice etc.
  • Events are in past tense. They indicate an action that happened in the system which resulted in a state change: OrderCreated, OrderCourierMethodPicked, OrderShippingAddressUpdated etc.
  • The execution of a command can result in many events (see the sketch after this list).
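
To make the distinction concrete, here is a minimal sketch of a command and an event. The shapes are assumptions for illustration (NEventLite’s actual base types differ), but the names match the walkthrough below.

using System;

//Command: imperative, names what should happen
public class AddOrderItemCommand
{
    public Guid OrderId { get; }
    public Guid ItemId { get; }
    public int Qty { get; }
    public int OrderVersion { get; } //expected aggregate version, for concurrency checks

    public AddOrderItemCommand(Guid orderId, Guid itemId, int qty, int orderVersion)
    {
        OrderId = orderId;
        ItemId = itemId;
        Qty = qty;
        OrderVersion = orderVersion;
    }
}

//Event: past tense, an immutable record of what did happen
public class OrderItemAddedEvent
{
    public Guid OrderId { get; }
    public int Version { get; } //position of this event in the aggregate's stream
    public Guid ItemId { get; }
    public int Qty { get; }

    public OrderItemAddedEvent(Guid orderId, int version, Guid itemId, int qty)
    {
        OrderId = orderId;
        Version = version;
        ItemId = itemId;
        Qty = qty;
    }
}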

Now let’s walk through the steps. We will take the example of adding a new item to an existing purchase order.

(1) The user selects an item “ItemABC” with a quantity of “X” from inventory and adds it to an existing order “OrderXYZ”. The request also carries a version number “N” to indicate the version of the current Order. The UI sends this information to a web service, for example WebServiceProxy.AddItemToOrder(Order, Item, Qty, OrderVersion).

The web service then creates an AddOrderItemCommand with the information received and puts it on a Command Bus (Message Queue). The web service then returns an ack saying the command was forwarded.

This doesn’t always mean the command will be processed or be successful. The service can reject it, or it can fail. Even when the command is successful, there is no guarantee the read model will be updated “immediately” (remember eventual consistency?). So the UI will have to periodically check the read model to see if it was updated (see if the Order version is now >= N+1), or have a way to listen for the resulting events. There are many ways of handling it, and handling errors is another big topic by itself. A sketch of such a web service endpoint follows.
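
Here is a hedged sketch of that endpoint, assuming an ASP.NET Core controller; ICommandBus and AddItemDto are hypothetical placeholders, not NEventLite types.

using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

public interface ICommandBus //hypothetical bus abstraction
{
    Task SendAsync(object command);
}

public class AddItemDto //hypothetical request payload
{
    public Guid ItemId { get; set; }
    public int Qty { get; set; }
    public int OrderVersion { get; set; }
}

[Route("api/orders")]
public class OrdersController : Controller
{
    private readonly ICommandBus _commandBus;

    public OrdersController(ICommandBus commandBus)
    {
        _commandBus = commandBus;
    }

    [HttpPost("{orderId}/items")]
    public async Task<IActionResult> AddItemToOrder(Guid orderId, [FromBody] AddItemDto dto)
    {
        var command = new AddOrderItemCommand(orderId, dto.ItemId, dto.Qty, dto.OrderVersion);

        //Put the command on the bus; we don't wait for it to be processed
        await _commandBus.SendAsync(command);

        //202 Accepted: the command was forwarded, not necessarily applied yet
        return Accepted();
    }
}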

(2) The command bus’s purpose is to decouple the command-creating service from the command consumers. The command bus will handle the delivery responsibility.

(3) The command handler registers its interest in the AddOrderItemCommand with the command bus (subscribes).

So the command bus knows to inform the handler when it receives that particular command. It’s important to note that there can only be one command handler per command. This ensures commands don’t get double handled. Technically you can have many handlers, but the command should only be “handled” once. This is where idempotent command design comes in handy. It’s out of scope for this post, but have a read of this article for more information if you’re interested.

(4) The command handler will then load an instance of the AggregateRoot using the unique ID, in this case our order # “OrderXYZ”.

The command handler will now check if the version of the aggregate matches the version in the command. This is to check for concurrency and to make sure our command gets executed against the right version of the aggregate. Once this is done, the command handler calls the appropriate method in the aggregate to do the operation and passes the information, for example PurchaseOrderAggregate.AddItem(Item, Qty).

Hint: A command must only affect one aggregate, as a command that changes multiple aggregates is an anti-pattern. A sketch of a command handler follows.
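
Here is a hedged sketch of such a command handler, reusing the IRepository interface shown in step 6; PurchaseOrderAggregate and the concurrency check details are illustrative assumptions.

public class AddOrderItemCommandHandler
{
    private readonly IRepository _repository;

    public AddOrderItemCommandHandler(IRepository repository)
    {
        _repository = repository;
    }

    public void Handle(AddOrderItemCommand command)
    {
        //Load the aggregate by its unique ID
        var order = _repository.GetById<PurchaseOrderAggregate>(command.OrderId);

        //Optimistic concurrency: reject commands issued against a stale version
        if (order.CurrentVersion != command.OrderVersion)
            throw new InvalidOperationException("Aggregate version mismatch");

        //Delegate the actual state change to the aggregate
        order.AddItem(command.ItemId, command.Qty);

        //Persist the uncommitted events (and publish them, see step 9)
        _repository.Save(order);
    }
}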

This is where it gets interesting. The AddItem() method doesn’t directly update the state.

(5) We treat state change as a first class concept. To facilitate this we use the concept of “Domain Events” which are immutable records of state change.

  • So to capture state change, the AddItem() method will create an event and Apply it to itself (ApplyChange(@event, true)). The event is the catalyst for state change. In our example, the AddItem() method will create an event (OrderItemAddedEvent) and apply it.
  • Applying the event will call the “Internal Event Handler” of the Order (AggregateRoot), which in turn will update the state.
  • It will also add the event which triggered the state change to an “Uncommitted Changes” collection.

This is how the source code might look.
The full NEventLite source for AggregateRoot is here. An example is here.

public void AddItem(Guid ItemID, int Qty) {
  //Create the event and call the internal event handler.
  //Important: No state changes are done here.
  //We also put the current version of the aggregate in the event for consistency
  var @event = new OrderItemAddedEvent(this.OrderID, CurrentVersion, ItemID, Qty);
  ApplyChange(@event, true);
}

public void ApplyChange(IEvent @event, bool isNew) {
 //Call the right Apply method for this event
 this.AsDynamic().Apply(@event);

 //Only add new events to the uncommitted changes collection
 if (isNew) {
  UncommittedChanges.Add(@event);
 }
}

//This is the "InternalEventHandler"
public void Apply(OrderItemAddedEvent @event) {
  //Do state changes here like updating Total and Tax.
  //Also do the tasks required to add the order item.
}

The reason we use the internal event handler is to ensure the event gets applied using the same ApplyChange() method when we load (replay) events from storage. It ensures consistency when changing state. (We call ApplyChange() with isNew set to false when we load past events, so it won’t register as an uncommitted event.)

So remember, all state changes must happen inside an internal event handler. The replay method itself can be as simple as the sketch below.
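
A minimal sketch of what the LoadsFromHistory method (used by the repository below) might look like; the exact NEventLite implementation may differ.

public void LoadsFromHistory(IEnumerable<IEvent> history)
{
    //Replay each stored event through the same code path as new events,
    //with isNew = false so they aren't re-added to UncommittedChanges
    foreach (var @event in history)
    {
        ApplyChange(@event, false);
    }
}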

(6) The repository handles the saving and loading of an AggregateRoot. Here is an example of a simple repository interface for an AggregateRoot.

public interface IRepository {
    T GetById<T>(Guid id) where T : AggregateRoot, new();
    void Save<T>(T aggregate) where T : AggregateRoot;
}

(7) The events get stored in an “Event Storage”. This can be a relational database or a dedicated event storage engine like EventStore.

(8) As you will have noticed, we have to load events for an AggregateRoot to reconstitute (load) it. To help make this faster, we can save “snapshots” every N events.

Rather than loading all the events from history to construct an aggregate, we load the last snapshot and ONLY apply events that are newer than the snapshot. We use the version property saved within each event to identify the event sequence number.

The code for saving and loading aggregates will look something like this. (See how the Repository is implemented here for a comprehensive example).


//In our repository implementation

public virtual T GetById<T>(Guid id) where T : AggregateRoot, new()
{
    T item;

    var snapshot = SnapshotStorageProvider.GetSnapshot(typeof(T), id);
    if (snapshot != null)
    {
        item = new T();

        //Apply snapshot. this updates the aggregate state to the snapshot version
        item.ApplySnapshot(snapshot);

        //load events from snapshot version to max
        var events = EventStorageProvider.GetEvents(
        typeof(T), id, snapshot.Version + 1, int.MaxValue);

        //"replay" events
        item.LoadsFromHistory(events);
    }
    else
    {
        //load events from 0 to max. Basically all events.
        var events = (EventStorageProvider.GetEvents(
          typeof(T), id, 0, int.MaxValue)).ToList();

        item = new T();

        //"replay" events
        item.LoadsFromHistory(events);
    }

    return item;
}

public virtual void Save<T>(T aggregate) where T : AggregateRoot {
    if (aggregate.HasUncommittedChanges())
    {
        //Every N events we save a snapshot
        if (aggregate.CurrentVersion - aggregate.LastSnapshotVersion >=
        SnapshotStorageProvider.SnapshotFrequency)
        {
            SnapshotStorageProvider.SaveSnapshot(aggregate.TakeSnapshot());
        }

//Save events to event storage and publish to event bus. See step 9
....

(9) When we save those events in Save(T aggregate), we also publish them to an event bus / event publisher, which can be a message queue.

...
//After saving snapshots in step 8
//Commit Changes to EventStorage

         CommitChanges(aggregate);

         //Publish to event publisher asynchronously
         foreach (var e in changesToCommit)
         {
             EventPublisher.Publish(e);
         }
    }
}

Technically this is where the command side of CQRS ends, but let’s look at a couple more things that need to happen for the cycle to complete.

(10) There will be event handlers that listen to the OrderItemAddedEvent. Those event handlers express their interest in the types of events they want from the event bus (by subscribing).

The purpose of these event handlers is to listen for certain types of events and update the Read Model of our Order. Because we can have multiple event handlers for one type of event (unlike commands), we can have any number of read models for the Order. For example, we can even have one dedicated to updating the customer order total in a reporting database and another for product totals.

You can probably see the advantages of this now. Our write model (event stream) is totally separate from our read model. This allows us to scale different parts of our system as we wish.

(11) We typically use an in-memory database like Redis or a relational database to store our read models. The idea is that the read side of our application can quickly load a model using a simple where clause, without having to join anything.

A typical CQRS setup will have a read model for each view. An event handler for each view will listen to events (OrderItemAdded, OrderItemRemoved, OrderItemQtyUpdated) and update the view.


So, to recap:

  • We sent a command to add a new item to the order.
  • The AggregateRoot created an event(s) to capture the state change. (Even though in our example we only created one event, there can be many events generated for a single command.)
  • The AggregateRoot applied the event(s) to itself to do the state change.
  • The repository saved the changes to event storage (and a snapshot as needed).
  • The saving of the event(s) also published them to the event bus.
  • Event handlers listening to the event received it and updated their “world views” (read models).

What’s Next

That was a very brief explanation of what each step does and how you could go about implementing it. I’ve used very simple code examples. For working examples I highly recommend you have a look at my Event Sourcing and CQRS framework NEventLite on GitHub.

Now that we have looked at the steps in the Command Side of CQRS, the next step is to implement the read side. In the next post of the series we will look at how this can be done. I’ll also demonstrate how to quickly get an ES + CQRS application up and running using NEventLite.

I hope this helps you understand how CQRS works. If you have any questions or suggestions please post them here.

Wish you a happy new year and see you soon.