Event Sourcing & CQRS
MemGhost uses Event Sourcing and CQRS (Command Query Responsibility Segregation) as core architectural patterns. All state changes are stored as immutable events, and read models are built asynchronously from these events.
Event Sourcing
Event Sourcing stores all changes to application state as a sequence of events. Instead of storing the current state, the system stores the history of what happened. The current state is reconstructed by replaying events.
Benefits
- Complete Audit Trail — every change is recorded with timestamp and metadata
- Time-Travel Debugging — replay events to any point in time
- Multiple Read Models — build different views from the same event stream
- Event Replay — rebuild read models from scratch if needed
- Natural Plugin Integration — plugins can consume events for their own purposes
Event Store Schema
    CREATE TABLE events (
        event_id          UUID PRIMARY KEY,
        event_type        VARCHAR(255) NOT NULL,
        aggregate_id      UUID NOT NULL,
        aggregate_type    VARCHAR(100) NOT NULL,
        aggregate_version INT NOT NULL,
        correlation_id    UUID NOT NULL,
        causation_id      UUID NOT NULL,
        data              JSONB NOT NULL,
        metadata          JSONB,
        occurred_at       TIMESTAMP NOT NULL DEFAULT NOW(),
        UNIQUE (aggregate_id, aggregate_version)
    );

The unique constraint on (aggregate_id, aggregate_version) provides optimistic concurrency: two concurrent writes to the same aggregate will conflict, and only one will succeed.
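The effect of that constraint can be imitated in memory. The sketch below is an illustration only: `MemoryStore`, `Append`, and `ErrVersionConflict` are hypothetical names, string IDs replace UUIDs, and a map plays the role of the database's unique index.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrVersionConflict mirrors a unique-constraint violation in the database.
var ErrVersionConflict = errors.New("version conflict")

// key corresponds to the (aggregate_id, aggregate_version) pair.
type key struct {
	AggregateID string
	Version     int
}

// MemoryStore is a hypothetical in-memory stand-in for the events table.
type MemoryStore struct {
	mu   sync.Mutex
	seen map[key]bool
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{seen: make(map[key]bool)}
}

// Append records an event at the given version, or fails if another writer
// already claimed that version for the same aggregate.
func (s *MemoryStore) Append(aggregateID string, version int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	k := key{AggregateID: aggregateID, Version: version}
	if s.seen[k] {
		return ErrVersionConflict
	}
	s.seen[k] = true
	return nil
}

func main() {
	store := NewMemoryStore()
	fmt.Println(store.Append("item-1", 1)) // <nil>
	fmt.Println(store.Append("item-1", 1)) // version conflict
}
```

A writer that receives the conflict would typically reload the aggregate and retry the command against the newer state.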
Event Envelope Format
All events follow a standard envelope:
    type Event struct {
        EventID       uuid.UUID         `json:"event_id"`
        EventType     string            `json:"event_type"`     // e.g., "item.created.v1"
        AggregateID   uuid.UUID         `json:"aggregate_id"`
        AggregateType string            `json:"aggregate_type"` // e.g., "vault_item"
        Version       int               `json:"aggregate_version"`
        CorrelationID uuid.UUID         `json:"correlation_id"` // Traces entire workflow
        CausationID   uuid.UUID         `json:"causation_id"`   // Direct cause
        Data          json.RawMessage   `json:"data"`
        Metadata      map[string]string `json:"metadata"`
        OccurredAt    time.Time         `json:"occurred_at"`
    }

Event Naming Convention
Format: <aggregate>.<action>.<version>
Examples:
    item.created.v1
    item.classified.v1
    item.archived.v1
    hub.node_created.v1
    note.pinned.v1
Versioning Rules:
- Versions are immutable once published
- New versions can only add fields (never remove or change)
- Old versions remain supported indefinitely
- Breaking changes require a new event type
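The naming format can be checked mechanically. The following is a sketch of one possible validator; `EventName` and `ParseEventName` are hypothetical helpers, and the assumption that the version segment is `v` followed by a positive integer is inferred from the examples above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// EventName is the parsed form of "<aggregate>.<action>.<version>".
type EventName struct {
	Aggregate string
	Action    string
	Version   int
}

// ParseEventName splits and validates an event type string.
func ParseEventName(s string) (EventName, error) {
	parts := strings.Split(s, ".")
	if len(parts) != 3 {
		return EventName{}, fmt.Errorf("event name %q: want 3 dot-separated parts, got %d", s, len(parts))
	}
	if parts[0] == "" || parts[1] == "" {
		return EventName{}, fmt.Errorf("event name %q: empty aggregate or action", s)
	}
	if !strings.HasPrefix(parts[2], "v") {
		return EventName{}, fmt.Errorf("event name %q: version must start with \"v\"", s)
	}
	v, err := strconv.Atoi(strings.TrimPrefix(parts[2], "v"))
	if err != nil || v < 1 {
		return EventName{}, fmt.Errorf("event name %q: version must be a positive integer", s)
	}
	return EventName{Aggregate: parts[0], Action: parts[1], Version: v}, nil
}

func main() {
	name, err := ParseEventName("hub.node_created.v1")
	fmt.Printf("%+v %v\n", name, err) // {Aggregate:hub Action:node_created Version:1} <nil>

	_, err = ParseEventName("item.created")
	fmt.Println(err != nil) // true
}
```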
CQRS Pattern
CQRS separates read and write operations into distinct paths.
Command Flow (Writes)
    User Request (HTTP POST/PUT/DELETE)
        |
        +-> JWT Validation
        +-> Authorization Check
        +-> Command Handler
                |
                +-> Load Aggregate (from events)
                |
                +-> Execute Business Logic
                |
                +-> Generate Events
                |
                +-> Save Events (transactional)
                +-> Publish Events to Bus
                +-> Return 202 Accepted

Commands return 202 Accepted because event processing is asynchronous. The Location header contains the resource URL.
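The response side of a command can be sketched with the standard library alone. Everything named here is an assumption for illustration: the route `/items/<id>/archive`, the `archiveItemHandler` function, and the `postArchive` helper. JWT validation, authorization, and the actual command handling are omitted.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// archiveItemHandler is a hypothetical command endpoint for POST /items/<id>/archive.
func archiveItemHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	id := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/items/"), "/archive")

	// A real handler would load the aggregate, run its business logic,
	// then save and publish the resulting events at this point.

	// The read model may lag behind, so the client is pointed at the
	// resource URL and told the command was accepted, not completed.
	w.Header().Set("Location", "/items/"+id)
	w.WriteHeader(http.StatusAccepted)
}

// postArchive sends a POST to the handler using an in-memory recorder.
func postArchive(path string) *httptest.ResponseRecorder {
	req := httptest.NewRequest(http.MethodPost, path, nil)
	rec := httptest.NewRecorder()
	archiveItemHandler(rec, req)
	return rec
}

func main() {
	rec := postArchive("/items/42/archive")
	fmt.Println(rec.Code, rec.Header().Get("Location")) // 202 /items/42
}
```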
Query Flow (Reads)
    User Request (HTTP GET)
        |
        +-> JWT Validation
        +-> Authorization Check
        +-> Query Handler
                |
                +-> Read from optimized read model
                +-> Return 200 OK with data

Queries return 200 OK synchronously. They read from optimized read models with no business logic, only data retrieval.
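For contrast with the command path, a query handler can be as small as a lookup. This sketch assumes a hypothetical `ReadModel` map keyed by item ID and a `GET /items/<id>` route; JWT validation and authorization are again left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// NoteView is a hypothetical read-model row, already shaped for the response.
type NoteView struct {
	ID    string `json:"id"`
	Title string `json:"title"`
}

// ReadModel stands in for a read-model table keyed by item ID.
type ReadModel map[string]NoteView

// getItemHandler serves GET /items/<id> straight from the read model.
func (m ReadModel) getItemHandler(w http.ResponseWriter, r *http.Request) {
	id := strings.TrimPrefix(r.URL.Path, "/items/")
	view, ok := m[id]
	if !ok {
		http.NotFound(w, r)
		return
	}
	body, err := json.Marshal(view)
	if err != nil {
		http.Error(w, "encoding failed", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(body) // the status defaults to 200 OK
}

// get issues a GET against the handler using an in-memory recorder.
func get(m ReadModel, path string) *httptest.ResponseRecorder {
	req := httptest.NewRequest(http.MethodGet, path, nil)
	rec := httptest.NewRecorder()
	m.getItemHandler(rec, req)
	return rec
}

func main() {
	m := ReadModel{"42": {ID: "42", Title: "Groceries"}}
	rec := get(m, "/items/42")
	fmt.Println(rec.Code, rec.Body.String()) // 200 {"id":"42","title":"Groceries"}
}
```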
Projections (Async)
Projections subscribe to events and build optimized read models:
    Event Bus
        |
        +-> Note View Projection
        +-> Hub Routing Projection
        +-> Embedding Projection
        +-> SSE Bridge

Each projection:
- Subscribes to relevant event types
- Maintains its own read model table
- Is optimized for specific query patterns
- Can be rebuilt entirely from the event store
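The four properties above can be shown in a small projection. This is a sketch, not MemGhost's Note View Projection: the `NoteView` type, its methods, and the `Title` field on the event are hypothetical, and a map stands in for the read model table.

```go
package main

import "fmt"

// Event is a trimmed-down, hypothetical event.
type Event struct {
	Type        string
	AggregateID string
	Title       string // only meaningful on item.created.v1 in this sketch
}

// NoteView is a read model holding the titles of non-archived items.
type NoteView struct {
	titles map[string]string
}

func NewNoteView() *NoteView {
	return &NoteView{titles: make(map[string]string)}
}

// Handles reports which event types this projection subscribes to.
func (v *NoteView) Handles(eventType string) bool {
	return eventType == "item.created.v1" || eventType == "item.archived.v1"
}

// Apply updates the read model for one event.
func (v *NoteView) Apply(e Event) {
	switch e.Type {
	case "item.created.v1":
		v.titles[e.AggregateID] = e.Title
	case "item.archived.v1":
		delete(v.titles, e.AggregateID)
	}
}

// Rebuild throws the read model away and replays the full history.
func (v *NoteView) Rebuild(history []Event) {
	v.titles = make(map[string]string)
	for _, e := range history {
		if v.Handles(e.Type) {
			v.Apply(e)
		}
	}
}

// ActiveCount is the kind of query this read model is shaped for.
func (v *NoteView) ActiveCount() int { return len(v.titles) }

func main() {
	history := []Event{
		{Type: "item.created.v1", AggregateID: "a", Title: "Groceries"},
		{Type: "item.created.v1", AggregateID: "b", Title: "Taxes"},
		{Type: "note.pinned.v1", AggregateID: "b"}, // not subscribed, ignored
		{Type: "item.archived.v1", AggregateID: "a"},
	}
	view := NewNoteView()
	view.Rebuild(history)
	fmt.Println(view.ActiveCount()) // 1
}
```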
Aggregate Pattern
An aggregate is a cluster of domain objects treated as a single unit. It has:
- Aggregate Root — the main entity that controls access
- Invariants — business rules that must always hold true
- Events — all state changes are recorded as events
Example: VaultItem Aggregate
    type VaultItem struct {
        ID      uuid.UUID
        Version int
        Type    string
        Status  ItemStatus
        Title   string
        Content string
        Tags    []string
        Folder  string
        changes []events.Event // uncommitted events recorded by command methods
    }

    // Archive enforces the invariant that an item can be archived only once,
    // then records the state change as an event.
    func (v *VaultItem) Archive() error {
        if v.Status == StatusArchived {
            return errors.New("item already archived")
        }
        v.Status = StatusArchived
        v.Version++
        v.recordChange(events.ItemArchived{
            ItemID: v.ID,
        })
        return nil
    }

Aggregates are loaded by replaying events from the event store:

    func (r *Repository) Load(ctx context.Context, id uuid.UUID) (*VaultItem, error) {
        // Named history rather than events so the events package is not shadowed.
        history := r.eventStore.GetEvents(ctx, id)
        item := &VaultItem{ID: id}
        for _, event := range history {
            item.Apply(event)
        }
        return item, nil
    }

Snapshots
For large aggregates (e.g., an item with 100+ edits), snapshots improve performance:
- Snapshots are taken every N events (e.g., every 50)
- Loading an aggregate reads the snapshot plus only subsequent events
- Significantly faster rehydration for frequently-modified aggregates
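The loading rule in the list above can be sketched as follows. The `Snapshot`, `Item`, and `Load` names are hypothetical, and for simplicity the function filters a full in-memory history; a real store would more likely query only the events after the snapshot version.

```go
package main

import "fmt"

// Event is a trimmed-down, hypothetical event with its aggregate version.
type Event struct {
	Version int
	Type    string
}

// Item is a minimal aggregate state.
type Item struct {
	Version  int
	Archived bool
}

// Apply folds one event into the state.
func (i *Item) Apply(e Event) {
	if e.Type == "item.archived.v1" {
		i.Archived = true
	}
	i.Version = e.Version
}

// Snapshot is a saved copy of the aggregate state at some version.
type Snapshot struct {
	State Item
}

// Load starts from the snapshot, if there is one, and applies only newer events.
// It also returns how many events were replayed.
func Load(snap *Snapshot, history []Event) (Item, int) {
	var item Item
	if snap != nil {
		item = snap.State
	}
	replayed := 0
	for _, e := range history {
		if e.Version <= item.Version {
			continue // already reflected in the snapshot
		}
		item.Apply(e)
		replayed++
	}
	return item, replayed
}

func main() {
	history := make([]Event, 0, 120)
	for v := 1; v <= 119; v++ {
		history = append(history, Event{Version: v, Type: "item.classified.v1"})
	}
	history = append(history, Event{Version: 120, Type: "item.archived.v1"})

	snap := &Snapshot{State: Item{Version: 100}}
	item, replayed := Load(snap, history)
	fmt.Println(item.Version, item.Archived, replayed) // 120 true 20
}
```

With the snapshot at version 100, only the last 20 of 120 events are applied, and the resulting state matches a full replay.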
Event Bus
In-Process Event Bus
The event bus routes events within the monolith to projection builders and other subscribers.
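A minimal in-process bus might look like the sketch below. `Bus`, `Subscribe`, and `Publish` are hypothetical names, and handlers are called synchronously to keep the example short; an asynchronous bus would hand events to goroutines or queues instead.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed-down, hypothetical event.
type Event struct {
	Type string
}

// Handler is a subscriber callback.
type Handler func(Event)

// Bus routes events to handlers registered for their event type.
type Bus struct {
	mu       sync.RWMutex
	handlers map[string][]Handler
}

func NewBus() *Bus {
	return &Bus{handlers: make(map[string][]Handler)}
}

// Subscribe registers h for a single event type.
func (b *Bus) Subscribe(eventType string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[eventType] = append(b.handlers[eventType], h)
}

// Publish calls each handler for the event's type, in subscription order.
func (b *Bus) Publish(e Event) {
	b.mu.RLock()
	// Copy the slice so handlers can subscribe without deadlocking.
	hs := append([]Handler(nil), b.handlers[e.Type]...)
	b.mu.RUnlock()
	for _, h := range hs {
		h(e)
	}
}

func main() {
	bus := NewBus()
	pinned := 0
	bus.Subscribe("note.pinned.v1", func(Event) { pinned++ })

	bus.Publish(Event{Type: "note.pinned.v1"})
	bus.Publish(Event{Type: "item.created.v1"}) // no subscriber, dropped
	fmt.Println(pinned)                         // 1
}
```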
SSE Bridge
For external plugins, events are bridged to Server-Sent Events:
    In-Process Event Bus
        |
        +-> Projection Builders (internal)
        +-> SSE Bridge
                +-> External Plugins (via HTTP SSE)

Best Practices
Event Design
- Name events after what happened, not what to do
- Include all necessary data — events should be self-contained
- Never change existing event versions
- Use correlation IDs to track workflows across aggregates
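One common way to fill the correlation and causation fields is sketched below. It assumes a convention that the source does not spell out: a follow-up event copies its parent's correlation ID and uses the parent's event ID as its causation ID. The `Follow` helper and the string IDs are hypothetical.

```go
package main

import "fmt"

// Event carries only the envelope fields relevant to tracing.
type Event struct {
	EventID       string
	EventType     string
	CorrelationID string // shared by every event in one workflow
	CausationID   string // ID of the event that directly caused this one
}

// Follow builds an event caused by parent: it inherits the workflow's
// correlation ID and points its causation ID at the parent event.
func Follow(parent Event, eventID, eventType string) Event {
	return Event{
		EventID:       eventID,
		EventType:     eventType,
		CorrelationID: parent.CorrelationID,
		CausationID:   parent.EventID,
	}
}

func main() {
	created := Event{
		EventID:       "e1",
		EventType:     "item.created.v1",
		CorrelationID: "wf-1",
		CausationID:   "e1", // assumption: a root event names itself as its cause
	}
	classified := Follow(created, "e2", "item.classified.v1")
	nodeCreated := Follow(classified, "e3", "hub.node_created.v1")

	fmt.Println(nodeCreated.CorrelationID, nodeCreated.CausationID) // wf-1 e2
}
```

Filtering by correlation ID returns the whole workflow across aggregates, while following causation IDs reconstructs the chain one step at a time.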
Aggregate Design
- Keep aggregates small — large aggregates are slow to load
- Enforce invariants — business rules must be checked in the aggregate
- Generate events for all state changes
- Use snapshots for frequently-modified aggregates
Projection Design
- Optimize read models for specific query patterns
- Handle failures gracefully — projections should be idempotent
- Always be able to rebuild from the event store
- Subscribe only to needed event types
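Idempotency matters most when an update is not naturally repeatable, such as incrementing a counter. The sketch below shows one possible approach, tracking the highest version applied per aggregate; `ArchiveCounter` and `Handle` are hypothetical names, and the approach assumes events for an aggregate arrive in version order.

```go
package main

import "fmt"

// Event is a trimmed-down, hypothetical event.
type Event struct {
	AggregateID string
	Version     int
	Type        string
}

// ArchiveCounter is a read model that counts archived items.
type ArchiveCounter struct {
	lastSeen map[string]int // highest version applied per aggregate
	Archived int
}

func NewArchiveCounter() *ArchiveCounter {
	return &ArchiveCounter{lastSeen: make(map[string]int)}
}

// Handle applies e at most once. It returns false for redelivered or stale
// events, which are skipped so the counter is never incremented twice.
func (c *ArchiveCounter) Handle(e Event) bool {
	if e.Version <= c.lastSeen[e.AggregateID] {
		return false
	}
	c.lastSeen[e.AggregateID] = e.Version
	if e.Type == "item.archived.v1" {
		c.Archived++
	}
	return true
}

func main() {
	c := NewArchiveCounter()
	e := Event{AggregateID: "item-1", Version: 3, Type: "item.archived.v1"}

	first := c.Handle(e)
	second := c.Handle(e) // simulated redelivery after a failure
	fmt.Println(first, second, c.Archived) // true false 1
}
```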