Aggregate Event Persistence
We are going to use Greg Young’s In-Memory Event Store from his SimpleCQRS project to demonstrate persisting an Event Sourced Aggregate. I’ll talk more about other Event Stores in future blog posts.
Looking back at our Event Sourced Aggregate Root, we’re using it as a Layer Supertype (PoEAA). The crux method of the AggregateRoot base class is the ApplyChange method.
public abstract class AggregateRoot : Entity {
    private readonly List<Event> _changes = new List<Event>();
    ...
    protected void ApplyChange(Event @event) {
        ApplyChange(@event, true);
    }
    // push atomic aggregate changes to local history for further processing (EventStore.SaveEvents)
    private void ApplyChange(Event @event, bool isNew) {
        this.AsDynamic().Apply(@event);
        if (isNew) _changes.Add(@event);
    }
}
For every Domain Event that an Aggregate emits in response to a Command (Command -> Event), it implements an Apply(Event e) method for that Event. This method applies the desired change to the state of the Aggregate using the Event, rather than mutating the state directly in the method that handles the Command.
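To make the dispatch concrete, here is a minimal, self-contained sketch of ApplyChange routing an event to the matching Apply overload. It is not the SimpleCQRS code: it assumes C#'s built-in `dynamic` in place of the AsDynamic() helper (so the Apply methods must be public), and the parameterless constructor, GetUncommittedChanges as written here, LoadFromHistory, and the Program class are simplified stand-ins added for illustration.

```csharp
using System;
using System.Collections.Generic;

public abstract class Event { }

public class RoastScheduleCreatedEvent : Event
{
    public Guid Id { get; }
    public RoastScheduleCreatedEvent(Guid id) { Id = id; }
}

public abstract class AggregateRoot
{
    private readonly List<Event> _changes = new List<Event>();

    public Guid Id { get; protected set; }

    // Events applied since this instance was created or loaded.
    public IReadOnlyList<Event> GetUncommittedChanges() => _changes;

    // Hypothetical helper: replays stored events with isNew = false,
    // so state changes but nothing is recorded as a new change.
    public void LoadFromHistory(IEnumerable<Event> history)
    {
        foreach (var e in history) ApplyChange(e, false);
    }

    protected void ApplyChange(Event @event) => ApplyChange(@event, true);

    private void ApplyChange(Event @event, bool isNew)
    {
        // Assumption: plain `dynamic` stands in for AsDynamic(). The runtime
        // binder picks the public Apply overload for the event's actual type.
        ((dynamic)this).Apply((dynamic)@event);
        if (isNew) _changes.Add(@event);
    }
}

public class RoastSchedule : AggregateRoot
{
    // Hypothetical: an empty instance to replay history into.
    public RoastSchedule() { }

    public RoastSchedule(Guid id)
    {
        ApplyChange(new RoastScheduleCreatedEvent(id));
    }

    public void Apply(RoastScheduleCreatedEvent e) { Id = e.Id; }
}

public static class Program
{
    public static void Main()
    {
        var id = Guid.NewGuid();
        var created = new RoastSchedule(id);
        Console.WriteLine(created.Id == id);                        // True
        Console.WriteLine(created.GetUncommittedChanges().Count);   // 1

        var replayed = new RoastSchedule();
        replayed.LoadFromHistory(created.GetUncommittedChanges());
        Console.WriteLine(replayed.Id == id);                       // True
        Console.WriteLine(replayed.GetUncommittedChanges().Count);  // 0
    }
}
```

The isNew flag is what separates the two paths: a freshly emitted event is both applied and recorded, while a replayed event is only applied.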
Here’s a side-by-side comparison of the initialization of a RoastSchedule Aggregate using an Application Service vs a Command Handler.
Before
Application Service
public class CreateRoastScheduleApplicationService {
    private readonly IRepository<RoastSchedule> _repository;
    public CreateRoastScheduleApplicationService(IRepository<RoastSchedule> repository) {
        _repository = repository;
    }
    public RoastSchedule CreateRoastSchedule() {
        var roastSchedule = new RoastSchedule(Guid.NewGuid());
        _repository.Save(roastSchedule);
        return roastSchedule;
    }
}
Roast Schedule Aggregate
public class RoastSchedule : AggregateRoot {
    // public Guid Id { get; protected set; } defined in AggregateRoot
    // could also be defined as abstract and overridden here
    public RoastSchedule(Guid id) {
        // satisfy invariants
        Id = id;
    }
}
After
Command Handler
public class CreateRoastScheduleCommandHandler : ICommandHandler<CreateRoastSchedule> {
    private readonly IRepository<RoastSchedule> _repository;
    public CreateRoastScheduleCommandHandler(IRepository<RoastSchedule> repository) {
        _repository = repository;
    }
    public void Handle(CreateRoastSchedule message) {
        var roastSchedule = new RoastSchedule(Guid.NewGuid());
        _repository.Save(roastSchedule, -1);
    }
}
Roast Schedule Aggregate
public class RoastSchedule : AggregateRoot {
    // inherited from AggregateRoot base class...
    // public Guid Id { get; protected set; }
    // private readonly List<Event> _changes = new List<Event>();
    // protected void ApplyChange(Event @event) {
    //     ApplyChange(@event, true);
    // }
    // push atomic aggregate changes to local history for further processing (EventStore.SaveEvents)
    // private void ApplyChange(Event @event, bool isNew) {
    //     this.AsDynamic().Apply(@event);
    //     if (isNew) _changes.Add(@event);
    // }
    public RoastSchedule(Guid id) {
        // satisfy invariants...
        // pass the id parameter: the Id property is not set until Apply runs
        ApplyChange(new RoastScheduleCreatedEvent(id));
    }
    public void Apply(RoastScheduleCreatedEvent e) {
        Id = e.Id;
    }
}
In a Persistence-Oriented Repository, we would persist the state of the Aggregate to storage by calling the Repository’s Save method after this state transition. All changes in state up until that point are still in memory. Calling Save would then use an ORM to persist the new object (or the object with changed state) to the SQL database. This is also where the ORM’s concurrency strategy, e.g. Optimistic or Pessimistic Concurrency, comes into play.
In the case of our Event Sourced Aggregate, a Command is issued on an Aggregate to change its state. The method called by the Command Handler, e.g. UpdateRoastDays, CreateNew, ChangeName, etc., validates the Command against Aggregate Invariants and then calls ApplyChange(Event @event).
// RoastSchedule constructor
public RoastSchedule(Guid id) {
    // satisfy invariants...
    // pass the id parameter: the Id property is not set until Apply runs
    ApplyChange(new RoastScheduleCreatedEvent(id));
}
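The constructor covers the creation case. For a command method on an existing Aggregate, the same validate-then-ApplyChange shape might look like the following sketch. ChangeName is one of the method names mentioned above, but the RoastScheduleNameChangedEvent type, the Name property, and the specific invariant checks are hypothetical, and plain `dynamic` again stands in for AsDynamic().

```csharp
using System;
using System.Collections.Generic;

public abstract class Event { }

public class RoastScheduleCreatedEvent : Event
{
    public Guid Id { get; }
    public RoastScheduleCreatedEvent(Guid id) { Id = id; }
}

// Hypothetical event type, introduced only for this illustration.
public class RoastScheduleNameChangedEvent : Event
{
    public string Name { get; }
    public RoastScheduleNameChangedEvent(string name) { Name = name; }
}

public abstract class AggregateRoot
{
    private readonly List<Event> _changes = new List<Event>();

    public Guid Id { get; protected set; }

    public IReadOnlyList<Event> GetUncommittedChanges() => _changes;

    protected void ApplyChange(Event @event)
    {
        // Assumption: `dynamic` dispatch replaces the AsDynamic() helper.
        ((dynamic)this).Apply((dynamic)@event);
        _changes.Add(@event);
    }
}

public class RoastSchedule : AggregateRoot
{
    public string Name { get; private set; } = string.Empty;

    public RoastSchedule(Guid id)
    {
        // Hypothetical invariant: a schedule needs a real identity.
        if (id == Guid.Empty) throw new ArgumentException("An id is required.", nameof(id));
        ApplyChange(new RoastScheduleCreatedEvent(id));
    }

    public void ChangeName(string name)
    {
        // Validate first: a rejected command emits no event at all.
        if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException("A name is required.", nameof(name));
        ApplyChange(new RoastScheduleNameChangedEvent(name));
    }

    // State is only ever assigned inside Apply methods.
    public void Apply(RoastScheduleCreatedEvent e) { Id = e.Id; }
    public void Apply(RoastScheduleNameChangedEvent e) { Name = e.Name; }
}

public static class Program
{
    public static void Main()
    {
        var schedule = new RoastSchedule(Guid.NewGuid());
        schedule.ChangeName("Weekday roasts");
        Console.WriteLine(schedule.Name);                           // Weekday roasts
        Console.WriteLine(schedule.GetUncommittedChanges().Count);  // 2

        try { schedule.ChangeName("  "); }
        catch (ArgumentException) { Console.WriteLine("rejected"); } // rejected
        Console.WriteLine(schedule.GetUncommittedChanges().Count);  // 2
    }
}
```

Keeping validation in the command method and assignment in Apply means replaying an event never re-runs the invariant checks; the event records a decision that was already accepted.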
ApplyChange does two things: 1) calls the Aggregate’s Apply(Event e) method for whatever Event type was emitted by new RoastSchedule(Guid id) and 2) appends the resulting Event to private readonly List<Event> _changes. I’ll show it again:
protected void ApplyChange(Event @event) {
    ApplyChange(@event, true);
}
// push atomic aggregate changes to local history for further processing (EventStore.SaveEvents)
private void ApplyChange(Event @event, bool isNew) {
    this.AsDynamic().Apply(@event);
    if (isNew) _changes.Add(@event);
}
At this point, _changes is a list of the Events applied to the Aggregate and held in memory, much as an ORM tracks changes to an object before Save is called. The difference is that when we call Save on our IRepository<T> with IEventStore as its persistence store, we append all of the uncommitted events (those not yet persisted to the Event Store) in RoastSchedule’s internal List<Event> _changes collection to the Event Store. We persist the events rather than the state of the RoastSchedule Aggregate. This is the essence of Event Sourcing.
With _changes containing one or more uncommitted events (meaning not yet saved to the Event Store), calling Save on our concrete Event Store-based Repository saves each event to the Event Store:
class EventSourcedRepository<T> : IRepository<T> where T : AggregateRoot, new() {
    private readonly IEventStore _storage;
    public EventSourcedRepository(IEventStore storage) {
        _storage = storage;
    }
    // here's where we persist the uncommitted events in AggregateRoot._changes
    // to IEventStore _storage...
    public void Save(AggregateRoot aggregate, int expectedVersion) {
        _storage.SaveEvents(aggregate.Id, aggregate.GetUncommittedChanges(), expectedVersion);
    }
    public T GetById(Guid id) {
        ...
    }
}
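To show what might sit behind _storage.SaveEvents, here is a rough sketch of an in-memory store that appends events per Aggregate and rejects stale writes. It is an illustration under assumptions, not the SimpleCQRS implementation, which differs in its details: the InMemoryEventStore and ConcurrencyException classes, the GetEventsForAggregate return type, and the versioning rule (a stream's version is the index of its last event, so -1 means "no events yet") are all choices made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public abstract class Event { }

public class RoastScheduleCreatedEvent : Event
{
    public Guid Id { get; }
    public RoastScheduleCreatedEvent(Guid id) { Id = id; }
}

public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

public interface IEventStore
{
    void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion);
    IReadOnlyList<Event> GetEventsForAggregate(Guid aggregateId);
}

public class InMemoryEventStore : IEventStore
{
    // One append-only list of events per aggregate id.
    private readonly Dictionary<Guid, List<Event>> _streams = new Dictionary<Guid, List<Event>>();

    public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
    {
        _streams.TryGetValue(aggregateId, out var stream);

        // Assumed rule: version = index of the last stored event, -1 when empty.
        var currentVersion = (stream?.Count ?? 0) - 1;
        if (currentVersion != expectedVersion)
            throw new ConcurrencyException(
                $"Expected version {expectedVersion} but the stream is at {currentVersion}.");

        if (stream == null)
        {
            stream = new List<Event>();
            _streams[aggregateId] = stream;
        }
        stream.AddRange(events);
    }

    // Returns a copy so callers cannot mutate the stored stream.
    public IReadOnlyList<Event> GetEventsForAggregate(Guid aggregateId) =>
        _streams.TryGetValue(aggregateId, out var stream) ? stream.ToList() : new List<Event>();
}

public static class Program
{
    public static void Main()
    {
        var store = new InMemoryEventStore();
        var id = Guid.NewGuid();

        // First write: nothing stored yet, so the expected version is -1.
        store.SaveEvents(id, new Event[] { new RoastScheduleCreatedEvent(id) }, -1);
        Console.WriteLine(store.GetEventsForAggregate(id).Count);  // 1

        // A second writer that still believes the stream is empty is rejected.
        try { store.SaveEvents(id, new Event[] { new RoastScheduleCreatedEvent(id) }, -1); }
        catch (ConcurrencyException) { Console.WriteLine("stale write rejected"); }
        Console.WriteLine(store.GetEventsForAggregate(id).Count);  // 1
    }
}
```

Under this rule the -1 passed by the Command Handler reads as "I expect a brand-new stream", which is a form of optimistic concurrency applied to the event stream rather than to a row of state.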
Now that we’ve talked a lot about changing the state of our Aggregate and persisting those changes as Events, we can start talking about reading state information using Events.