🔄 Event-Driven Architecture

Design patterns for asynchronous, event-driven integration.


Overview

SEAâ„¢ embraces event-driven architecture for loose coupling and scalability:

Pattern Use Case
Domain Events State changes within bounded contexts
Integration Events Cross-context communication
Event Sourcing Audit-critical aggregates
CQRS Read/write separation

Event Types

Domain Events

Internal to a bounded context, not published externally.

1
2
3
4
5
6
7
8
9
10
interface CaseCreated {
  type: 'domain.case.created';
  aggregateId: string;
  occurredAt: Date;
  data: {
    title: string;
    priority: Priority;
    createdBy: UserId;
  };
}

Integration Events

Published via JetStream for cross-context consumption.

1
2
3
4
5
6
7
8
9
10
11
interface CaseClosed {
  type: 'sea.event.case_closed.v1';
  correlationId: string;
  causationId?: string;
  occurredAt: Date;
  data: {
    caseId: string;
    resolution: string;
    closedBy: UserId;
  };
}

Event Schema

Structure

1
2
3
4
5
6
7
8
9
10
11
12
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "required": ["type", "occurredAt", "data"],
  "properties": {
    "type": { "type": "string", "pattern": "^[a-z]+\\.[a-z]+\\.[a-z_]+\\.v[0-9]+$" },
    "correlationId": { "type": "string", "format": "uuid" },
    "causationId": { "type": "string", "format": "uuid" },
    "occurredAt": { "type": "string", "format": "date-time" },
    "data": { "type": "object" }
  }
}

Versioning Strategy

Version Change Approach
Add field Backward compatible, same version
Make field optional Backward compatible, same version
Remove field New version (v2)
Change field type New version (v2)

Event Choreography

Event Chain Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
User creates case
    └── CaseCreated (v1)
         ├── → Analytics: Update metrics
         ├── → Notifications: Send email
         └── → Audit: Log action

User completes task
    └── TaskCompleted (v1)
         ├── → Case: Update progress
         └── → Gamification: Award points

All tasks complete
    └── CaseClosed (v1)
         ├── → Billing: Invoice
         └── → Archive: Store documents

Event Catalog

Registration

Each bounded context maintains an event catalog:

1
2
3
4
5
6
7
8
9
10
11
12
13
# events/case-management.yaml
events:
  case_created:
    version: 1
    schema: ./schemas/case_created.v1.json
    producers: [case-service]
    consumers: [analytics, notifications, audit]
    
  case_closed:
    version: 1
    schema: ./schemas/case_closed.v1.json
    producers: [case-service]
    consumers: [billing, archive, analytics]

Saga Pattern

For distributed transactions spanning multiple services:

Choreography Saga

1
2
3
4
5
OrderCreated → InventoryReserved → PaymentProcessed → OrderConfirmed
       ↓              ↓                    ↓
   OrderFailed   ReserveFailed      PaymentFailed
       ↓              ↓                    ↓
   Compensate    Compensate           Compensate

Implementation

1
2
3
4
5
6
7
@event_handler("order.created.v1")
async def reserve_inventory(event: OrderCreated):
    try:
        await inventory.reserve(event.items)
        await publish(InventoryReserved(event.order_id))
    except InsufficientStock:
        await publish(ReserveFailed(event.order_id, "insufficient_stock"))

Event Replay

Use Cases

Scenario Approach
Bug fix Replay from timestamp
New consumer Replay entire stream
Recovery Replay from checkpoint

Commands

1
2
3
4
5
6
7
8
# Replay from specific sequence
nats consumer info SEA_EVENTS my_consumer
nats consumer update SEA_EVENTS my_consumer --deliver-last

# Create replay consumer
nats consumer add SEA_EVENTS replay__fix_123 \
  --deliver-all \
  --filter "sea.event.case_created.v1"