Aggregate Event Persistence

We are going to use Greg Young’s In-Memory Event Store from his SimpleCQRS project to demonstrate persisting an Event Sourced Aggregate. I’ll talk more about other Event Stores in future blog posts.

Looking back at our Event Sourced Aggregate Root, we’re using it as a Layer Supertype (PoEAA). The key method of the AggregateRoot base class is ApplyChange.

public abstract class AggregateRoot : Entity {

    private readonly List<Event> _changes = new List<Event>();

    ...

    protected void ApplyChange(Event @event) {
        ApplyChange(@event, true);
    }

    // push atomic aggregate changes to local history for further processing (EventStore.SaveEvents)
    private void ApplyChange(Event @event, bool isNew) {
        this.AsDynamic().Apply(@event);
        if (isNew) _changes.Add(@event);
    }
}

For every Domain Event that an Aggregate emits in response to a Command (Command -> Event), the Aggregate implements an Apply method that takes that Event type. This method changes the state of the Aggregate from the data carried by the Event, rather than mutating the state directly in the method that handled the Command.
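To make the dispatch a little more concrete, here is a minimal sketch. The AsDynamic() helper isn't shown in this post, so I'm substituting C#'s built-in dynamic keyword, which picks the Apply overload from the Event's run-time type. The Event, AggregateRoot, and RoastSchedule types below are simplified stand-ins rather than the SimpleCQRS source, and with this approach Apply has to be public so the base class can bind to it.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-ins for the post's types (assumptions for illustration).
public abstract class Event { }

public class RoastScheduleCreatedEvent : Event {
    public Guid Id { get; }
    public RoastScheduleCreatedEvent(Guid id) { Id = id; }
}

public abstract class AggregateRoot {
    private readonly List<Event> _changes = new List<Event>();

    public Guid Id { get; protected set; }

    protected void ApplyChange(Event @event) {
        // Casting both the receiver and the argument to dynamic defers overload
        // resolution to run time, so Apply(RoastScheduleCreatedEvent) is selected
        // from the event's actual type instead of its static type (Event).
        ((dynamic)this).Apply((dynamic)@event);
        _changes.Add(@event);
    }
}

public class RoastSchedule : AggregateRoot {
    public RoastSchedule(Guid id) {
        // The constructor raises the event; it does not set Id itself.
        ApplyChange(new RoastScheduleCreatedEvent(id));
    }

    // State changes only here, driven by the event's data.
    public void Apply(RoastScheduleCreatedEvent e) {
        Id = e.Id;
    }
}
```

If an Aggregate raises an Event it has no Apply overload for, the dynamic call in this sketch fails at run time rather than at compile time, which is the trade-off of this style of dispatch.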

Here’s a side-by-side comparison of the initialization of a RoastSchedule Aggregate using an Application Service vs a Command Handler.

Before

Application Service

public class CreateRoastScheduleApplicationService {

    private readonly IRepository<RoastSchedule> _repository;

    public CreateRoastScheduleApplicationService(IRepository<RoastSchedule> repository) {
        _repository = repository;
    }

    public RoastSchedule CreateRoastSchedule() {
        var roastSchedule = new RoastSchedule(Guid.NewGuid());
        _repository.Save(roastSchedule);
        return roastSchedule;
    }
}

Roast Schedule Aggregate

public class RoastSchedule : AggregateRoot {

    // public Guid Id { get; protected set; } defined in AggregateRoot
    // could also be defined as abstract and overridden here

    public RoastSchedule(Guid id) {
        // satisfy invariants
        Id = id;
    }
}

After

Command Handler

public class CreateRoastScheduleCommandHandler : ICommandHandler<CreateRoastSchedule> {

    private readonly IRepository<RoastSchedule> _repository;

    public CreateRoastScheduleCommandHandler(IRepository<RoastSchedule> repository) {
        _repository = repository;
    }

    public void Handle(CreateRoastSchedule message) {
        var roastSchedule = new RoastSchedule(Guid.NewGuid());
        _repository.Save(roastSchedule, -1);
    }
}

Roast Schedule Aggregate

public class RoastSchedule : AggregateRoot {

    // inherited from AggregateRoot base class...
    // public Guid Id { get; protected set; }
    // private readonly List<Event> _changes = new List<Event>();

    // protected void ApplyChange(Event @event) {
    //     ApplyChange(@event, true);
    // }

    // push atomic aggregate changes to local history for further processing (EventStore.SaveEvents)
    // private void ApplyChange(Event @event, bool isNew) {
    //     this.AsDynamic().Apply(@event);
    //     if (isNew) _changes.Add(@event);
    // }

    public RoastSchedule(Guid id) {
        // satisfy invariants...
        // pass the constructor argument; the Id property is not set until Apply runs
        ApplyChange(new RoastScheduleCreatedEvent(id));
    }

    public void Apply(RoastScheduleCreatedEvent e) {
        Id = e.Id;
    }
}

In a Persistence-Oriented Repository, persisting the state of the Aggregate to storage would happen by calling its Save method after this state transition. All changes in state up until that point are still in memory. Calling Save would then use an ORM to persist this new object (or object with state changed) to the SQL database. This is when we get into the different ways that your ORM handles concurrency, e.g. Optimistic or Pessimistic Concurrency.

In the case of our Event Sourced Aggregate, a Command is issued on an Aggregate to change its state. The method called by the Command Handler (e.g. UpdateRoastDays, CreateNew, ChangeName, or a constructor as in our example) validates the Command against the Aggregate’s invariants and then calls ApplyChange(Event @event).

// RoastSchedule constructor
public RoastSchedule(Guid id) {
    // satisfy invariants...
    // pass the constructor argument; the Id property is not set until Apply runs
    ApplyChange(new RoastScheduleCreatedEvent(id));
}

ApplyChange does two things: 1) it calls the Aggregate’s Apply method for the type of Event being applied (here, the RoastScheduleCreatedEvent raised by new RoastSchedule(Guid id)) and 2) when the Event is new, it appends the Event to private readonly List<Event> _changes. I’ll show it again:

protected void ApplyChange(Event @event) {
    ApplyChange(@event, true);
}

// push atomic aggregate changes to local history for further processing (EventStore.SaveEvents)
private void ApplyChange(Event @event, bool isNew) {
    this.AsDynamic().Apply(@event);
    if (isNew) _changes.Add(@event);
}

At this point, _changes is a list of the Events applied to the Aggregate and held in memory, just as an ORM would track changes to an object before Save was called. Except this time, when we call Save on our IRepository<T> with IEventStore as its persistence store, we append all of the uncommitted events (i.e. those not yet persisted to the Event Store) in RoastSchedule’s internal List<Event> _changes collection to the Event Store. We persist the events rather than the state of the RoastSchedule Aggregate. This is the essence of Event Sourcing.
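Here is a rough sketch of that change tracking, using simplified stand-in types. RoastScheduleRenamedEvent, the Name property, and the body of ChangeName are hypothetical and exist only for illustration. MarkChangesAsCommitted is an assumed helper for clearing the list after a successful save, and the dynamic cast stands in for AsDynamic().

```csharp
using System;
using System.Collections.Generic;

public abstract class Event { }

public class RoastScheduleCreatedEvent : Event {
    public Guid Id { get; }
    public RoastScheduleCreatedEvent(Guid id) { Id = id; }
}

// Hypothetical event, for illustration only.
public class RoastScheduleRenamedEvent : Event {
    public string Name { get; }
    public RoastScheduleRenamedEvent(string name) { Name = name; }
}

public abstract class AggregateRoot {
    private readonly List<Event> _changes = new List<Event>();

    public Guid Id { get; protected set; }

    // Events applied since the last save, returned as a copy.
    public IEnumerable<Event> GetUncommittedChanges() {
        return _changes.ToArray();
    }

    // Assumed helper: call once the events have been persisted.
    public void MarkChangesAsCommitted() {
        _changes.Clear();
    }

    protected void ApplyChange(Event @event) {
        ((dynamic)this).Apply((dynamic)@event); // stand-in for AsDynamic()
        _changes.Add(@event);
    }
}

public class RoastSchedule : AggregateRoot {
    public string Name { get; private set; } = "";

    public RoastSchedule(Guid id) {
        ApplyChange(new RoastScheduleCreatedEvent(id));
    }

    public void ChangeName(string name) {
        // Validate the command against the invariant first...
        if (string.IsNullOrWhiteSpace(name))
            throw new ArgumentException("Name must not be empty.", nameof(name));
        // ...then record the state change as an event.
        ApplyChange(new RoastScheduleRenamedEvent(name));
    }

    public void Apply(RoastScheduleCreatedEvent e) { Id = e.Id; }
    public void Apply(RoastScheduleRenamedEvent e) { Name = e.Name; }
}
```

Creating a RoastSchedule and then calling ChangeName leaves two uncommitted events in the list, and nothing has touched storage yet. A command that fails validation throws before ApplyChange is called, so it adds no event.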

With _changes containing one or more uncommitted events (meaning not yet saved to the Event Store), calling Save on our concrete Event Store-based Repository saves each event to the Event Store:

class EventSourcedRepository<T> : IRepository<T> where T : AggregateRoot, new() {

    private readonly IEventStore _storage;

    public EventSourcedRepository(IEventStore storage) {
        _storage = storage;
    }

    // here's where we persist the uncommitted events in AggregateRoot._changes
    // to IEventStore _storage...
    public void Save(AggregateRoot aggregate, int expectedVersion) {
        _storage.SaveEvents(aggregate.Id, aggregate.GetUncommittedChanges(), expectedVersion);
    }

    public T GetById(Guid id) {
        ...
    }
}
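To give a feel for what might sit behind SaveEvents, here is a minimal in-memory sketch. It is not Greg Young’s implementation. InMemoryEventStore, SampleEvent, ConcurrencyException, and GetEventsForAggregate are names I made up for this example, and I'm assuming that a stream’s version is the zero-based index of its last event, so that an expectedVersion of -1 means "this Aggregate has no events yet". A real store may interpret -1 differently.

```csharp
using System;
using System.Collections.Generic;

public abstract class Event { }

// Hypothetical event type, used only to exercise the store.
public class SampleEvent : Event { }

public class ConcurrencyException : Exception { }

public interface IEventStore {
    void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion);
    List<Event> GetEventsForAggregate(Guid aggregateId);
}

public class InMemoryEventStore : IEventStore {
    // One append-only list of events per aggregate id.
    private readonly Dictionary<Guid, List<Event>> _streams =
        new Dictionary<Guid, List<Event>>();

    public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion) {
        if (!_streams.TryGetValue(aggregateId, out var stream)) {
            stream = new List<Event>();
        }

        // Assumption: version = index of the last stored event; empty stream = -1.
        int currentVersion = stream.Count - 1;
        if (currentVersion != expectedVersion) {
            // Someone else appended since the caller loaded the aggregate.
            throw new ConcurrencyException();
        }

        stream.AddRange(events);
        _streams[aggregateId] = stream;
    }

    public List<Event> GetEventsForAggregate(Guid aggregateId) {
        // Return a copy so callers cannot modify the stored stream.
        return _streams.TryGetValue(aggregateId, out var stream)
            ? new List<Event>(stream)
            : new List<Event>();
    }
}
```

Under these assumptions, the first save for a new Aggregate passes -1, as our Command Handler does, and a later save with a stale expectedVersion throws instead of silently interleaving events. That version check is the Event Store’s form of optimistic concurrency.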

Now that we’ve talked a lot about changing the state of our Aggregate and persisting those changes as Events, we can start talking about reading state information using Events.

