Designing Scalable Systems with CQRS and Event Sourcing

November 20, 2025

What is CQRS and Event Sourcing?

CQRS (Command Query Responsibility Segregation) is a pattern that separates the write side of your system—commands that change data—from the read side that queries data. This makes it easier to optimize and scale both parts independently.

Event Sourcing complements CQRS by storing every state change as an immutable event instead of overwriting data. The current state of an entity is rebuilt by replaying its events, creating a complete history of what happened.
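
As a minimal sketch of that replay idea, current state can be written as a fold over the event history. The event shapes and the replay helper below are illustrative assumptions, not part of the implementation later in this guide.

```typescript
// Hypothetical event shapes, used only for this illustration.
type InvoiceEvent =
  | { type: 'InvoiceCreated'; id: string; customerId: string; amount: number }
  | { type: 'InvoicePaid'; id: string };

interface InvoiceState {
  id: string;
  customerId: string;
  amount: number;
  paid: boolean;
  version: number; // number of events applied so far
}

const emptyInvoice: InvoiceState = {
  id: '',
  customerId: '',
  amount: 0,
  paid: false,
  version: 0,
};

// Pure function: previous state plus one event gives the next state.
function applyEvent(state: InvoiceState, event: InvoiceEvent): InvoiceState {
  switch (event.type) {
    case 'InvoiceCreated':
      return {
        id: event.id,
        customerId: event.customerId,
        amount: event.amount,
        paid: false,
        version: state.version + 1,
      };
    case 'InvoicePaid':
      return { ...state, paid: true, version: state.version + 1 };
  }
}

// Current state is a left fold over the full history; nothing is overwritten.
function replay(history: InvoiceEvent[]): InvoiceState {
  return history.reduce(applyEvent, emptyInvoice);
}
```

Replaying a prefix of the history yields the state as it was at that point in time, which is what makes the audit and debugging use cases possible.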

How CQRS, Event Sourcing, and DDD relate

CQRS and Event Sourcing are separate architectural patterns. You can use CQRS without an event store, and you can use Event Sourcing without maintaining separate read models. These patterns are often combined when the domain needs clear write boundaries, auditability, and read-side optimization.

Domain-Driven Design (DDD) is another concern. It becomes useful when the domain is complex enough to warrant explicit bounded contexts, aggregates, and ubiquitous language. CQRS and Event Sourcing pair well with DDD, but neither requires full DDD adoption. For simple domains, a traditional CRUD model may still be sufficient.

Why use CQRS and Event Sourcing?

Traditional CRUD systems mix reads and writes on the same models, which can become complex as applications grow. CQRS and Event Sourcing introduce a clearer separation of concerns, making it easier to evolve your domain logic, integrate other systems through events, and audit every change in your data.

These patterns add structural and operational complexity. They are valuable when the domain has meaningful business rules, when the system must track every state change, or when reads have different performance requirements than writes. For domains with straightforward CRUD semantics, the overhead may not be justified.

Benefits of CQRS and Event Sourcing

  • Scalability: Read and write sides scale independently.
  • Auditability: Every change is recorded and traceable.
  • Flexibility: Different models can serve optimized reads and consistent writes.
  • Debugging: Replay events to reproduce past states or test new logic.
  • Integration: Emit domain events that other services can subscribe to.

Architecture Overview

In a CQRS and Event Sourcing setup, the system is divided into two sides:

  • The write side handles commands that change state.
  • The read side handles queries that retrieve state.

Commands go through a Command Handler, which applies logic on a Domain Model (or Aggregate). When something changes, the domain model emits events that are saved in the Event Store.

These events are then published through an Event Bus and processed by Event Handlers, which update the Read Database (a projection optimized for queries).
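
The flow described above can be sketched in a few dozen lines without any framework. Everything here (InMemoryEventLog, InvoiceListProjection, createInvoice, payInvoice) is a hypothetical in-memory stand-in, meant only to show how a command ends up as a row in a read model.

```typescript
interface StoredEvent {
  streamId: string;
  position: number;
  type: string;
  data: Record<string, unknown>;
}

type Listener = (event: StoredEvent) => void;

// Write side: an append-only log that also acts as a tiny in-process event bus.
class InMemoryEventLog {
  private readonly events: StoredEvent[] = [];
  private readonly listeners: Listener[] = [];

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  append(event: StoredEvent): void {
    this.events.push(event);
    this.listeners.forEach((listener) => listener(event));
  }

  stream(streamId: string): StoredEvent[] {
    return this.events.filter((e) => e.streamId === streamId);
  }
}

// Read side: a projection that keeps one query-friendly row per invoice.
class InvoiceListProjection {
  readonly rows = new Map<string, { amount: number; paid: boolean }>();

  handle(event: StoredEvent): void {
    if (event.type === 'InvoiceCreated') {
      this.rows.set(event.streamId, { amount: Number(event.data.amount), paid: false });
    } else if (event.type === 'InvoicePaid') {
      const row = this.rows.get(event.streamId);
      if (row) row.paid = true;
    }
  }
}

// Command handlers: decide based on the stream, then append new events.
function createInvoice(log: InMemoryEventLog, id: string, amount: number): void {
  log.append({ streamId: id, position: 1, type: 'InvoiceCreated', data: { amount } });
}

function payInvoice(log: InMemoryEventLog, id: string): void {
  const history = log.stream(id);
  if (history.length === 0) throw new Error(`Invoice ${id} does not exist`);
  if (history.some((e) => e.type === 'InvoicePaid')) throw new Error('Invoice already paid');
  log.append({ streamId: id, position: history.length + 1, type: 'InvoicePaid', data: {} });
}
```

Queries would read only from projection.rows, never from the log itself; that is the separation the rest of the guide builds with NestJS and Postgres.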

For simplicity, this example keeps the read and write sides within the same NestJS application, but in production they can be separate services communicating through an event bus.

Figure: Event Sourcing Architecture

Implementing Event Sourcing in NestJS: Step-by-Step

The rest of this guide walks through a concrete implementation in NestJS: an Invoice service that creates invoices and marks them as paid.

The repository is structured around four layers:

  • core/ – application-wide infrastructure (database connections)
  • shared/ – reusable event sourcing building blocks (event store, aggregate rehydration)
  • invoices/ – the invoice bounded context (domain, commands, queries, HTTP)
  • src/main.ts & src/app.module.ts – NestJS bootstrap and composition

This layout follows a domain-driven design setup:

  • Domain code for the invoices bounded context depends on domain concepts such as aggregates, value objects, and domain events.
  • Application coordinates use cases by invoking aggregates, persisting events, and triggering projections.
  • Infrastructure provides technical capabilities such as persistence, database connections, and event dispatching.
  • Presentation exposes the application through HTTP or other transports.

This separation keeps the domain model of the bounded context stable even when infrastructure or API layers evolve.


src/
  core/
  shared/
  invoices/
    application/
    domain/
    infrastructure/
    presentation/

Step 1: Bootstrapping the NestJS project

You can start from a standard NestJS application:

npx @nestjs/cli new event-sourcing
cd event-sourcing
pnpm add @nestjs/cqrs @nestjs/typeorm typeorm pg dotenv @nestjs/swagger @nestjs/mapped-types

The main entry point enables Swagger for inspection of the API:

// src/main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  const config = new DocumentBuilder()
    .setTitle('Event Sourcing Example')
    .setDescription('An example of event sourcing with NestJS')
    .setVersion('1.0')
    .build();

  const documentFactory = () => SwaggerModule.createDocument(app, config);
  SwaggerModule.setup('api', app, documentFactory);

  await app.listen(process.env.PORT ?? 3001);
}

bootstrap();

Why this setup?

  • @nestjs/cqrs gives you CommandBus, QueryBus, EventBus, and helpers around aggregates and handlers. This is the backbone for CQRS in Nest.
  • Swagger shortens the feedback loop: you can create and pay invoices from a browser while watching the events and invoices tables change.

The root module wires up CQRS, core infrastructure, shared event-sourcing utilities, and the invoice module:

// src/app.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { InvoicesModule } from './invoices/application/invoices.module';
import { SharedModule } from './shared/shared.module';
import { CoreModule } from './core/core.module';

@Module({
  imports: [CqrsModule.forRoot(), CoreModule, SharedModule, InvoicesModule],
  controllers: [],
  providers: [],
})
export class AppModule {}

Why keep AppModule this minimal?

AppModule is the composition root. It should describe what the system is made of, not how it works internally:

  • CoreModule defines global platform concerns (database connections).
  • SharedModule holds generic event-sourcing infrastructure reusable across bounded contexts.
  • InvoicesModule is a single bounded context; more contexts can be added later without touching the others.

This structure scales to multiple domains and multiple transport layers.

Step 2: Configuring two databases (event store + read model)

This example uses Postgres for both the event store and the read database, each with its own TypeORM connection:

// src/core/core.constants.ts
export const EVENT_STORE_CONNECTION = 'EVENT_STORE_CONNECTION';

// src/core/core.module.ts
import { Module } from '@nestjs/common';
import { EVENT_STORE_CONNECTION } from './core.constants';
import { TypeOrmModule } from '@nestjs/typeorm';
import 'dotenv/config';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: process.env.EVENT_STORE_HOST,
      username: process.env.EVENT_STORE_USERNAME,
      password: process.env.EVENT_STORE_PASSWORD,
      database: process.env.EVENT_STORE_DATABASE,
      port: Number(process.env.EVENT_STORE_PORT),
      // Schema auto-sync is convenient for this demo; outside development prefer migrations,
      // e.g. synchronize: process.env.NODE_ENV === 'development',
      synchronize: true,
      autoLoadEntities: true,
      name: EVENT_STORE_CONNECTION,
    }),
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: process.env.READ_DB_HOST,
      username: process.env.READ_DB_USERNAME,
      password: process.env.READ_DB_PASSWORD,
      database: process.env.READ_DB_DATABASE,
      port: Number(process.env.READ_DB_PORT),
      // Schema auto-sync is convenient for this demo; outside development prefer migrations,
      // e.g. synchronize: process.env.NODE_ENV === 'development',
      synchronize: true,
      autoLoadEntities: true,
    }),
  ],
})
export class CoreModule {}

docker-compose.yml contains two Postgres containers, one for each connection:

version: "3.8"
services:
  pg-event-store:
    image: postgres
    environment:
      POSTGRES_USER: ${EVENT_STORE_USERNAME}
      POSTGRES_PASSWORD: ${EVENT_STORE_PASSWORD}
      POSTGRES_DB: ${EVENT_STORE_DATABASE}
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${EVENT_STORE_USERNAME} -d ${EVENT_STORE_DATABASE}"]
    ports:
      - "${EVENT_STORE_PORT}:5432"

  pg-read-db:
    image: postgres
    environment:
      POSTGRES_USER: ${READ_DB_USERNAME}
      POSTGRES_PASSWORD: ${READ_DB_PASSWORD}
      POSTGRES_DB: ${READ_DB_DATABASE}
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${READ_DB_USERNAME} -d ${READ_DB_DATABASE}"]
    ports:
      - "${READ_DB_PORT}:5432"

Example .env:

PORT=3001

EVENT_STORE_HOST=localhost
EVENT_STORE_PORT=5433
EVENT_STORE_USERNAME=postgres
EVENT_STORE_PASSWORD=postgres
EVENT_STORE_DATABASE=event_store

READ_DB_HOST=localhost
READ_DB_PORT=5434
READ_DB_USERNAME=postgres
READ_DB_PASSWORD=postgres
READ_DB_DATABASE=read_db

Why two databases?

From a CQRS and event-sourcing perspective, the event store and read database have different responsibilities:

  • The event store is the source of truth. It must be append-only, ordered, and durable. Reads from it are mostly sequential streams (per aggregate).
  • The read database is optimized for queries: filters, sorts, joins, pagination, projections for UI or APIs.

Even when both run on Postgres in development, separate connections move you closer to a production topology where:

  • the event store might live in a different cluster, or a specialized event database,
  • the read side might be replicated Postgres, Elasticsearch, MongoDB, or a mix of projections.

By separating the connections early, you avoid accidental coupling such as joining the event table directly in a query handler.

Step 3: Creating the generic event store

The event store is defined through an abstraction and implemented on Postgres with TypeORM.

// src/shared/application/ports/event-store.ts
import { EventEntity } from '../../infrastructure/event-store/entities/event.entity';

export abstract class EventStore {
  abstract persist(eventOrEvents: EventEntity | EventEntity[]): Promise<void>;
  abstract getEventsByStreamId(streamId: string): Promise<EventEntity[]>;
}

Why an EventStore port?

This moves the design toward a ports-and-adapters style:

  • The port (EventStore abstract class) expresses what the application needs: append events and load them by stream.
  • The adapter (PgEventStore) decides how to do this: SQL, NoSQL, EventStoreDB, Kafka, or an external service.

In this sample the port is still typed in terms of the persistence entity EventEntity for simplicity, so there is a direct dependency from the abstraction to the storage shape. A stricter design would introduce a domain-level event DTO for the port and map that to specific persistence models in the adapter. Even with this coupling, the rest of the application depends on the EventStore abstraction and can change implementation details in a single place.
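
A rough sketch of that stricter design is shown here. The names StoredDomainEvent, DomainEventStore, toRow, and fromRow are assumptions made for illustration, and EventRow is a simplified stand-in for the TypeORM entity defined next.

```typescript
// Storage-agnostic shape the port could expose instead of the persistence entity.
interface StoredDomainEvent {
  streamId: string;
  position: number;
  type: string;
  payload: Record<string, unknown>;
}

// Simplified stand-in for the persistence row (the real one is a TypeORM entity).
interface EventRow {
  streamId: string;
  position: number;
  type: string;
  data: Record<string, unknown>;
  userId?: string | null;
  createdAt?: Date;
}

// The port now mentions only the storage-agnostic shape.
interface DomainEventStore {
  append(events: StoredDomainEvent[]): Promise<void>;
  load(streamId: string): Promise<StoredDomainEvent[]>;
}

// Adapter-side mapping: the only place that knows both shapes.
function toRow(event: StoredDomainEvent, userId: string | null = null): EventRow {
  return {
    streamId: event.streamId,
    position: event.position,
    type: event.type,
    data: { ...event.payload },
    userId,
  };
}

function fromRow(row: EventRow): StoredDomainEvent {
  return {
    streamId: row.streamId,
    position: row.position,
    type: row.type,
    payload: { ...row.data },
  };
}
```

With this split, columns such as userId or createdAt can change in the adapter without touching any code that depends on the port.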

The events table stores all domain events:

// src/shared/infrastructure/event-store/entities/event.entity.ts
import {
  Column,
  CreateDateColumn,
  Entity,
  Index,
  PrimaryColumn,
} from 'typeorm';

@Entity('events')
@Index(['streamId', 'position'], { unique: true })
export class EventEntity {
  @PrimaryColumn()
  streamId: string;

  @PrimaryColumn()
  position: number;

  @Column()
  type: string;

  @Column('json')
  data: Record<string, any>;

  @Column('text', { nullable: true })
  userId?: string | null;

  @CreateDateColumn({ type: 'timestamptz' })
  createdAt?: Date;
}

Why this schema?

  • streamId represents the aggregate id (for example Invoice.id).
  • position is the sequence number inside that stream, starting at 1. Together with streamId, it forms a composite primary key and unique index; you never overwrite events.
  • type stores the event class name (InvoiceCreatedEvent, InvoicePaidEvent, etc.).
  • data holds the serialized event object as JSON.
  • userId and createdAt allow you to add context without changing the domain event shape (for auditing, multi-tenancy, or troubleshooting).

Events are append-only. If you need to change behavior, you add new events and adjust handlers. Historical events remain untouched.

The Postgres implementation handles persistence and ordering of events:

// src/shared/infrastructure/event-store/event-store.ts
import { Injectable, Logger } from '@nestjs/common';
import { EventStore } from '../../application/ports/event-store';
import { InjectDataSource, InjectRepository } from '@nestjs/typeorm';
import { EventEntity } from './entities/event.entity';
import { DataSource, Repository } from 'typeorm';
import { EVENT_STORE_CONNECTION } from '../../../core/core.constants';
import { EventDeserializer } from './deserializers/event.deserializer';

@Injectable()
export class PgEventStore implements EventStore {
  private readonly logger = new Logger(PgEventStore.name);

  constructor(
    @InjectRepository(EventEntity, EVENT_STORE_CONNECTION)
    private eventStore: Repository<EventEntity>,
    @InjectDataSource(EVENT_STORE_CONNECTION)
    private dataSource: DataSource,
    private readonly eventDeserializer: EventDeserializer,
  ) {}

  async persist(eventOrEvents: EventEntity | EventEntity[]) {
    const events = Array.isArray(eventOrEvents)
      ? eventOrEvents
      : [eventOrEvents];

    const queryRunner = this.dataSource.createQueryRunner();

    await queryRunner.connect();
    await queryRunner.startTransaction();
    try {
      await queryRunner.manager.save(EventEntity, events);

      await queryRunner.commitTransaction();
      this.logger.debug(`Events inserted successfully to the event store`);
    } catch (error) {
      await queryRunner.rollbackTransaction();
      throw error;
    } finally {
      await queryRunner.release();
    }
  }

  async getEventsByStreamId(streamId: string) {
    const events = await this.eventStore.find({
      where: { streamId },
      order: { position: 'ASC' },
    });

    if (events.length === 0) {
      throw new Error(`Aggregate with id ${streamId} does not exist`);
    }

    return events.map((e) => this.eventDeserializer.deserialize(e));
  }
}

Why explicit transactions?

When you persist multiple events from a single command (for example, OrderPlaced + StockReserved), they should either all appear in the stream or none:

  • The stream should not represent a partial change.
  • Using a QueryRunner and explicit transaction guarantees atomicity per call to persist.

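This code does not yet implement optimistic concurrency checks (comparing the expected position to the database state), but it is structured so you can add those checks in one place. Be aware that TypeORM's save() behaves like an upsert: if a row with the same streamId and position already exists, it is updated rather than rejected. Using insert() instead lets the composite primary key reject a duplicate position.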

getEventsByStreamId enforces an invariant: if there are no events, the aggregate does not exist. That removes the need to persist “empty” aggregates.
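
The optimistic concurrency check mentioned above could look roughly like the following. This is an in-memory sketch with a hypothetical InMemoryVersionedStore; in a SQL-backed store, a unique constraint on (streamId, position) can play the same role.

```typescript
interface StoredEvent {
  streamId: string;
  position: number;
  type: string;
  data: Record<string, unknown>;
}

interface NewEvent {
  type: string;
  data: Record<string, unknown>;
}

class InMemoryVersionedStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  // expectedVersion is the position of the last event the caller has seen
  // (0 for a stream that does not exist yet). Returns the new stream version.
  append(streamId: string, expectedVersion: number, newEvents: NewEvent[]): number {
    const stream = this.streams.get(streamId) ?? [];
    const currentVersion = stream.length;

    if (currentVersion !== expectedVersion) {
      // Someone else appended in the meantime: reject instead of overwriting.
      throw new Error(
        `Concurrency conflict on ${streamId}: expected ${expectedVersion}, found ${currentVersion}`,
      );
    }

    const stored = newEvents.map((event, index) => ({
      streamId,
      position: expectedVersion + index + 1,
      type: event.type,
      data: event.data,
    }));

    this.streams.set(streamId, stream.concat(stored));
    return expectedVersion + stored.length;
  }

  read(streamId: string): StoredEvent[] {
    return this.streams.get(streamId) ?? [];
  }
}
```

A caller that loses the race would typically reload the aggregate, re-run the command against the fresh state, and try again.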

Step 4: Making Nest’s EventBus talk to the event store

Nest’s @nestjs/cqrs module uses an IEventPublisher interface. You can plug in a custom publisher that writes events into Postgres instead of publishing them only in memory.

// src/shared/infrastructure/event-store/publishers/event-store.publisher.ts
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { EventBus, IEvent, IEventPublisher } from '@nestjs/cqrs';
import { EventSerializer } from '../serializers/event.serializer';
import { VersionedAggregateRoot } from '../../../domain/aggregate-root';
import { PgEventStore } from '../event-store';

@Injectable()
export class EventStorePublisher
  implements OnApplicationBootstrap, IEventPublisher
{
  private readonly logger = new Logger(EventStorePublisher.name);

  constructor(
    private readonly eventStore: PgEventStore,
    private readonly eventBus: EventBus,
    private readonly eventSerializer: EventSerializer,
  ) {}

  onApplicationBootstrap() {
    this.eventBus.publisher = this;
  }

  publish<T extends IEvent = IEvent>(
    event: T,
    dispatcher: VersionedAggregateRoot,
  ) {
    this.logger.debug(`Publishing event: ${JSON.stringify(event)}`);

    const serializableEvent = this.eventSerializer.serialize(event, dispatcher);
    return this.eventStore.persist(serializableEvent);
  }

  publishAll<T extends IEvent>(
    events: T[],
    dispatcher: VersionedAggregateRoot,
  ) {
    this.logger.debug(`Publishing events: ${JSON.stringify(events)}`);

    const serializableEvents = events
      .map((event) => this.eventSerializer.serialize(event, dispatcher))
      .map((serializableEvent, index) => ({
        ...serializableEvent,
        position: dispatcher.version.value + index + 1,
      }));

    return this.eventStore.persist(serializableEvents);
  }
}

How this fits into Nest’s aggregate flow

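AggregateRoot from @nestjs/cqrs keeps uncommitted events in memory. When you call aggregate.commit() on an aggregate that has been merged with the EventPublisher context:

  1. The aggregate hands all uncommitted events to the EventBus.
  2. The EventBus passes them to its configured publisher.
  3. The default publisher pushes them onto the in-memory event stream, where event handlers pick them up.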

This implementation:

  • intercepts that pipeline by replacing EventBus.publisher with EventStorePublisher,
  • appends events to Postgres, and
  • delegates actual publishing to a TypeORM subscriber (later in Step 6).

The split between writing to the store and notifying handlers gives you control over reliability patterns (outbox, retries, replay, etc.).

EventSerializer maps domain events to EventEntity instances:

// src/shared/infrastructure/event-store/serializers/event.serializer.ts
import { Injectable } from '@nestjs/common';
import { VersionedAggregateRoot } from '../../../domain/aggregate-root';
import { EventEntity } from '../entities/event.entity';

@Injectable()
export class EventSerializer {
  serialize<T extends object>(
    event: T,
    dispatcher: VersionedAggregateRoot,
  ): EventEntity {
    const eventType = event.constructor?.name;

    if (!eventType) {
      throw new Error('Incompatible event type');
    }

    return {
      streamId: dispatcher.id,
      position: dispatcher.version.value + 1,
      type: eventType,
      data: event,
    };
  }
}

Why use the constructor name as type?

The event type string ties storage and code together:

  • When you evolve the system (add handlers, replay events), you need to know which class to instantiate.
  • Using constructor.name keeps this mapping local to the event class.

In environments where class names might be mangled, you can replace this with an explicit static eventType = 'INVOICE_CREATED' property and map that instead.
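
One possible shape for that explicit mapping is sketched below. The eventType values and the resolveEventType helper are assumptions for illustration; the guide's own serializer keeps using constructor.name.

```typescript
// Hypothetical variants of the guide's events, each carrying a stable type string.
class InvoiceCreatedEvent {
  static readonly eventType = 'INVOICE_CREATED';
  constructor(
    readonly id: string,
    readonly customerId: string,
    readonly amount: number,
  ) {}
}

class InvoicePaidEvent {
  static readonly eventType = 'INVOICE_PAID';
  constructor(readonly id: string) {}
}

// What we expect to find on an event class; eventType is optional so that
// classes without it still fall back to their constructor name.
interface EventClassLike {
  name: string;
  eventType?: string;
}

function resolveEventType(event: object): string {
  const cls = event.constructor as unknown as EventClassLike;
  const type = cls.eventType ?? cls.name;

  if (!type) {
    throw new Error('Incompatible event type');
  }

  return type;
}
```

If the serializer stores this string, the class registry used for deserialization has to be keyed by the same value, otherwise stored events can no longer be mapped back to their classes.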

To get events from the store back into actual class instances, a registry keeps track of all event classes:

// src/shared/infrastructure/event-store/event-cls.registry.ts
import { Type } from '@nestjs/common';

export class EventClsRegistry {
  private static readonly eventClsMap = new Map<string, Type>();

  static add(eventCls: Type): void {
    this.eventClsMap.set(eventCls.name, eventCls);
  }

  static get(eventClsName: string): Type {
    const eventCls = this.eventClsMap.get(eventClsName);

    if (!eventCls) {
      throw new Error(`Event class ${eventClsName} not registered`);
    }

    return eventCls;
  }
}

Event classes register themselves with a decorator:

// src/shared/decorators/autowired-event.decorator.ts
import { EventClsRegistry } from '../infrastructure/event-store/event-cls.registry';

export const AutoWiredEvent: ClassDecorator = (target: any) => {
  EventClsRegistry.add(target);
};

The deserializer uses this registry to restore the prototype chain when reading from the database:

// src/shared/infrastructure/event-store/deserializers/event.deserializer.ts
import { Injectable, Type } from '@nestjs/common';
import { EventEntity } from '../entities/event.entity';
import { EventClsRegistry } from '../event-cls.registry';

@Injectable()
export class EventDeserializer {
  deserialize(e: EventEntity): EventEntity {
    const cls: Type = EventClsRegistry.get(e.type);

    return {
      ...e,
      // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
      data: Object.assign(Object.create(cls.prototype), e.data),
    };
  }
}

Why a registry instead of instanceof checks or plain objects?

When events are loaded from the database, they arrive as plain JSON. Without a prototype, you lose:

  • instance methods on the event class,
  • type checks like event instanceof InvoicePaidEvent,
  • and the ability to share behavior across related events.

By reconstructing the prototype chain using the registry, you keep events as first-class objects while still storing them as JSON.

The decorator keeps registration near the event definition and avoids manual lists of event classes.
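
The effect of restoring the prototype can be seen in isolation with a small framework-free sketch. The describe method and the revive helper are hypothetical additions for demonstration only.

```typescript
class InvoicePaidEvent {
  constructor(readonly id: string) {}

  // Hypothetical instance method, to show that behavior survives the round trip.
  describe(): string {
    return `Invoice ${this.id} was paid`;
  }
}

type EventClass = new (...args: any[]) => object;

// Minimal registry keyed by class name, mirroring the idea of EventClsRegistry.
const registry = new Map<string, EventClass>();
registry.set(InvoicePaidEvent.name, InvoicePaidEvent);

function revive(type: string, json: string): object {
  const cls = registry.get(type);
  if (!cls) {
    throw new Error(`Event class ${type} not registered`);
  }

  const plain = JSON.parse(json) as Record<string, unknown>;

  // Object.create attaches the prototype without running the constructor;
  // Object.assign then copies the stored fields onto that object.
  return Object.assign(Object.create(cls.prototype) as object, plain);
}
```

Because the constructor is never invoked during revival, any validation or defaulting done there is skipped; stored events are trusted as facts.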

Step 5: Versioned aggregate root and rehydration

The write model relies on a versioned aggregate root that keeps track of the last applied event position.

// src/shared/domain/value-objects/version.ts
export class Version {
  constructor(readonly value: number) {}
}

// src/shared/domain/aggregate-root.ts
import { AggregateRoot } from '@nestjs/cqrs';
import { Version } from './value-objects/version';
import { EventEntity } from '../infrastructure/event-store/entities/event.entity';

const VERSION = Symbol('version');

export class VersionedAggregateRoot extends AggregateRoot {
  public id: string;

  private [VERSION] = new Version(0);

  get version(): Version {
    return this[VERSION];
  }

  set version(version: Version) {
    this[VERSION] = version;
  }

  loadFromHistory(history: EventEntity[]) {
    const domainEvents = history.map((event) => event.data);
    super.loadFromHistory(domainEvents);

    const lastEvent = history[history.length - 1];
    this.version = new Version(lastEvent.position);
  }
}

Why track version on the aggregate?

The aggregate version is important for two reasons:

  1. Correct event positions: New events must be appended after the last known position to maintain a strict ordering per stream.
  2. Optimistic concurrency (future extension): You can compare the current version with what is stored to detect conflicting updates without locking rows.

loadFromHistory uses AggregateRoot.loadFromHistory from @nestjs/cqrs, which calls the on<EventName> handlers on the aggregate. That ensures the aggregate’s state is derived from the same events that are stored, not from ad-hoc setters.
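
To make the mechanics concrete, here is a simplified stand-in for that behavior. It is not the @nestjs/cqrs implementation; MiniAggregate and MiniInvoice are hypothetical classes that only imitate the on<EventName> dispatch and the version tracking.

```typescript
interface StoredEvent {
  position: number;
  data: object;
}

class MiniAggregate {
  private currentVersion = 0;

  get version(): number {
    return this.currentVersion;
  }

  // Replays stored events in order, dispatching each one to on<ClassName> if present.
  loadFromHistory(history: StoredEvent[]): void {
    for (const stored of history) {
      const handlerName = `on${stored.data.constructor.name}`;
      const handler = (this as unknown as Record<string, unknown>)[handlerName];

      if (typeof handler === 'function') {
        handler.call(this, stored.data);
      }

      // The aggregate version follows the position of the last replayed event.
      this.currentVersion = stored.position;
    }
  }
}

class InvoiceCreatedEvent {
  constructor(readonly id: string, readonly amount: number) {}
}

class InvoicePaidEvent {
  constructor(readonly id: string) {}
}

class MiniInvoice extends MiniAggregate {
  amount = 0;
  paid = false;

  onInvoiceCreatedEvent(event: InvoiceCreatedEvent): void {
    this.amount = event.amount;
  }

  onInvoicePaidEvent(_event: InvoicePaidEvent): void {
    this.paid = true;
  }
}
```

After replaying a stream, the next event to append belongs at version + 1, which is the value the serializer in this guide computes from dispatcher.version.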

In DDD terms, the application layer defines transactional boundaries and coordinates aggregates. Command handlers belong to this layer: they receive intent, load the required aggregates, invoke domain operations, and commit events. They should not know how aggregates are reconstructed internally.

The AggregateRehydrator rebuilds an aggregate from its event stream:

// src/shared/application/aggregate-rehydrator.ts
import { Injectable, Type } from '@nestjs/common';
import { EventStore } from './ports/event-store';
import { EventPublisher } from '@nestjs/cqrs';
import { VersionedAggregateRoot } from '../domain/aggregate-root';

@Injectable()
export class AggregateRehydrator {
  constructor(
    private readonly eventStore: EventStore,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async rehydrate<T extends VersionedAggregateRoot>(
    aggregateId: string,
    AggregateCls: Type<T>,
  ): Promise<T> {
    const events = await this.eventStore.getEventsByStreamId(aggregateId);

    const AggregateClsWithDispatcher =
      this.eventPublisher.mergeClassContext(AggregateCls);
    const aggregate = new AggregateClsWithDispatcher(aggregateId);

    aggregate.loadFromHistory(events);

    return aggregate;
  }
}

Why a dedicated rehydrator?

Rehydration is not domain logic; it is infrastructure:

  • You fetch events from the store.
  • You reconstruct an aggregate instance.
  • You attach publishing capabilities (mergeClassContext).

By putting that in AggregateRehydrator, command handlers stay short: they only ask for “rehydrate this aggregate id” and then call domain methods.

It also centralizes policy: if you later add snapshots, caching, or different rehydration strategies, you update this one place.
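
As one example of such a policy, snapshot-based rehydration could be sketched like this. The Snapshot type and the rehydrateFrom helper are assumptions for illustration and are not part of the guide's AggregateRehydrator.

```typescript
interface StoredEvent {
  position: number;
  type: string;
  data: Record<string, unknown>;
}

// A snapshot is a saved state plus the position of the last event it includes.
interface Snapshot<S> {
  version: number;
  state: S;
}

// Start from the snapshot if there is one, then apply only the events after it.
function rehydrateFrom<S>(
  snapshot: Snapshot<S> | undefined,
  initial: S,
  history: StoredEvent[],
  apply: (state: S, event: StoredEvent) => S,
): Snapshot<S> {
  const start: Snapshot<S> = snapshot ?? { version: 0, state: initial };

  return history
    .filter((event) => event.position > start.version)
    .reduce<Snapshot<S>>(
      (acc, event) => ({
        version: event.position,
        state: apply(acc.state, event),
      }),
      start,
    );
}
```

In a real store the filter would become a query for events with a position greater than the snapshot version, so long streams are not loaded in full.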

SharedModule exposes these building blocks:

// src/shared/infrastructure/shared-infrastructure.module.ts
import { Module } from '@nestjs/common';
import { EVENT_STORE_CONNECTION } from '../../core/core.constants';
import { EventSerializer } from './event-store/serializers/event.serializer';
import { EventStorePublisher } from './event-store/publishers/event-store.publisher';
import { EventStore } from '../application/ports/event-store';
import { TypeOrmModule } from '@nestjs/typeorm';
import { EventEntity } from './event-store/entities/event.entity';
import { EventEntitySubscriber } from './event-store/event-subscriber';
import { PgEventStore } from './event-store/event-store';
import { EventDeserializer } from './event-store/deserializers/event.deserializer';

@Module({
  imports: [TypeOrmModule.forFeature([EventEntity], EVENT_STORE_CONNECTION)],
  providers: [
    EventSerializer,
    EventDeserializer,
    EventStorePublisher,
    PgEventStore,
    EventEntitySubscriber,
    {
      provide: EventStore,
      useExisting: PgEventStore,
    },
  ],
  exports: [EventStore],
})
export class SharedInfrastructureModule {}

// src/shared/shared.module.ts
import { Module } from '@nestjs/common';
import { SharedInfrastructureModule } from './infrastructure/shared-infrastructure.module';
import { AggregateRehydrator } from './application/aggregate-rehydrator';

@Module({
  imports: [SharedInfrastructureModule],
  providers: [AggregateRehydrator],
  exports: [SharedInfrastructureModule, AggregateRehydrator],
})
export class SharedModule {}

Why export EventStore and AggregateRehydrator from a shared module?

This keeps event-sourcing concerns cross-cutting but still explicit:

  • Any bounded context can depend on SharedModule to get rehydration and event persistence.
  • The concrete storage implementation remains hidden in infrastructure.
  • Testing becomes simpler: you can replace EventStore with an in-memory fake in tests.

Step 6: Wiring event handlers from the event store back into Nest

When an event is inserted into the events table, a TypeORM subscriber pushes the domain event into Nest’s EventBus. This way, event handlers run whenever new events are appended.

// src/shared/infrastructure/event-store/event-subscriber.ts
import {
  DataSource,
  EntitySubscriberInterface,
  EventSubscriber,
  InsertEvent,
} from 'typeorm';
import { EventEntity } from './entities/event.entity';
import { EventBus } from '@nestjs/cqrs';
import { Logger } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { EVENT_STORE_CONNECTION } from '../../../core/core.constants';

@EventSubscriber()
export class EventEntitySubscriber
  implements EntitySubscriberInterface<EventEntity>
{
  private readonly logger = new Logger(EventEntitySubscriber.name);

  constructor(
    @InjectDataSource(EVENT_STORE_CONNECTION)
    private readonly dataSource: DataSource,
    private readonly eventBus: EventBus,
  ) {
    this.dataSource.subscribers.push(this);
  }

  listenTo() {
    return EventEntity;
  }

  afterInsert(event: InsertEvent<EventEntity>) {
    this.eventBus.subject$.next(event.entity.data);
  }
}

Why publish from a database subscriber?

This choice separates two concerns:

  • Persistence: the event is stored in the database.
  • Dispatch: handlers are notified when the insert succeeds at the ORM level.

Publishing from afterInsert keeps the event-store write and the projection dispatch as two separate steps:

  • The insert into events runs inside a database transaction.
  • The subscriber then pushes the event object into Nest’s EventBus in-process.

This simple setup does not couple projection work to the event-store transaction; projections use their own persistence connection. Note that afterInsert fires inside the transaction, before it commits, so a handler can observe an event whose transaction later rolls back. For strict guarantees that projections only run for committed events across different stores, you still need patterns such as transactional outbox/inbox and idempotent projections. The subscriber keeps the event store as the reference log while leaving room to introduce those patterns later.

The overall flow:

  1. Aggregate applies events in memory.
  2. EventStorePublisher persists them in Postgres.
  3. TypeORM EventEntitySubscriber sees the new row and publishes the event instance to the Nest EventBus.
  4. Event handlers update read models or trigger side effects.
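
Since step 4 can run more than once for the same event (retries, replays), projections are usually written to be idempotent. A minimal sketch, assuming a hypothetical IdempotentInvoiceProjection that remembers the last position it applied per stream:

```typescript
interface ProjectedEvent {
  streamId: string;
  position: number;
  type: string;
}

class IdempotentInvoiceProjection {
  // Last applied position per stream; in a real read database this would be
  // stored next to the projected row and updated in the same transaction.
  private readonly lastPosition = new Map<string, number>();
  readonly paid = new Map<string, boolean>();

  // Returns true if the event changed the projection, false if it was skipped.
  handle(event: ProjectedEvent): boolean {
    const seen = this.lastPosition.get(event.streamId) ?? 0;

    if (event.position <= seen) {
      // Duplicate or replayed delivery: applying it again must not change anything.
      return false;
    }

    if (event.type === 'InvoiceCreatedEvent') {
      this.paid.set(event.streamId, false);
    } else if (event.type === 'InvoicePaidEvent') {
      this.paid.set(event.streamId, true);
    }

    this.lastPosition.set(event.streamId, event.position);
    return true;
  }
}
```

This sketch only guards against duplicates. It does not detect gaps, so a projection that can receive events out of order would need additional handling.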


Implementing the Invoice bounded context

With shared infrastructure in place, you can implement a concrete domain: invoices.

From a DDD perspective, the invoices module represents a bounded context: it owns all concepts related to issuing, managing, and paying invoices. Other contexts—such as customers, payments, or accounting—will have their own models and rules, even if they reference the same real-world entities.

Aggregate boundaries should be defined by invariants, not by data grouping. In an invoicing domain, examples of invariants include: an invoice cannot be paid twice; a cancelled invoice cannot be paid; or an invoice must be consistent with its line items. The simplified aggregate in this guide focuses only on the lifecycle necessary to illustrate CQRS and Event Sourcing mechanics.

The invoice domain is deliberately simple:

  • creation: InvoiceCreatedEvent
  • payment: InvoicePaidEvent

This is enough to show aggregate life cycle, projections, and rehydration without additional business rules.

Step 7: Defining invoice domain events

Two events describe the lifecycle of an invoice:

// src/invoices/domain/events/invoice-created.event.ts
import { AutoWiredEvent } from '../../../shared/decorators/autowired-event.decorator';

@AutoWiredEvent
export class InvoiceCreatedEvent {
  constructor(
    public readonly id: string,
    public readonly customerId: string,
    public readonly amount: number,
  ) {}
}

// src/invoices/domain/events/invoice-paid.event.ts
import { AutoWiredEvent } from '../../../shared/decorators/autowired-event.decorator';

@AutoWiredEvent
export class InvoicePaidEvent {
  constructor(public readonly id: string) {}
}

The @AutoWiredEvent decorator ensures these classes are known to the event registry for deserialization.

Why model events first?

In event sourcing, events are a primary API of your domain:

  • Events describe facts that already happened ("InvoiceCreated", not "CreateInvoice").
  • Other systems can subscribe to them without understanding command semantics.
  • They capture the ubiquitous language of the business.

Once events are stable, aggregates and projections become implementation details around those facts.

Step 8: The Invoice aggregate

The aggregate extends VersionedAggregateRoot and implements event handlers using the on<EventName> naming convention from @nestjs/cqrs.

// src/invoices/domain/invoice.ts
import { VersionedAggregateRoot } from '../../shared/domain/aggregate-root';
import { InvoiceCreatedEvent } from './events/invoice-created.event';
import { InvoicePaidEvent } from './events/invoice-paid.event';

export class Invoice extends VersionedAggregateRoot {
  public customerId: string;
  public amount: number;
  public paid: boolean;

  constructor(public id: string) {
    super();
  }

  pay() {
    this.apply(new InvoicePaidEvent(this.id));
  }

  [`on${InvoiceCreatedEvent.name}`](event: InvoiceCreatedEvent) {
    this.customerId = event.customerId;
    this.amount = event.amount;
    this.paid = false;
  }

  [`on${InvoicePaidEvent.name}`](event: InvoicePaidEvent) {
    if (this.paid) {
      throw new Error('Invoice already paid');
    }

    this.paid = true;
  }
}

Why keep logic on the aggregate?

The aggregate is the enforcement point for invariants:

  • You cannot pay an invoice twice (if (this.paid) throw).
  • Later you might add rules like “cannot pay a cancelled invoice” or “amount must be positive”.

By applying events inside aggregate methods, you centralize these rules:

  • Command handlers become thin translators from external requests to domain operations.
  • Projections and queries stay read-only and do not enforce invariants.

The on<EventName> handler pattern ensures the same logic is applied both when:

  • events are raised in the current request, and
  • events are replayed from history during rehydration.
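The dual use of the handlers can be seen in a dependency-free sketch. MiniAggregateRoot below is a hypothetical stand-in written for this illustration; it is not the VersionedAggregateRoot from this guide nor the AggregateRoot from @nestjs/cqrs, and it only mimics the on<EventName> lookup. The lookup relies on class names, so it assumes the code is not minified in a way that renames classes.

```typescript
// Hypothetical, dependency-free miniature of an event-sourced aggregate root.
class InvoiceCreatedEvent {
  constructor(
    public readonly id: string,
    public readonly customerId: string,
    public readonly amount: number,
  ) {}
}

class InvoicePaidEvent {
  constructor(public readonly id: string) {}
}

type DomainEvent = InvoiceCreatedEvent | InvoicePaidEvent;

abstract class MiniAggregateRoot {
  private readonly uncommitted: DomainEvent[] = [];

  // Raise a new event: run its handler, then remember it for persistence.
  apply(event: DomainEvent): void {
    this.dispatch(event);
    this.uncommitted.push(event);
  }

  // Replay stored events: run the same handlers, but record nothing new.
  loadFromHistory(storedEvents: DomainEvent[]): void {
    storedEvents.forEach((event) => this.dispatch(event));
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommitted];
  }

  // Resolve the handler by convention: on + the event's class name.
  private dispatch(event: DomainEvent): void {
    const handlerName = `on${event.constructor.name}`;
    const handler = (this as unknown as Record<string, unknown>)[handlerName];
    if (typeof handler === 'function') {
      handler.call(this, event);
    }
  }
}

class Invoice extends MiniAggregateRoot {
  public customerId = '';
  public amount = 0;
  public paid = false;

  constructor(public readonly id: string) {
    super();
  }

  onInvoiceCreatedEvent(event: InvoiceCreatedEvent): void {
    this.customerId = event.customerId;
    this.amount = event.amount;
    this.paid = false;
  }

  onInvoicePaidEvent(_event: InvoicePaidEvent): void {
    this.paid = true;
  }
}

// Live path: events are raised in the current request.
const liveInvoice = new Invoice('inv_1');
liveInvoice.apply(new InvoiceCreatedEvent('inv_1', 'cus_1', 1000));
liveInvoice.apply(new InvoicePaidEvent('inv_1'));

// Replay path: a fresh instance is rebuilt from the recorded events.
const replayedInvoice = new Invoice('inv_1');
replayedInvoice.loadFromHistory(liveInvoice.getUncommittedEvents());
```

Both instances end in the same state because a single set of handlers mutates it; the replayed instance has no uncommitted events, since replay does not raise anything new.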

A factory encapsulates creation logic and emits the InvoiceCreatedEvent:

// src/invoices/domain/factories/invoice.factory.ts
import { randomUUID } from 'crypto';
import { Injectable } from '@nestjs/common';
import { Invoice } from '../invoice';
import { InvoiceCreatedEvent } from '../events/invoice-created.event';

@Injectable()
export class InvoiceFactory {
  create(customerId: string, amount: number) {
    const id = randomUUID();

    const invoice = new Invoice(id);

    invoice.customerId = customerId;
    invoice.amount = amount;
    invoice.paid = false;

    // skip the handler because we have already assigned the properties.
    invoice.apply(new InvoiceCreatedEvent(id, customerId, amount), {
      skipHandler: true,
    });

    return invoice;
  }
}

Why a factory and skipHandler?

Creation often has different rules from updates:

  • You need a fresh id.
  • You may validate input differently.
  • You want to ensure a matching InvoiceCreatedEvent exists in the stream.

Using a factory:

  • centralizes creation logic, independent of controllers and command handlers,
  • allows you to add cross-cutting behavior later (validation, logging, defaults) without touching handlers, and
  • guarantees that every invoice instance has a corresponding creation event.

skipHandler: true is used because the factory has just assigned the properties. The event is still written to the stream, but state is already in sync.
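One consequence of skipHandler is that the factory's manual assignments and the creation handler must stay in agreement, otherwise a freshly created invoice and a rehydrated one would differ. The sketch below shows how that could be checked. Every name in it (InvoiceSketch, createInvoiceSketch, replayInvoiceSketch, InvoiceCreatedFact) is hypothetical and stands in for the guide's real classes.

```typescript
// Hypothetical miniature of the factory + skipHandler idea; not the guide's classes.
interface InvoiceCreatedFact {
  id: string;
  customerId: string;
  amount: number;
}

class InvoiceSketch {
  customerId = '';
  amount = 0;
  paid = false;
  readonly pendingEvents: InvoiceCreatedFact[] = [];

  constructor(public readonly id: string) {}

  // Records the event; runs the handler unless the caller opts out.
  apply(event: InvoiceCreatedFact, options: { skipHandler?: boolean } = {}): void {
    if (!options.skipHandler) {
      this.onInvoiceCreated(event);
    }
    this.pendingEvents.push(event);
  }

  onInvoiceCreated(event: InvoiceCreatedFact): void {
    this.customerId = event.customerId;
    this.amount = event.amount;
    this.paid = false;
  }
}

// Factory path: assign state directly, then record the event without the handler.
function createInvoiceSketch(id: string, customerId: string, amount: number): InvoiceSketch {
  const invoice = new InvoiceSketch(id);
  invoice.customerId = customerId;
  invoice.amount = amount;
  invoice.paid = false;
  invoice.apply({ id, customerId, amount }, { skipHandler: true });
  return invoice;
}

// Rehydration path: state comes only from the handler.
function replayInvoiceSketch(id: string, events: InvoiceCreatedFact[]): InvoiceSketch {
  const invoice = new InvoiceSketch(id);
  events.forEach((event) => invoice.onInvoiceCreated(event));
  return invoice;
}

const createdInvoice = createInvoiceSketch('inv_1', 'cus_123456789', 1000);
const rebuiltInvoice = replayInvoiceSketch('inv_1', createdInvoice.pendingEvents);

const statesMatch =
  createdInvoice.customerId === rebuiltInvoice.customerId &&
  createdInvoice.amount === rebuiltInvoice.amount &&
  createdInvoice.paid === rebuiltInvoice.paid;
```

A check of this shape is cheap to keep as a unit test. If the duplication becomes a burden, dropping skipHandler and letting the handler assign the fields is the simpler alternative.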

Step 9: Commands and command handlers (write side)

Commands represent user intent:

// src/invoices/application/commands/impl/create-invoice.command.ts
export class CreateInvoiceCommand {
  constructor(
    public readonly customerId: string,
    public readonly amount: number,
  ) {}
}

// src/invoices/application/commands/impl/pay-invoice.command.ts
export class PayInvoiceCommand {
  constructor(public readonly id: string) {}
}

Why commands instead of calling the service directly?

Commands give you:

  • a clear model for allowed operations (CreateInvoice, PayInvoice, etc.),
  • a place to attach validation and authorization policies,
  • an explicit boundary between the outside world and the domain.

Commands are messages. They can later be sent from other services, queued, or retried without changing domain logic.
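To make the "commands are messages" idea concrete, here is a minimal sketch of a bus that routes a command object to its handler and runs a validation policy first. MiniCommandBus and its register method are hypothetical and exist only for this illustration; in the guide, the CommandBus from @nestjs/cqrs does the routing.

```typescript
// Hypothetical mini command bus; @nestjs/cqrs ships the real CommandBus.
class CreateInvoiceCommand {
  constructor(
    public readonly customerId: string,
    public readonly amount: number,
  ) {}
}

type CommandType<C> = new (...args: any[]) => C;

interface CommandRegistration<C> {
  validate: (command: C) => void; // policy that runs before the handler
  handle: (command: C) => unknown;
}

class MiniCommandBus {
  private readonly registrations = new Map<Function, CommandRegistration<any>>();

  register<C>(type: CommandType<C>, registration: CommandRegistration<C>): void {
    this.registrations.set(type, registration);
  }

  // The command's class decides which registration is used.
  execute(command: object): unknown {
    const registration = this.registrations.get(command.constructor);
    if (!registration) {
      throw new Error(`No handler registered for ${command.constructor.name}`);
    }
    registration.validate(command);
    return registration.handle(command);
  }
}

const commandBus = new MiniCommandBus();
commandBus.register(CreateInvoiceCommand, {
  validate: (command) => {
    if (command.amount <= 0) {
      throw new Error('amount must be positive');
    }
  },
  handle: (command) => `invoice created for ${command.customerId}`,
});

const acceptedResult = commandBus.execute(
  new CreateInvoiceCommand('cus_123456789', 1000),
);

let rejectedReason = 'none';
try {
  commandBus.execute(new CreateInvoiceCommand('cus_123456789', 0));
} catch (error) {
  rejectedReason = error instanceof Error ? error.message : 'unknown error';
}
```

Because the caller only constructs a plain object, the same command could be serialized onto a queue or retried without the sender knowing which handler runs.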

The CreateInvoiceCommandHandler uses the factory and the EventPublisher from @nestjs/cqrs:

// src/invoices/application/commands/handlers/create-invoice.command-handler.ts
import { Logger } from '@nestjs/common';
import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';
import { CreateInvoiceCommand } from '../impl/create-invoice.command';
import { InvoiceFactory } from '../../../domain/factories/invoice.factory';

@CommandHandler(CreateInvoiceCommand)
export class CreateInvoiceCommandHandler
  implements ICommandHandler<CreateInvoiceCommand>
{
  private readonly logger = new Logger(CreateInvoiceCommandHandler.name);

  constructor(
    private readonly invoiceFactory: InvoiceFactory,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(command: CreateInvoiceCommand) {
    this.logger.debug(
      `Processing "${CreateInvoiceCommand.name}": ${JSON.stringify(command)}`,
    );

    const invoice = this.invoiceFactory.create(
      command.customerId,
      command.amount,
    );

    // Merge the invoice with the event publisher to enable event handling
    this.eventPublisher.mergeObjectContext(invoice);

    invoice.commit();

    return invoice;
  }
}

The PayInvoiceCommandHandler rehydrates the aggregate from the event store, applies the new event, and commits again:

// src/invoices/application/commands/handlers/pay-invoice.command.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { PayInvoiceCommand } from '../impl/pay-invoice.command';
import { Logger } from '@nestjs/common';
import { AggregateRehydrator } from '../../../../shared/application/aggregate-rehydrator';
import { Invoice } from '../../../domain/invoice';

@CommandHandler(PayInvoiceCommand)
export class PayInvoiceCommandHandler
  implements ICommandHandler<PayInvoiceCommand>
{
  private readonly logger = new Logger(PayInvoiceCommandHandler.name);

  constructor(private readonly aggregateRehydrator: AggregateRehydrator) {}

  async execute(command: PayInvoiceCommand) {
    this.logger.debug(
      `Processing "${PayInvoiceCommand.name}": ${JSON.stringify(command)}`,
    );

    const invoice = await this.aggregateRehydrator.rehydrate(
      command.id,
      Invoice,
    );

    invoice.pay();
    invoice.commit();

    return invoice;
  }
}

How command handlers relate to aggregates

Handler responsibilities:

  • Translate incoming commands into domain method calls.
  • Rehydrate aggregates when necessary.
  • Commit changes after domain logic has applied events.

They should not:

  • talk directly to Repository<InvoiceEntity> or any read models,
  • implement business rules that belong to the aggregate,
  • perform side effects that belong in projections or process managers.

This separation keeps handlers easy to read and test: for a given command, you can see which aggregate method is called and what happens next.
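As a rough illustration of that testability, the sketch below drives a pay handler with an in-memory fake instead of the event store. FakeInvoice, Rehydrator, and PayInvoiceHandlerSketch are hypothetical simplifications of the guide's Invoice, AggregateRehydrator, and PayInvoiceCommandHandler; the fake keeps its rule inside pay() for brevity.

```typescript
// Hypothetical minimal shapes; the guide's real classes are richer.
class FakeInvoice {
  paid = false;
  committed = false;

  constructor(public readonly id: string) {}

  pay(): void {
    if (this.paid) {
      throw new Error('Invoice already paid');
    }
    this.paid = true;
  }

  commit(): void {
    this.committed = true;
  }
}

interface Rehydrator {
  rehydrate(id: string): Promise<FakeInvoice>;
}

// Same three steps as the handler in the guide: rehydrate, call the domain, commit.
class PayInvoiceHandlerSketch {
  constructor(private readonly rehydrator: Rehydrator) {}

  async execute(command: { id: string }): Promise<FakeInvoice> {
    const invoice = await this.rehydrator.rehydrate(command.id);
    invoice.pay();
    invoice.commit();
    return invoice;
  }
}

interface HandlerCheckResult {
  paid: boolean;
  committed: boolean;
  secondAttempt: string;
}

async function runHandlerCheck(): Promise<HandlerCheckResult> {
  const invoice = new FakeInvoice('inv_1');
  // The fake rehydrator always hands back the same in-memory instance.
  const handler = new PayInvoiceHandlerSketch({ rehydrate: async () => invoice });

  const firstResult = await handler.execute({ id: 'inv_1' });

  let secondAttempt = 'accepted';
  try {
    await handler.execute({ id: 'inv_1' });
  } catch {
    secondAttempt = 'rejected';
  }

  return { paid: firstResult.paid, committed: firstResult.committed, secondAttempt };
}
```

The handler itself contains no branching, so the test only needs to confirm the sequence of calls and that a domain error propagates.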

Step 10: Read model and projections (read side)

The read model type in this guide is simple and used mainly for documentation:

// src/invoices/domain/read-models/invoice.read-model.ts
import { ApiProperty } from '@nestjs/swagger';

export class InvoiceReadModel {
  @ApiProperty({ example: 'cus_123456789' })
  readonly customerId: string;

  @ApiProperty({ example: 1000, description: "Invoice's amount in cents" })
  readonly amount: number;
}

The read database stores a denormalized representation:

// src/invoices/infrastructure/persistance/entities/invoice.entity.ts
import { Column, Entity, PrimaryColumn } from 'typeorm';

@Entity('invoices')
export class InvoiceEntity {
  @PrimaryColumn()
  id: string;

  @Column()
  customerId: string;

  @Column()
  amount: number;

  @Column({ default: false })
  paid: boolean;
}

At runtime the TypeORM repositories return InvoiceEntity, which includes id and paid. The InvoiceReadModel captures a compact view that is suitable as a documented schema in the API description. In a production system you would usually align the read model type and the persisted projection more closely, or map from entities to explicit DTOs.
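A minimal sketch of the explicit-mapping option follows. InvoiceRow, InvoiceResponseDto, and toInvoiceResponse are hypothetical names, and the internalNote column is invented purely to show a field that should not reach clients.

```typescript
// Hypothetical row shape: mirrors InvoiceEntity plus an invented internal column.
interface InvoiceRow {
  id: string;
  customerId: string;
  amount: number;
  paid: boolean;
  internalNote: string;
}

// Hypothetical response DTO listing exactly what clients may see.
interface InvoiceResponseDto {
  id: string;
  customerId: string;
  amount: number;
  paid: boolean;
}

// Copy fields one by one so new columns never leak by accident.
function toInvoiceResponse(row: InvoiceRow): InvoiceResponseDto {
  return {
    id: row.id,
    customerId: row.customerId,
    amount: row.amount,
    paid: row.paid,
  };
}

const exampleRow: InvoiceRow = {
  id: 'inv_1',
  customerId: 'cus_123456789',
  amount: 1000,
  paid: false,
  internalNote: 'flagged for manual review',
};

const exampleDto = toInvoiceResponse(exampleRow);
const exposedFields = Object.keys(exampleDto).sort().join(',');
```

With a mapper like this in the repository adapter, the type returned at runtime and the schema shown in the API description would describe the same fields.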

Why separate the read model from the aggregate?

Aggregates belong on the write side. They contain the decision state required to enforce business rules and invariants. This state can include fields that are not intended for exposure, such as internal flags or data used solely for validation and consistency checks.

Read models belong on the read side. They store the projected state that the application returns to clients, and their structure is shaped for queries and for safe exposure through the API. This separation prevents accidental leakage of internal or sensitive aggregate data and avoids mixing domain logic with presentation concerns.

In this example, the read model is persisted separately in the invoices table, and the query handlers talk to a FindInvoiceRepository abstraction. For brevity, some write endpoints still return the aggregate instance produced by the command handlers, even though in a stricter CQRS setup clients would rely only on read models or follow-up queries.

By keeping aggregates and read models conceptually independent, the write side stays focused on enforcing rules, while the read side provides controlled, explicit representations of state for external consumers.

Repositories centralize TypeORM usage behind interfaces:

// src/invoices/application/ports/find-invoice.repository.ts
import { InvoiceReadModel } from '../../domain/read-models/invoice.read-model';

export abstract class FindInvoiceRepository {
  abstract findById(id: string): Promise<InvoiceReadModel | null>;
  abstract findAll(): Promise<InvoiceReadModel[]>;
}

// src/invoices/application/ports/upsert-invoice.repository.ts
import { DeepPartial } from 'typeorm';
import { InvoiceEntity } from '../../infrastructure/persistance/entities/invoice.entity';

export abstract class UpsertInvoiceRepository {
  abstract upsert(invoice: DeepPartial<InvoiceEntity>): Promise<void>;
}

// src/invoices/infrastructure/persistance/repositories/find-invoice.repository.ts
import { Injectable } from '@nestjs/common';
import { FindInvoiceRepository } from '../../../application/ports/find-invoice.repository';
import { InjectRepository } from '@nestjs/typeorm';
import { InvoiceEntity } from '../entities/invoice.entity';
import { Repository } from 'typeorm';

@Injectable()
export class OrmFindInvoiceRepository implements FindInvoiceRepository {
  constructor(
    @InjectRepository(InvoiceEntity)
    private readonly repository: Repository<InvoiceEntity>,
  ) {}

  async findById(id: string) {
    return this.repository.findOneBy({ id });
  }

  async findAll() {
    return this.repository.find();
  }
}

// src/invoices/infrastructure/persistance/repositories/upsert-invoice.repository.ts
import { Injectable } from '@nestjs/common';
import { UpsertInvoiceRepository } from '../../../application/ports/upsert-invoice.repository';
import { InjectRepository } from '@nestjs/typeorm';
import { InvoiceEntity } from '../entities/invoice.entity';
import { DeepPartial, Repository } from 'typeorm';

@Injectable()
export class OrmUpsertInvoiceRepository implements UpsertInvoiceRepository {
  constructor(
    @InjectRepository(InvoiceEntity)
    private readonly repository: Repository<InvoiceEntity>,
  ) {}

  async upsert(invoice: DeepPartial<InvoiceEntity>) {
    await this.repository.save(invoice);
  }
}

A small module wires the repositories:

// src/invoices/infrastructure/persistance/persistence.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { InvoiceEntity } from './entities/invoice.entity';
import { UpsertInvoiceRepository } from '../../application/ports/upsert-invoice.repository';
import { OrmUpsertInvoiceRepository } from './repositories/upsert-invoice.repository';
import { FindInvoiceRepository } from '../../application/ports/find-invoice.repository';
import { OrmFindInvoiceRepository } from './repositories/find-invoice.repository';

@Module({
  imports: [TypeOrmModule.forFeature([InvoiceEntity])],
  providers: [
    {
      provide: UpsertInvoiceRepository,
      useClass: OrmUpsertInvoiceRepository,
    },
    {
      provide: FindInvoiceRepository,
      useClass: OrmFindInvoiceRepository,
    },
  ],
  exports: [UpsertInvoiceRepository, FindInvoiceRepository],
})
export class InvoicePersistenceModule {}

Why ports on the read side as well?

Even though the read model uses a simple table now, you might later:

  • move invoices into a reporting database,
  • denormalize into multiple views,
  • or query a search index.

By depending on FindInvoiceRepository and UpsertInvoiceRepository instead of Repository<InvoiceEntity>, the application layer knows only about these abstractions. In this sample the interfaces still use InvoiceEntity and DeepPartial<InvoiceEntity> for convenience, but the indirection gives you a single place to adapt the persistence layer if it changes.
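One practical payoff of the ports is that an alternative adapter can be swapped in, for example in tests. The sketch below re-declares simplified versions of both ports so that it stays self-contained. InvoiceView and InMemoryInvoiceRepository are hypothetical, and the simplified upsert signature uses a plain partial object rather than DeepPartial<InvoiceEntity>.

```typescript
// Hypothetical view type standing in for the persisted projection.
interface InvoiceView {
  id: string;
  customerId: string;
  amount: number;
  paid: boolean;
}

// Simplified copies of the guide's ports, re-declared to keep the sketch standalone.
abstract class FindInvoiceRepository {
  abstract findById(id: string): Promise<InvoiceView | null>;
  abstract findAll(): Promise<InvoiceView[]>;
}

abstract class UpsertInvoiceRepository {
  abstract upsert(invoice: Partial<InvoiceView> & { id: string }): Promise<void>;
}

// One in-memory adapter can satisfy both ports.
class InMemoryInvoiceRepository
  implements FindInvoiceRepository, UpsertInvoiceRepository
{
  private readonly rows = new Map<string, InvoiceView>();

  async findById(id: string): Promise<InvoiceView | null> {
    return this.rows.get(id) ?? null;
  }

  async findAll(): Promise<InvoiceView[]> {
    return Array.from(this.rows.values());
  }

  // Merge the provided fields into the existing row, or start from defaults.
  async upsert(invoice: Partial<InvoiceView> & { id: string }): Promise<void> {
    const existing: InvoiceView = this.rows.get(invoice.id) ?? {
      id: invoice.id,
      customerId: '',
      amount: 0,
      paid: false,
    };
    this.rows.set(invoice.id, { ...existing, ...invoice });
  }
}

async function demoInMemoryRepository(): Promise<InvoiceView | null> {
  const repository = new InMemoryInvoiceRepository();
  await repository.upsert({ id: 'inv_1', customerId: 'cus_1', amount: 1000 });
  await repository.upsert({ id: 'inv_1', paid: true });
  return repository.findById('inv_1');
}
```

In a Nest test module, such a class could be supplied through the same provide/useClass pair that the persistence module uses for the TypeORM adapters.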

Read-model projections run as event handlers.

For invoice creation:

// src/invoices/application/event-handlers/invoice-created.event-handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { InvoiceCreatedEvent } from '../../domain/events/invoice-created.event';
import { Logger } from '@nestjs/common';
import { UpsertInvoiceRepository } from '../ports/upsert-invoice.repository';

@EventsHandler(InvoiceCreatedEvent)
export class InvoiceCreatedEventHandler
  implements IEventHandler<InvoiceCreatedEvent>
{
  private readonly logger = new Logger(InvoiceCreatedEventHandler.name);

  constructor(private readonly repository: UpsertInvoiceRepository) {}

  async handle(event: InvoiceCreatedEvent) {
    this.logger.debug(`${InvoiceCreatedEvent.name}: ${JSON.stringify(event)}`);

    // In a real-world application, this should be part of a reliable pattern
    // such as transactional outbox/inbox.
    await this.repository.upsert({
      id: event.id,
      customerId: event.customerId,
      amount: event.amount,
    });
  }
}

For invoice payment:

// src/invoices/application/event-handlers/invoice-paid.event-handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { InvoicePaidEvent } from '../../domain/events/invoice-paid.event';
import { Logger } from '@nestjs/common';
import { UpsertInvoiceRepository } from '../ports/upsert-invoice.repository';

@EventsHandler(InvoicePaidEvent)
export class InvoicePaidEventHandler
  implements IEventHandler<InvoicePaidEvent>
{
  private readonly logger = new Logger(InvoicePaidEventHandler.name);

  constructor(private readonly repository: UpsertInvoiceRepository) {}

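  async handle(event: InvoicePaidEvent) {
    this.logger.debug(`${InvoicePaidEvent.name}: ${JSON.stringify(event)}`);

    // Only the paid flag is sent; columns not listed here are left as stored.
    await this.repository.upsert({
      id: event.id,
      paid: true,
    });
  }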
}

Why use event handlers for projections?

Event handlers are projections from the write model to the read model:

  • They translate domain events into table updates.
  • They can maintain multiple projections from the same stream (invoices_by_customer, invoice_statistics, etc.).
  • They run for every published event and should be written to be idempotent, because a retried or redelivered event can reach them more than once.

In production, you would combine this with:

  • an outbox/inbox pattern to guarantee at-least-once delivery, and
  • idempotency keys or natural keys (id) to avoid duplicate inserts.

Because projections run in response to events, the read side is eventually consistent with the event store. After a command completes, there may be a short delay before the corresponding read model reflects the change. This delay is expected in systems built around CQRS and Event Sourcing and should be acknowledged when designing user interactions and workflows.
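Upserts by natural key are idempotent on their own, but a projection that accumulates values is not. The sketch below shows one way to make such a projection tolerate duplicates by remembering which events it has already applied. The eventId field, the ProjectedEvent shape, and InvoiceStatisticsProjection are assumptions made for this example; a real implementation would persist the processed ids in the same transaction as the projection update.

```typescript
// Hypothetical event envelope: eventId is an assumed unique id per stored event.
type ProjectedEvent =
  | { eventId: string; type: 'InvoiceCreated'; invoiceId: string; amount: number }
  | { eventId: string; type: 'InvoicePaid'; invoiceId: string };

class InvoiceStatisticsProjection {
  private readonly processedEventIds = new Set<string>();
  private readonly amounts = new Map<string, number>();
  public paidTotal = 0;

  // Returns false when the event was already applied and is therefore skipped.
  project(event: ProjectedEvent): boolean {
    if (this.processedEventIds.has(event.eventId)) {
      return false;
    }

    if (event.type === 'InvoiceCreated') {
      this.amounts.set(event.invoiceId, event.amount);
    } else {
      // Accumulating state: applying this twice would double-count.
      this.paidTotal += this.amounts.get(event.invoiceId) ?? 0;
    }

    this.processedEventIds.add(event.eventId);
    return true;
  }
}

const statistics = new InvoiceStatisticsProjection();

const deliveries: ProjectedEvent[] = [
  { eventId: 'evt_1', type: 'InvoiceCreated', invoiceId: 'inv_1', amount: 1000 },
  { eventId: 'evt_2', type: 'InvoicePaid', invoiceId: 'inv_1' },
  { eventId: 'evt_2', type: 'InvoicePaid', invoiceId: 'inv_1' }, // redelivered
];

const appliedFlags = deliveries.map((delivery) => statistics.project(delivery));
```

The redelivered payment is skipped, so the total stays at the amount of the single paid invoice.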

Step 11: Queries and query handlers

Queries model read operations:

// src/invoices/application/queries/impl/find-all-invoices.query.ts
export class FindAllInvoicesQuery {}

// src/invoices/application/queries/impl/find-invoice-by-id.query.ts
export class FindInvoiceByIdQuery {
  constructor(public readonly id: string) {}
}

Handlers delegate to the read repository:

// src/invoices/application/queries/handlers/find-all-invoices.query-handler.ts
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { FindAllInvoicesQuery } from '../impl/find-all-invoices.query';
import { InvoiceReadModel } from '../../../domain/read-models/invoice.read-model';
import { FindInvoiceRepository } from '../../ports/find-invoice.repository';

@QueryHandler(FindAllInvoicesQuery)
export class FindAllInvoicesQueryHandler
  implements IQueryHandler<FindAllInvoicesQuery, InvoiceReadModel[]>
{
  constructor(private readonly repository: FindInvoiceRepository) {}

  async execute(query: FindAllInvoicesQuery) {
    return this.repository.findAll();
  }
}

// src/invoices/application/queries/handlers/find-invoice-by-id.query-handler.ts
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { FindInvoiceByIdQuery } from '../impl/find-invoice-by-id.query';
import { InvoiceReadModel } from '../../../domain/read-models/invoice.read-model';
import { FindInvoiceRepository } from '../../ports/find-invoice.repository';

@QueryHandler(FindInvoiceByIdQuery)
export class FindInvoiceByIdQueryHandler
  implements IQueryHandler<FindInvoiceByIdQuery, InvoiceReadModel | null>
{
  constructor(private readonly repository: FindInvoiceRepository) {}

  async execute(query: FindInvoiceByIdQuery) {
    return this.repository.findById(query.id);
  }
}

Why a QueryBus at all?

For simple applications, controllers can call read repositories directly. In CQRS, using a dedicated QueryBus gives you:

  • one place to apply cross-cutting concerns like caching or logging,
  • symmetry with command handling,
  • flexibility to evolve queries into separate services later.

For now, having query handlers keeps the shape of the application consistent: all reads and writes become explicit conceptual operations.
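As an illustration of the cross-cutting point, a caching layer can wrap any handler that exposes an execute method. QueryHandlerLike and CachingQueryHandler are hypothetical and are not part of @nestjs/cqrs; the sketch also never evicts entries, which a real read side would need to do when projections change.

```typescript
// Hypothetical minimal handler contract, shaped like a query handler's execute().
interface QueryHandlerLike<Q, R> {
  execute(query: Q): Promise<R>;
}

// Decorator that remembers results per cache key and skips the inner handler on a hit.
class CachingQueryHandler<Q, R> implements QueryHandlerLike<Q, R> {
  private readonly cachedResults = new Map<string, R>();

  constructor(
    private readonly inner: QueryHandlerLike<Q, R>,
    private readonly keyOf: (query: Q) => string,
  ) {}

  async execute(query: Q): Promise<R> {
    const key = this.keyOf(query);
    if (this.cachedResults.has(key)) {
      return this.cachedResults.get(key) as R;
    }
    const result = await this.inner.execute(query);
    this.cachedResults.set(key, result);
    return result;
  }
}

// Stand-in for a repository-backed handler; counts how often it is really called.
let repositoryCalls = 0;
const findByCustomerHandler: QueryHandlerLike<{ customerId: string }, string[]> = {
  async execute(query) {
    repositoryCalls += 1;
    return [`invoice-of-${query.customerId}`];
  },
};

const cachedHandler = new CachingQueryHandler(
  findByCustomerHandler,
  (query) => query.customerId,
);

async function demoCaching(): Promise<number> {
  await cachedHandler.execute({ customerId: 'cus_1' });
  await cachedHandler.execute({ customerId: 'cus_1' });
  return repositoryCalls;
}
```

The wrapped handler is reached once for two identical queries. Controllers that call repositories directly would have to repeat this logic at every call site.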

Step 12: Application service and HTTP API

The application layer exposes use cases to external callers. It pulls together commands, queries, aggregates, and projections without exposing domain internals. In DDD terminology, this layer acts as the orchestrator: it ensures that writes are performed through aggregates and that reads are performed through the query side, keeping the boundary between the two explicit.

The InvoicesService orchestrates commands and queries through CommandBus and QueryBus:

// src/invoices/application/invoices.service.ts
import { Injectable } from '@nestjs/common';
import { CreateInvoiceDto } from '../presentation/http/dto/create-invoice.dto';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { CreateInvoiceCommand } from './commands/impl/create-invoice.command';
import { FindInvoiceByIdQuery } from './queries/impl/find-invoice-by-id.query';
import { FindAllInvoicesQuery } from './queries/impl/find-all-invoices.query';
import { PayInvoiceCommand } from './commands/impl/pay-invoice.command';

@Injectable()
export class InvoicesService {
  constructor(
    private readonly commandBus: CommandBus,
    private readonly queryBus: QueryBus,
  ) {}

  create(dto: CreateInvoiceDto) {
    return this.commandBus.execute(
      new CreateInvoiceCommand(dto.customerId, dto.amount),
    );
  }

  findAll() {
    return this.queryBus.execute(new FindAllInvoicesQuery());
  }

  findOne(id: string) {
    return this.queryBus.execute(new FindInvoiceByIdQuery(id));
  }

  pay(id: string) {
    return this.commandBus.execute(new PayInvoiceCommand(id));
  }
}

Why keep a service on top of the buses?

The service shields controllers from CQRS details:

  • Controllers depend on a single InvoicesService abstraction.
  • If you later move away from @nestjs/cqrs or change transport, the controller layer stays stable.
  • The service can also orchestrate multiple commands or queries into a single use case.

DTOs describe the request payloads and response shapes:

// src/invoices/presentation/http/dto/create-invoice.dto.ts
import { ApiProperty } from '@nestjs/swagger';

export class CreateInvoiceDto {
  @ApiProperty({ example: 'cus_123456789' })
  readonly customerId: string;

  @ApiProperty({ example: 1000, description: "Invoice's amount in cents" })
  readonly amount: number;
}

// src/invoices/presentation/http/dto/update-invoice.dto.ts
import { PartialType } from '@nestjs/mapped-types';
import { CreateInvoiceDto } from './create-invoice.dto';

export class UpdateInvoiceDto extends PartialType(CreateInvoiceDto) {}

The controller delegates to the application service instead of interacting with command or query buses directly. This keeps HTTP concerns isolated and avoids coupling the transport layer to the internal command/query abstractions. As the system evolves to support other transports or workflows, the application service remains the consistent entry point for external operations.

The HTTP controller exposes the API:

// src/invoices/presentation/http/invoices.controller.ts
import { Body, Controller, Get, Param, Patch, Post } from '@nestjs/common';
import { InvoicesService } from '../../application/invoices.service';
import { CreateInvoiceDto } from './dto/create-invoice.dto';
import {
  ApiCreatedResponse,
  ApiOkResponse,
  ApiOperation,
} from '@nestjs/swagger';
import { InvoiceReadModel } from '../../domain/read-models/invoice.read-model';

@Controller('invoices')
export class InvoicesController {
  constructor(private readonly invoicesService: InvoicesService) {}

  @ApiOperation({ operationId: 'create_invoice' })
  @ApiCreatedResponse({ type: InvoiceReadModel })
  @Post()
  create(@Body() createInvoiceDto: CreateInvoiceDto) {
    return this.invoicesService.create(createInvoiceDto);
  }

  @ApiOperation({ operationId: 'find_all_invoice' })
  @ApiOkResponse({ type: [InvoiceReadModel] })
  @Get()
  findAll() {
    return this.invoicesService.findAll();
  }

  @ApiOperation({ operationId: 'find_by_id_invoice' })
  @ApiOkResponse({ type: InvoiceReadModel })
  @Get(':id')
  findOne(@Param('id') id: string) {
    return this.invoicesService.findOne(id);
  }

  @ApiOperation({ operationId: 'pay_invoice' })
  @ApiOkResponse({ type: InvoiceReadModel })
  @Patch(':id/pay')
  pay(@Param('id') id: string) {
    return this.invoicesService.pay(id);
  }
}

In this example, the create and pay endpoints return the aggregate instance produced by the command handlers. The Swagger decorators still refer to InvoiceReadModel so that the documented schema stays compact and focused on the fields that matter for clients. In a stricter CQRS setup you might instead:

  • return only a read model (by issuing a query after the command), or
  • return an identifier or 202 Accepted and let clients fetch state from the query side.
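If the command returns only an identifier, the caller has to allow for the projection lagging behind. The helper below is a hypothetical sketch of polling the query side a bounded number of times; waitForReadModel, its parameters, and the simulated fakeFindInvoiceById are all invented for this example.

```typescript
// Hypothetical helper: poll the query side until the projection has caught up.
async function waitForReadModel<T>(
  fetchReadModel: () => Promise<T | null>,
  maxAttempts = 5,
  delayMs = 50,
): Promise<T | null> {
  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    const readModel = await fetchReadModel();
    if (readModel !== null) {
      return readModel;
    }
    if (attempt < maxAttempts) {
      // Wait before asking again; the projection may still be running.
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return null; // the caller decides how to report a read model that never appeared
}

// Simulated query side that only sees the invoice on the third read.
let readAttempts = 0;
async function fakeFindInvoiceById(): Promise<{ id: string; paid: boolean } | null> {
  readAttempts += 1;
  return readAttempts < 3 ? null : { id: 'inv_1', paid: false };
}

async function demoPolling(): Promise<{ id: string; paid: boolean } | null> {
  return waitForReadModel(fakeFindInvoiceById, 5, 1);
}
```

Whether this loop lives in the application service or in the client is a design choice: keeping it in the client matches the 202 Accepted style, while keeping it in the service preserves a synchronous-looking API.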

The invoice module combines everything:

// src/invoices/application/invoices.module.ts
import { Module } from '@nestjs/common';
import { InvoicesService } from './invoices.service';
import { InvoicesController } from '../presentation/http/invoices.controller';
import { CreateInvoiceCommandHandler } from './commands/handlers/create-invoice.command-handler';
import { InvoiceInfrastructureModule } from '../infrastructure/invoice-infrastructure.module';
import { InvoiceFactory } from '../domain/factories/invoice.factory';
import { InvoiceCreatedEventHandler } from './event-handlers/invoice-created.event-handler';
import { InvoicePaidEventHandler } from './event-handlers/invoice-paid.event-handler';
import { FindInvoiceByIdQueryHandler } from './queries/handlers/find-invoice-by-id.query-handler';
import { FindAllInvoicesQueryHandler } from './queries/handlers/find-all-invoices.query-handler';
import { PayInvoiceCommandHandler } from './commands/handlers/pay-invoice.command';

@Module({
  imports: [InvoiceInfrastructureModule],
  controllers: [InvoicesController],
  providers: [
    InvoicesService,

    // Factories
    InvoiceFactory,

    // Command Handlers
    CreateInvoiceCommandHandler,
    PayInvoiceCommandHandler,

    // Query Handlers
    FindInvoiceByIdQueryHandler,
    FindAllInvoicesQueryHandler,

    // Event Handlers
    InvoiceCreatedEventHandler,
    // Registered so that the paid flag is projected into the read model.
    InvoicePaidEventHandler,
  ],
})
export class InvoicesModule {}

InvoiceInfrastructureModule pulls in shared infrastructure and persistence:

// src/invoices/infrastructure/invoice-infrastructure.module.ts
import { Module } from '@nestjs/common';
import { InvoicePersistenceModule } from './persistance/persistence.module';
import { SharedModule } from '../../shared/shared.module';

@Module({
  imports: [SharedModule, InvoicePersistenceModule],
  exports: [SharedModule, InvoicePersistenceModule],
})
export class InvoiceInfrastructureModule {}

Why this module wiring?

InvoicesModule becomes the entry point for the bounded context:

  • All command, query, and event handlers are declared here.
  • All infrastructure dependencies are imported via InvoiceInfrastructureModule.
  • All external adapters (HTTP controllers) are declared here.

You can test this module in isolation with a test AppModule that imports only InvoicesModule.


Step 13: Running and testing the flow

  1. Start the databases:

docker compose up -d

  2. Start the Nest application:

pnpm start:dev

  3. Open the Swagger UI at http://localhost:3001/api.

  4. Create an invoice:

curl -X POST http://localhost:3001/invoices \
  -H "Content-Type: application/json" \
  -d '{"customerId":"cus_123456789","amount":1000}'

This:

  • Creates an Invoice aggregate.
  • Emits InvoiceCreatedEvent.
  • Persists the event into the events table.
  • Projects the read model into the invoices table.

  5. List invoices:

curl http://localhost:3001/invoices

  6. Pay an invoice:

curl -X PATCH http://localhost:3001/invoices/<invoiceId>/pay

This:

  • Rehydrates the aggregate from its event stream.
  • Applies InvoicePaidEvent.
  • Appends the event to the store.
  • Updates the paid flag in the read model through InvoicePaidEventHandler.

You can inspect the events and invoices tables in Postgres to see the separation between event log and read model:

  • events tells what happened and in which order.
  • invoices tells what the world looks like now for API consumers.


Where to go next

This example focuses on the core mechanics of CQRS and Event Sourcing in a single NestJS service. For production systems, you would extend this design with:

  • Concurrency control (optimistic concurrency/version checks). When you persist new events, compare the aggregate version you loaded with the version currently stored for that stream. If they differ, another command updated the same aggregate concurrently and you can reject or retry the operation.

  • Snapshots for long event streams. When streams grow long, rehydration cost can increase. A snapshot stores the aggregate state at a given version so you only apply events after that point.

  • Outbox/inbox patterns for cross-service messaging. Instead of publishing domain events directly to a message broker, store them in an outbox table and let a background process deliver them. Consumers maintain an inbox table to track which events they have processed.

  • Retry and idempotency patterns for projections. Projections should tolerate duplicate deliveries and temporary failures. Using natural keys (like id on InvoiceEntity) and upserts helps here.

  • Additional bounded contexts and integration events. When you add more domains (payments, customers, orders), let them communicate via events instead of direct synchronous calls. Each context maintains its own aggregates and projections.
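The optimistic concurrency check from the first item can be sketched with an in-memory store. InMemoryEventStore, VersionedEvent, and ConcurrencyError are hypothetical names for this illustration and are not taken from the repository; a database-backed store could enforce the same rule with, for example, a uniqueness constraint on stream id and version.

```typescript
// Hypothetical in-memory event store with an expected-version check on append.
interface VersionedEvent {
  streamId: string;
  version: number;
  type: string;
}

class ConcurrencyError extends Error {
  constructor(streamId: string, expected: number, actual: number) {
    super(`Stream ${streamId}: expected version ${expected}, found ${actual}`);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private readonly streams = new Map<string, VersionedEvent[]>();

  currentVersion(streamId: string): number {
    return (this.streams.get(streamId) ?? []).length;
  }

  // Appends only if nobody else has written since the caller loaded the stream.
  append(streamId: string, expectedVersion: number, eventTypes: string[]): number {
    const stream = this.streams.get(streamId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(streamId, expectedVersion, stream.length);
    }
    eventTypes.forEach((type, index) => {
      stream.push({ streamId, version: expectedVersion + index + 1, type });
    });
    this.streams.set(streamId, stream);
    return stream.length;
  }
}

const eventStore = new InMemoryEventStore();
eventStore.append('inv_1', 0, ['InvoiceCreated']);

// Two command handlers load the aggregate at the same version.
const loadedByFirst = eventStore.currentVersion('inv_1');
const loadedBySecond = eventStore.currentVersion('inv_1');

// The first writer succeeds and moves the stream forward.
eventStore.append('inv_1', loadedByFirst, ['InvoicePaid']);

// The second writer still expects the old version and is rejected.
let secondWriterOutcome = 'appended';
try {
  eventStore.append('inv_1', loadedBySecond, ['InvoicePaid']);
} catch (error) {
  secondWriterOutcome = error instanceof Error ? error.name : 'unknown error';
}
```

On a conflict, the command handler can rehydrate the aggregate again and retry, or surface the conflict to the caller.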

The repository already gives you a concrete reference implementation you can extend into your own domain.


View the repository on GitHub