Skip to content
memghost.com Open App

Event Sourcing & CQRS

MemGhost uses Event Sourcing and CQRS (Command Query Responsibility Segregation) as core architectural patterns. All state changes are stored as immutable events, and read models are built asynchronously from these events.

Event Sourcing

Event Sourcing stores all changes to application state as a sequence of events. Instead of storing the current state, the system stores the history of what happened. The current state is reconstructed by replaying events.

Benefits

  1. Complete Audit Trail — every change is recorded with timestamp and metadata
  2. Time-Travel Debugging — replay events to any point in time
  3. Multiple Read Models — build different views from the same event stream
  4. Event Replay — rebuild read models from scratch if needed
  5. Natural Plugin Integration — plugins can consume events for their own purposes

Event Store Schema

CREATE TABLE events (
event_id UUID PRIMARY KEY,
event_type VARCHAR(255) NOT NULL,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
aggregate_version INT NOT NULL,
correlation_id UUID NOT NULL,
causation_id UUID NOT NULL,
data JSONB NOT NULL,
metadata JSONB,
occurred_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, aggregate_version)
);

The unique constraint on (aggregate_id, aggregate_version) provides optimistic concurrency — two concurrent writes to the same aggregate will conflict, and only one will succeed.

Event Envelope Format

All events follow a standard envelope:

type Event struct {
EventID uuid.UUID `json:"event_id"`
EventType string `json:"event_type"` // e.g., "item.created.v1"
AggregateID uuid.UUID `json:"aggregate_id"`
AggregateType string `json:"aggregate_type"` // e.g., "vault_item"
Version int `json:"aggregate_version"`
CorrelationID uuid.UUID `json:"correlation_id"` // Traces entire workflow
CausationID uuid.UUID `json:"causation_id"` // Direct cause
Data json.RawMessage `json:"data"`
Metadata map[string]string `json:"metadata"`
OccurredAt time.Time `json:"occurred_at"`
}

Event Naming Convention

Format: <aggregate>.<action>.<version>

Examples:

  • item.created.v1
  • item.classified.v1
  • item.archived.v1
  • hub.node_created.v1
  • note.pinned.v1

Versioning Rules:

  • Versions are immutable once published
  • New versions can only add fields (never remove or change)
  • Old versions remain supported indefinitely
  • Breaking changes require a new event type

CQRS Pattern

CQRS separates read and write operations into distinct paths.

Command Flow (Writes)

User Request (HTTP POST/PUT/DELETE)
|
+-> JWT Validation
+-> Authorization Check
+-> Command Handler
| +-> Load Aggregate (from events)
| +-> Execute Business Logic
| +-> Generate Events
| +-> Save Events (transactional)
+-> Publish Events to Bus
+-> Return 202 Accepted

Commands return 202 Accepted because event processing is asynchronous. The Location header contains the resource URL.

Query Flow (Reads)

User Request (HTTP GET)
|
+-> JWT Validation
+-> Authorization Check
+-> Query Handler
| +-> Read from optimized read model
+-> Return 200 OK with data

Queries return 200 OK synchronously. They read from optimized read models with no business logic — just data retrieval.

Projections (Async)

Projections subscribe to events and build optimized read models:

Event Bus
|
+-> Note View Projection
+-> Hub Routing Projection
+-> Embedding Projection
+-> SSE Bridge

Each projection:

  • Subscribes to relevant event types
  • Maintains its own read model table
  • Is optimized for specific query patterns
  • Can be rebuilt entirely from the event store

Aggregate Pattern

An aggregate is a cluster of domain objects treated as a single unit. It has:

  • Aggregate Root — the main entity that controls access
  • Invariants — business rules that must always hold true
  • Events — all state changes are recorded as events

Example: VaultItem Aggregate

type VaultItem struct {
ID uuid.UUID
Version int
Type string
Status ItemStatus
Title string
Content string
Tags []string
Folder string
changes []events.Event
}
func (v *VaultItem) Archive() error {
if v.Status == StatusArchived {
return errors.New("item already archived")
}
v.Status = StatusArchived
v.Version++
v.recordChange(events.ItemArchived{
ItemID: v.ID,
})
return nil
}

Aggregates are loaded by replaying events from the event store:

func (r *Repository) Load(ctx context.Context, id uuid.UUID) (*VaultItem, error) {
events := r.eventStore.GetEvents(ctx, id)
item := &VaultItem{ID: id}
for _, event := range events {
item.Apply(event)
}
return item, nil
}

Snapshots

For large aggregates (e.g., an item with 100+ edits), snapshots improve performance:

  • Snapshots are taken every N events (e.g., every 50)
  • Loading an aggregate reads the snapshot plus only subsequent events
  • Significantly faster rehydration for frequently-modified aggregates

Event Bus

In-Process Event Bus

The event bus routes events within the monolith to projection builders and other subscribers.

SSE Bridge

For external plugins, events are bridged to Server-Sent Events:

In-Process Event Bus
|
+-> Projection Builders (internal)
+-> SSE Bridge
+-> External Plugins (via HTTP SSE)

Best Practices

Event Design

  • Name events after what happened, not what to do
  • Include all necessary data — events should be self-contained
  • Never change existing event versions
  • Use correlation IDs to track workflows across aggregates

Aggregate Design

  • Keep aggregates small — large aggregates are slow to load
  • Enforce invariants — business rules must be checked in the aggregate
  • Generate events for all state changes
  • Use snapshots for frequently-modified aggregates

Projection Design

  • Optimize read models for specific query patterns
  • Handle failures gracefully — projections should be idempotent
  • Always be able to rebuild from the event store
  • Subscribe only to needed event types