Awesome Open Source
Awesome Open Source

Twitter Follow Join the chat at https://gitter.im/EventSourcing-NetCore/community Github Actions blog

EventSourcing.NetCore

Tutorial, practical samples and other resources about Event Sourcing in .NET Core.

1. Event Sourcing

1.1 What is Event Sourcing?

Event Sourcing is a design pattern in which results of business operations are stored as a series of events.

It is an alternative way to persist data. In contrast with state-oriented persistence that only keeps the latest version of the entity state, Event Sourcing stores each state change as a separate event.

Thanks for that, no business data is lost. Each operation results in the event stored in the database. That enables extended auditing and diagnostics capabilities (both technically and business-wise). What's more, as events contains the business context, it allows wide business analysis and reporting.

In this repository I'm showing different aspects, patterns around Event Sourcing. From the basic to advanced practices.

Read more in my articles:

1.2 What is Event?

Events, represent facts in the past. They carry information about something accomplished. It should be named in the past tense, e.g. "user added", "order confirmed". Events are not directed to a specific recipient - they're broadcasted information. It's like telling a story at a party. We hope that someone listens to us, but we may quickly realise that no one is paying attention.

Events:

  • are immutable: "What has been seen, cannot be unseen".
  • can be ignored but cannot be retracted (as you cannot change the past).
  • can be interpreted differently. The basketball match result is a fact. Winning team fans will interpret it positively. Losing team fans - not so much.

Read more in my articles:

1.3 What is Stream?

Events are logically grouped into streams. In Event Sourcing, streams are the representation of the entities. All the entity state mutations ends up as the persisted events. Entity state is retrieved by reading all the stream events and applying them one by one in the order of appearance.

A stream should have a unique identifier representing the specific object. Each event has its own unique position within a stream. This position is usually represented by a numeric, incremental value. This number can be used to define the order of the events while retrieving the state. It can be also used to detect concurrency issues.

1.4 Event representation

Technically events are messages.

They may be represented, e.g. in JSON, Binary, XML format. Besides the data, they usually contain:

  • id: unique event identifier.
  • type: name of the event, e.g. "invoice issued".
  • stream id: object id for which event was registered (e.g. invoice id).
  • stream position (also named version, order of occurrence, etc.): the number used to decide the order of the event's occurrence for the specific object (stream).
  • timestamp: representing a time at which the event happened.
  • other metadata like correlation id, causation id, etc.

Sample event JSON can look like:

{
  "id": "e44f813c-1a2f-4747-aed5-086805c6450e",
  "type": "invoice-issued",
  "streamId": "INV/2021/11/01",
  "streamPosition": 1,
  "timestamp": "2021-11-01T00:05:32.000Z",

  "data":
  {
    "issuedTo": {
      "name": "Oscar the Grouch",
      "address": "123 Sesame Street"
    },
    "amount": 34.12,
    "number": "INV/2021/11/01",
    "issuedAt": "2021-11-01T00:05:32.000Z"
  },

  "metadata": 
  {
    "correlationId": "1fecc92e-3197-4191-b929-bd306e1110a4",
    "causationId": "c3cf07e8-9f2f-4c2d-a8e9-f8a612b4a7f1"
  }
}

1.5 Event Storage

Event Sourcing is not related to any type of storage implementation. As long as it fulfils the assumptions, it can be implemented having any backing database (relational, document, etc.). The state has to be represented by the append-only log of events. The events are stored in chronological order, and new events are appended to the previous event. Event Stores are the databases' category explicitly designed for such purpose.

Read more in my article:

1.6 Retrieving the current state from events

In Event Sourcing, the state is stored in events. Events are logically grouped into streams. Streams can be thought of as the entities' representation. Traditionally (e.g. in relational or document approach), each entity is stored as a separate record.

Id IssuerName IssuerAddress Amount Number IssuedAt
e44f813c Oscar the Grouch 123 Sesame Street 34.12 INV/2021/11/01 2021-11-01

In Event Sourcing, the entity is stored as the series of events that happened for this specific object, e.g. InvoiceInitiated, InvoiceIssued, InvoiceSent.

[
    {
        "id": "e44f813c-1a2f-4747-aed5-086805c6450e",
        "type": "invoice-initiated",
        "streamId": "INV/2021/11/01",
        "streamPosition": 1,
        "timestamp": "2021-11-01T00:05:32.000Z",

        "data":
        {
            "issuer": {
                "name": "Oscar the Grouch",
                "address": "123 Sesame Street",
            },
            "amount": 34.12,
            "number": "INV/2021/11/01",
            "initiatedAt": "2021-11-01T00:05:32.000Z"
        }
    },        
    {
        "id": "5421d67d-d0fe-4c4c-b232-ff284810fb59",
        "type": "invoice-issued",
        "streamId": "INV/2021/11/01",
        "streamPosition": 2,
        "timestamp": "2021-11-01T00:11:32.000Z",

        "data":
        {
            "issuedTo": "Cookie Monster",
            "issuedAt": "2021-11-01T00:11:32.000Z"
        }
    },        
    {
        "id": "637cfe0f-ed38-4595-8b17-2534cc706abf",
        "type": "invoice-sent",
        "streamId": "INV/2021/11/01",
        "streamPosition": 3,
        "timestamp": "2021-11-01T00:12:01.000Z",

        "data":
        {
            "sentVia": "email",
            "sentAt": "2021-11-01T00:12:01.000Z"
        }
    }
]

All of those events shares the stream id ("streamId": "INV/2021/11/01"), and have incremented stream position.

We can get to conclusion that in Event Sourcing entity is represented by stream, so sequence of event correlated by the stream id ordered by stream position.

To get the current state of entity we need to perform the stream aggregation process. We're translating the set of events into a single entity. This can be done with the following the steps:

  1. Read all events for the specific stream.
  2. Order them ascending in the order of appearance (by the event's stream position).
  3. Construct the empty object of the entity type (e.g. with default constructor).
  4. Apply each event on the entity.

This process is called also stream aggregation or state rehydration.

We could implement that as:

public record Person(
    string Name,
    string Address
);

public record InvoiceInitiated(
    double Amount,
    string Number,
    Person IssuedTo,
    DateTime InitiatedAt
);

public record InvoiceIssued(
    string IssuedBy,
    DateTime IssuedAt
);

public enum InvoiceSendMethod
{
    Email,
    Post
}

public record InvoiceSent(
    InvoiceSendMethod SentVia,
    DateTime SentAt
);

public enum InvoiceStatus
{
    Initiated = 1,
    Issued = 2,
    Sent = 3
}

public class Invoice
{
    public string Id { get;set; }
    public double Amount { get; private set; }
    public string Number { get; private set; }

    public InvoiceStatus Status { get; private set; }

    public Person IssuedTo { get; private set; }
    public DateTime InitiatedAt { get; private set; }

    public string IssuedBy { get; private set; }
    public DateTime IssuedAt { get; private set; }

    public InvoiceSendMethod SentVia { get; private set; }
    public DateTime SentAt { get; private set; }

    public void When(object @event)
    {
        switch (@event)
        {
            case InvoiceInitiated invoiceInitiated:
                Apply(invoiceInitiated);
                break;
            case InvoiceIssued invoiceIssued:
                Apply(invoiceIssued);
                break;
            case InvoiceSent invoiceSent:
                Apply(invoiceSent);
                break;
        }
    }

    private void Apply(InvoiceInitiated @event)
    {
        Id = @event.Number;
        Amount = @event.Amount;
        Number = @event.Number;
        IssuedTo = @event.IssuedTo;
        InitiatedAt = @event.InitiatedAt;
        Status = InvoiceStatus.Initiated;
    }

    private void Apply(InvoiceIssued @event)
    {
        IssuedBy = @event.IssuedBy;
        IssuedAt = @event.IssuedAt;
        Status = InvoiceStatus.Issued;
    }

    private void Apply(InvoiceSent @event)
    {
        SentVia = @event.SentVia;
        SentAt = @event.SentAt;
        Status = InvoiceStatus.Sent;
    }
}

and use it as:

var invoiceInitiated = new InvoiceInitiated(
    34.12,
    "INV/2021/11/01",
    new Person("Oscar the Grouch", "123 Sesame Street"),
    DateTime.UtcNow
);
var invoiceIssued = new InvoiceIssued(
    "Cookie Monster",
    DateTime.UtcNow
);
var invoiceSent = new InvoiceSent(
    InvoiceSendMethod.Email,
    DateTime.UtcNow
);

// 1,2. Get all events and sort them in the order of appearance
var events = new object[] {invoiceInitiated, invoiceIssued, invoiceSent};

// 3. Construct empty Invoice object
var invoice = new Invoice();

// 4. Apply each event on the entity.
foreach (var @event in events)
{
    invoice.When(@event);
}

and generalise this into Aggregate base class:

public abstract class Aggregate<T>
{
    public T Id { get; protected set; }
    
    protected Aggregate() { }
        
    public virtual void When(object @event) { }
}

The biggest advantage of "online" stream aggregation is that it always uses the most recent business logic. So after the change in the apply method, it's automatically reflected on the next run. If events data is fine, then it's not needed to do any migration or updates.

In Marten When method is not needed. Marten uses naming convention and call the Apply method internally. It has to:

  • have single parameter with event object,
  • have void type as the result.

See samples:

Read more in my article:

2. Support

Feel free to create an issue if you have any questions or request for more explanation or samples. I also take Pull Requests!

If this repository helped you - I'd be more than happy if you join the group of my official supporters at:

Github Sponsors

3. Prerequisites

For running the Event Store examples you need to have:

  1. .NET 5 installed - https://dotnet.microsoft.com/download/dotnet/5.0
  2. Docker installed. Then going to the docker folder and running:
docker-compose up

More information about using .NET Core, WebApi and Docker you can find in my other tutorials: WebApi with .NET

You can also watch my presentation "Practical Event Sourcing with Marten":

Practical Event Sourcing with Marten (EN)

and discussion with Yves Lorphelin about CQRS:

Event Store Conversations: Yves Lorphelin talks to Oskar Dudycz about CQRS (EN)

4. Tools used

  1. Marten - Event Store and Read Models
  2. EventStoreDB - Event Store
  3. MediatR - Internal In-Memory Message Bus (for processing Commands, Queries, Events)
  4. Kafka - External Durable Message Bus to integrate services
  5. ElasticSearch - Read Models

5. Samples

See also fully working, real-world samples of Event Sourcing and CQRS applications in Samples folder.

  • ECommerce with Marten

    • typical Event Sourcing and CQRS flow,
    • DDD using Aggregates,
    • microservices example,
    • stores events to Marten,
    • distributed processes coordinated by Saga (Order Saga),
    • Kafka as a messaging platform to integrate microservices,
    • example of the case when some services are event-sourced (Carts, Orders, Payments) and some are not (Shipments using EntityFramework as ORM)
  • ECommerce with EventStoreDB

    • typical Event Sourcing and CQRS flow,
    • DDD using Aggregates,
    • stores events to EventStoreDB,
    • Builds read models using Subscription to $all.
    • Read models are stored as Marten documents.
  • Warehouse

    • simplest CQRS flow using .NET 5 Endpoints,
    • example of how and where to use C# Records, Nullable Reference Types, etc,
    • No Event Sourcing! Using Entity Framework to show that CQRS is not bounded to Event Sourcing or any type of storage,
    • No Aggregates! CQRS do not need DDD. Business logic can be handled in handlers.
  • Meetings Management with Marten

    • typical Event Sourcing and CQRS flow,
    • DDD using Aggregates,
    • microservices example,
    • stores events to Marten,
    • Kafka as a messaging platform to integrate microservices,
    • read models handled in separate microservice and stored to other database (ElasticSearch)
  • Cinema Tickets Reservations with Marten

    • typical Event Sourcing and CQRS flow,
    • DDD using Aggregates,
    • stores events to Marten.
  • SmartHome IoT with Marten

    • typical Event Sourcing and CQRS flow,
    • DDD using Aggregates,
    • stores events to Marten,
    • asynchronous projections rebuild using AsynDaemon feature.

6. Self-paced training Kit

I prepared the self-paced training Kit for the Event Sourcing. See more in the Workshop description.

Event Sourcing basics - it teaches the event store basics by showing how to build your Event Store on Relational Database. It starts with the tables setup, goes through appending events, aggregations, projections, snapshots, and finishes with the Marten basics. See more in here.

  1. Streams Table
  2. Events Table
  3. Appending Events
  4. Optimistic Concurrency Handling
  5. Event Store Methods
  6. Stream Aggregation
  7. Time Travelling
  8. Aggregate and Repositories
  9. Snapshots
  10. Projections
  11. Projections With Marten

7. Articles

Read also more on the Event Sourcing and CQRS topics in my blog posts:

Slides:

  • Practical Event Sourcing with Marten - EN, PL
  • Ligths and Shades of Event-Driven Design - EN, PL
  • Adventures in Event Sourcing and CQRS - PL

8. Event Store - Marten

  • Creating event store
  • Event Stream - is a representation of the entity in event sourcing. It's a set of events that happened for the entity with the exact id. Stream id should be unique, can have different types but usually is a Guid.
    • Stream starting - stream should be always started with a unique id. Marten provides three ways of starting the stream:
      • calling StartStream method with a stream id
        var streamId = Guid.NewGuid();
        documentSession.Events.StartStream<IssuesList>(streamId);
        
      • calling StartStream method with a set of events
        var @event = new IssueCreated { IssueId = Guid.NewGuid(), Description = "Description" };
        var streamId = documentSession.Events.StartStream<IssuesList>(@event);
        
      • just appending events with a stream id
        var @event = new IssueCreated { IssueId = Guid.NewGuid(), Description = "Description" };
        var streamId = Guid.NewGuid();
        documentSession.Events.Append(streamId, @event);
        
    • Stream loading - all events that were placed on the event store should be possible to load them back. Marten allows to:
      • get list of event by calling FetchStream method with a stream id
        var eventsList = documentSession.Events.FetchStream(streamId);
        
      • geting one event by its id
        var @event = documentSession.Events.Load<IssueCreated>(eventId);
        
    • Stream loading from exact state - all events that were placed on the event store should be possible to load them back. Marten allows to get stream from exact state by:
      • timestamp (has to be in UTC)
        var dateTime = new DateTime(2017, 1, 11);
        var events = documentSession.Events.FetchStream(streamId, timestamp: dateTime);
        
      • version number
        var versionNumber = 3;
        var events = documentSession.Events.FetchStream(streamId, version: versionNumber);
        
  • Event stream aggregation - events that were stored can be aggregated to form the entity once again. During the aggregation, process events are taken by the stream id and then replied event by event (so eg. NewTaskAdded, DescriptionOfTaskChanged, TaskRemoved). At first, an empty entity instance is being created (by calling default constructor). Then events based on the order of appearance are being applied on the entity instance by calling proper Apply methods.
    • Online Aggregation - online aggregation is a process when entity instance is being constructed on the fly from events. Events are taken from the database and then aggregation is being done. The biggest advantage of online aggregation is that it always gets the most recent business logic. So after the change, it's automatically reflected and it's not needed to do any migration or updates.
    • Inline Aggregation (Snapshot) - inline aggregation happens when we take the snapshot of the entity from the DB. In that case, it's not needed to get all events. Marten stores the snapshot as a document. This is good for performance reasons because only one record is being materialized. The con of using inline aggregation is that after business logic has changed records need to be reaggregated.
    • Reaggregation - one of the biggest advantages of the event sourcing is flexibility to business logic updates. It's not needed to perform complex migration. For online aggregation it's not needed to perform reaggregation - it's being made always automatically. The inline aggregation needs to be reaggregated. It can be done by performing online aggregation on all stream events and storing the result as a snapshot.
      • reaggregation of inline snapshot with Marten
        var onlineAggregation = documentSession.Events.AggregateStream<TEntity>(streamId);
        documentSession.Store<TEntity>(onlineAggregation);
        documentSession.SaveChanges();
        
  • Event transformations
  • Events projection
  • Multitenancy per schema

9. Message Bus (for processing Commands, Queries, Events) - MediatR

  • Initialization - MediatR uses services locator pattern to find a proper handler for the message type.
  • Sending Messages - finds and uses the first registered handler for the message type. It could be used for queries (when we need to return values), commands (when we acting).
    • No Handlers - when MediatR doesn't find proper handler it throws an exception.
    • Single Handler - by implementing IRequestHandler we're deciding that this handler should be run asynchronously with other async handlers (so we don't wait for the previous handler to finish its work).
    • More Than One Handler - when there is more than one handler registered MediatR takes only one ignoring others when Send method is being called.
  • Publishing Messages - finds and uses all registered handlers for the message type. It's good for processing events.
    • No Handlers - when MediatR doesn't find proper handler it throws an exception
    • Single Handler - by implementing INotificationHandler we're deciding that this handler should be run asynchronously with other async handlers (so we don't wait for the previous handler to finish its work)
    • More Than One Handler - when there is more than one handler registered MediatR takes all of them when calling Publish method
  • Pipeline (to be defined)

10. CQRS (Command Query Responsibility Separation)

11. NuGet packages to help you get started.

I gathered and generalized all of the practices used in this tutorial/samples in Nuget Packages maintained by me GoldenEye Framework. See more in:

  • GoldenEye DDD package - it provides a set of base and bootstrap classes that helps you to reduce boilerplate code and help you focus on writing business code. You can find all classes like Commands/Queries/Event handlers and many more. To use it run:

    dotnet add package GoldenEye.Backend.Core.DDD

  • GoldenEye Marten package - contains helpers, and abstractions to use Marten as document/event store. Gives you abstractions like repositories etc. To use it run:

    dotnet add package GoldenEye.Backend.Core.Marten

The simplest way to start is installing the project template by running

dotnet new -i GoldenEye.WebApi.Template.SimpleDDD

and then creating a new project based on it:

dotnet new SimpleDDD -n NameOfYourProject

12. Other resources

12.1 Introduction

12.2 Event Sourcing on production

12.3 Projections

12.4 Snapshots

12.5 Versioning

12.6 Storage

12.7 Design & Modeling

12.8 GDPR

12.9 Conflict Detection

12.10 Functional programming

12.12 Testing

12.13 CQRS

12.14 Tools

12.15 Event Sourcing vs Messaging

12.15 Event processing

12.16 Distributed processes

12.17 Domain Driven Design

12.18 Architecture Weekly

If you're interested in Architecture resources, check my other repository: https://github.com/oskardudycz/ArchitectureWeekly/.

It contains a weekly updated list of materials I found valuable and educational.


EventSourcing.NetCore is Copyright © 2017-2021 Oskar Dudycz and other contributors under the MIT license.


Get A Weekly Email With Trending Projects For These Topics
No Spam. Unsubscribe easily at any time.
C Sharp (275,779
Example (4,594
Dotnet Core (4,284
Netcore (1,533
Cqrs (990
Event Sourcing (909
Mediatr (159
Related Projects