Repositories
Aggregate repositories serve as a bridge between your aggregates and the event store. Their responsibility is to retrieve aggregates from the event store by rehydrating them from a stream of events and persisting new events that represent changes in the aggregate's state. The repository provides an interface for interacting with the underlying event store without exposing the complexity of event sourcing to the rest of your application.
Defining an Aggregate Repository
An aggregate repository typically involves two operations:
- Retrieving an aggregate by loading its historical events and rehydrating it.
- Persisting changes by appending new events to the event stream.
Here is an example of an AccountRepository
that handles the Account
aggregate:
import { Injectable } from '@nestjs/common';
import { EventStore, EventStream } from '@ocoda/event-sourcing';
import { Account, AccountId } from '../../domain/models';
@Injectable()
export class AccountRepository {
constructor(private readonly eventStore: EventStore) {}
async getById(accountId: AccountId): Promise<Account> {
const eventStream = EventStream.for<Account>(Account, accountId);
const account = new Account();
const eventCursor = this.eventStore.getEvents(eventStream, { fromVersion: account.version + 1 });
await account.loadFromHistory(eventCursor);
return account;
}
async save(account: Account): Promise<void> {
const events = account.commit();
const stream = EventStream.for<Account>(Account, account.id);
await this.eventStore.appendEvents(stream, account.version, events);
}
}
Breakdown of the code
- Constructor injection: the EventStore is injected into the repository through the constructor, enabling interaction with the underlying event sourcing persistence layer.
- getById(accountId: AccountId):
- retrieves an Account aggregate from the event store.
- the
EventStream.for<Account>(Account, accountId)
is used to define the event stream specific to the Account aggregate with the accountId. - the aggregate is then rehydrated by loading its history using account.loadFromHistory(eventCursor), where the eventCursor fetches events starting from the aggregate's current version.
- save(account: Account):
- first, the method commits any pending changes in the aggregate by calling account.commit(), which retrieves the new events generated by the aggregate.
- the new events are then appended to the event stream via this.eventStore.appendEvents(stream, account.version, events), ensuring the aggregate is persisted with its updated state.
Registering the Repository
To use the AccountRepository
in your application, you need to register it as a provider in your NestJS module:
import { Module } from '@nestjs/common';
@Module({ providers: [AccountRepository] })
export class AccountModule {}
Guidelines for working with Repositories
Repositories Should Be Focused on Aggregates
Each repository should handle only one type of aggregate. In this example, AccountRepository handles only the Account aggregate. This keeps the repository focused and aligned with DDD principles.
Event Store as the Source of Truth
The event store serves as the source of truth for your aggregates. Repositories should always reconstruct aggregates by replaying events from the event store, which ensures that your domain state is always derived from the historical sequence of events.
Use Event Streams for Versioning
Event streams ensure that changes to aggregates are versioned properly. When saving an aggregate, always include its current version, as shown in account.version. This prevents concurrency issues, ensuring that multiple users don't overwrite each other's changes when working with the same aggregate.
Lazy Loading of Events
While fetching aggregates, events should be loaded lazily or in a paginated fashion, especially for aggregates with long histories. This example uses eventStore.getEvents(eventStream, { fromVersion: account.version + 1 })
to only fetch events after the aggregate's current version, optimizing performance and ensuring only relevant events are processed.
When aggregates start containing a lot of events and this becomes a performance bottleneck, you can consider using snapshots to optimize the loading process.
Commit Events After Business Logic Execution
The commit()
method of the aggregate should be called after all business logic has been executed. This ensures that the events represent the result of valid domain operations and that the aggregate is in a consistent state before the events are persisted.