Snapshots

Snapshots are an optimization that is completely optional. However, they become useful when event streams grow large, and reading them becomes slow. Snapshots allow you to store the state of an aggregate at a particular point in time, so you don’t need to replay all events from the beginning of the stream each time the aggregate is loaded.

To start using snapshots using this library you will need to create a SnapshotRepository.

Creating a Snapshot Repository

A SnapshotRepository is responsible for saving and loading snapshots, which are essentially instances of an aggregate at a certain revision. To create a snapshot repository for a specific aggregate you will need to make a snapshot repository class that extends the base SnapshotRepository class and uses the store behind the scenes to manage snapshots.

The base class provides the following methods:

  • save(id: Id, aggregate: A, pool?: ISnapshotPool): Promise<void>: Saves a snapshot of the aggregate.
  • load(id: Id, pool?: ISnapshotPool): Promise<A>: Loads the latest snapshot of the aggregate or returns a blank aggregate.
  • loadMany(ids: Id[], pool?: ISnapshotPool): Promise<A[]>: Loads the latest snapshots of multiple aggregates.
  • *loadAll(filter?: { aggregateId?: Id; limit?: number; pool?: string }): AsyncGenerator<SnapshotEnvelope<A>[]>: Search all latest snapshots from the store. Returns an async iterator.

The two methods you will need to implement are serialize and deserialize.

import { SnapshotRepository, Snapshot } from '@ocoda/event-sourcing';
 
@Snapshot(Account, { name: 'account', interval: 5 })
export class AccountSnapshotRepository extends SnapshotRepository<Account> {
	serialize({ id, ownerIds, balance, openedOn, closedOn }: Account): ISnapshot<Account> {
		return {
			id: id.value,
			ownerIds: ownerIds.map(({ value }) => value),
			balance,
			openedOn: openedOn ? openedOn.toISOString() : undefined,
			closedOn: closedOn ? closedOn.toISOString() : undefined,
		};
	}
 
	deserialize({ id, ownerIds, balance, openedOn, closedOn }: ISnapshot<Account>): Account {
		const account = new Account();
		account.id = AccountId.from(id);
		account.ownerIds = ownerIds.map(AccountOwnerId.from);
		account.balance = balance;
		account.openedOn = openedOn && new Date(openedOn);
		account.closedOn = closedOn && new Date(closedOn);
 
		return account;
	}
}

Breakdown of the code

  • @Snapshot(Account, { name: 'account', interval: 5 }):

    • The @Snapshot() decorator marks the class as a snapshot handler and specifies the aggregate type and the snapshot name.
    • The interval option specifies how often a snapshot should be taken. In this case, a snapshot will be taken every 5 events.
  • serialize({ id, ownerIds, balance, openedOn, closedOn }: Account):

    • The serialize method is responsible for converting an aggregate instance into a snapshot object. This method is called when saving a snapshot.
  • deserialize({ id, ownerIds, balance, openedOn, closedOn }: ISnapshot<Account>): Account:

    • The deserialize method is responsible for converting a snapshot object into an aggregate instance. This method is called when loading a snapshot.

Plugging it into you aggregate repository

If you created a snapshot repository and registered it as a provider, you can now plug it into your aggregate repository to optimize the loading process.

@Injectable()
export class AccountRepository {
  constructor(
    private readonly eventStore: EventStore,
    private readonly accountSnapshotRepository: AccountSnapshotRepository,
  ) {}
 
  async getById(accountId: AccountId) {
    const eventStream = EventStream.for<Account>(Account, accountId);
 
    const account = await this.accountSnapshotRepository.load(accountId);
 
    const events = this.eventStore.getEvents(eventStream, { fromVersion: account.version + 1 });
 
    await account.loadFromHistory(events);
 
    if (account.version < 1) {
      throw new AccountNotFoundException(accountId.value);
    }
 
    return account;
  }
 
  async getByIds(accountIds: AccountId[]) {
    const accounts = await this.accountSnapshotRepository.loadMany(accountIds, 'e2e');
 
    for (const account of accounts) {
      const eventStream = EventStream.for<Account>(Account, account.id);
      const eventCursor = this.eventStore.getEvents(eventStream, { pool: 'e2e', fromVersion: account.version + 1 });
      await account.loadFromHistory(eventCursor);
    }
 
    return accounts;
  }
 
  async save(account: Account): Promise<void> {
    const events = account.commit();
    const stream = EventStream.for<Account>(Account, account.id);
 
    // Append the events to the event store
    await this.eventStore.appendEvents(stream, account.version, events);
    // Save a snapshot of the account
    await this.accountSnapshotRepository.save(account.id, account);
  }
}