Design patterns for asynchronous, event-driven integration.
SEA™ embraces event-driven architecture for loose coupling and scalability:
| Pattern | Use Case |
|---|---|
| Domain Events | State changes within bounded contexts |
| Integration Events | Cross-context communication |
| Event Sourcing | Audit-critical aggregates |
| CQRS | Read/write separation |
Domain events are internal to a bounded context and are not published externally.
interface CaseCreated {
  type: 'domain.case.created';
  aggregateId: string;
  occurredAt: Date;
  data: {
    title: string;
    priority: Priority;
    createdBy: UserId;
  };
}
Integration events are published via JetStream for cross-context consumption.
interface CaseClosed {
  type: 'sea.event.case_closed.v1';
  correlationId: string;
  causationId?: string;
  occurredAt: Date;
  data: {
    caseId: string;
    resolution: string;
    closedBy: UserId;
  };
}
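The boundary between the two event kinds can be sketched as a small translation step: the internal event stays inside the context, and only a purpose-built envelope is published. This is a minimal Python sketch under stated assumptions. `DomainCaseClosed` and `to_integration_event` are hypothetical names that do not come from SEA, and the correlation ID is assumed to be supplied by the caller.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
import uuid


@dataclass(frozen=True)
class DomainCaseClosed:
    """Hypothetical internal event, shaped like the domain event above."""
    aggregate_id: str
    occurred_at: datetime
    resolution: str
    closed_by: str


def to_integration_event(event: DomainCaseClosed,
                         correlation_id: str,
                         causation_id: Optional[str] = None) -> dict:
    """Build the envelope a boundary publisher might send to other contexts."""
    envelope = {
        "type": "sea.event.case_closed.v1",
        "correlationId": correlation_id,
        "occurredAt": event.occurred_at.isoformat(),
        "data": {
            "caseId": event.aggregate_id,
            "resolution": event.resolution,
            "closedBy": event.closed_by,
        },
    }
    if causation_id is not None:  # causationId is optional in the interface
        envelope["causationId"] = causation_id
    return envelope


internal = DomainCaseClosed(
    aggregate_id="case-42",
    occurred_at=datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc),
    resolution="resolved",
    closed_by="user-7",
)
external = to_integration_event(internal, correlation_id=str(uuid.uuid4()))
```

Keeping the translation in one function means internal fields can change without touching the published contract.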
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "required": ["type", "occurredAt", "data"],
  "properties": {
    "type": { "type": "string", "pattern": "^[a-z]+\\.[a-z]+\\.[a-z_]+\\.v[0-9]+$" },
    "correlationId": { "type": "string", "format": "uuid" },
    "causationId": { "type": "string", "format": "uuid" },
    "occurredAt": { "type": "string", "format": "date-time" },
    "data": { "type": "object" }
  }
}
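To show what the schema accepts and rejects, here is a standard-library-only Python sketch that checks just the required fields and the `type` naming pattern. It is not a full JSON Schema validator (a real deployment would more likely use a schema library), and `validation_errors` is a hypothetical helper name.

```python
import re

# Pattern copied from the schema's "type" property (JSON escaping removed).
EVENT_TYPE_PATTERN = re.compile(r"^[a-z]+\.[a-z]+\.[a-z_]+\.v[0-9]+$")
REQUIRED_FIELDS = ("type", "occurredAt", "data")


def validation_errors(event: dict) -> list[str]:
    """Return a list of problems; an empty list means the envelope looks valid."""
    errors = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in event]
    event_type = event.get("type")
    if isinstance(event_type, str) and not EVENT_TYPE_PATTERN.fullmatch(event_type):
        errors.append(f"type does not match naming pattern: {event_type}")
    if "data" in event and not isinstance(event["data"], dict):
        errors.append("data must be an object")
    return errors


good = {"type": "sea.event.case_closed.v1", "occurredAt": "2024-01-15T09:30:00Z", "data": {}}
bad = {"type": "domain.case.created", "data": {}}

print(validation_errors(good))
print(validation_errors(bad))
```

Note that the internal type `domain.case.created` fails the pattern because it has no version suffix, which is consistent with domain events never being published under this schema.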
| Version Change | Approach |
|---|---|
| Add field | Backward compatible, same version |
| Make field optional | Backward compatible, same version |
| Remove field | New version (v2) |
| Change field type | New version (v2) |
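The table's rules can be illustrated from the consumer side. The sketch below assumes a hypothetical v2 of `case_closed` in which `resolution` changes from a string to an object. No such version is defined in this document, and `split_type` and `resolution_of` are illustrative names.

```python
def split_type(event_type: str) -> tuple[str, int]:
    """Split 'sea.event.case_closed.v1' into ('sea.event.case_closed', 1)."""
    name, _, version = event_type.rpartition(".v")
    return name, int(version)


def resolution_of(event: dict) -> str:
    """Read the resolution from either version of a case_closed event."""
    name, version = split_type(event["type"])
    if name != "sea.event.case_closed":
        raise ValueError(f"unexpected event: {event['type']}")
    data = event["data"]
    if version == 1:
        # v1: plain string. Fields added later are simply ignored,
        # which is why adding a field does not need a new version.
        return data["resolution"]
    if version == 2:
        # Hypothetical v2: the field type changed, so a new version is required.
        return data["resolution"]["code"]
    raise ValueError(f"unsupported version: {version}")


v1 = {"type": "sea.event.case_closed.v1",
      "data": {"caseId": "c1", "resolution": "resolved", "newField": 1}}
v2 = {"type": "sea.event.case_closed.v2",
      "data": {"caseId": "c1", "resolution": {"code": "resolved", "note": "ok"}}}

print(resolution_of(v1), resolution_of(v2))
```

Routing on the version suffix lets a consumer support v1 and v2 side by side while producers migrate.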
User creates case
└── CaseCreated (v1)
    ├── → Analytics: Update metrics
    ├── → Notifications: Send email
    └── → Audit: Log action

User completes task
└── TaskCompleted (v1)
    ├── → Case: Update progress
    └── → Gamification: Award points

All tasks complete
└── CaseClosed (v1)
    ├── → Billing: Invoice
    └── → Archive: Store documents
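The fan-out in these flows can be sketched with an in-memory dispatcher. This is only an illustration of the coupling model, not of JetStream itself: `subscribe`, `publish`, and `log` are hypothetical names, and delivery here is synchronous and in-process.

```python
from collections import defaultdict
from typing import Callable

Handler = Callable[[dict], None]

subscriptions: dict[str, list[Handler]] = defaultdict(list)
log: list[str] = []  # records what each consumer did, in delivery order


def subscribe(event_type: str, handler: Handler) -> None:
    """Register a consumer for one event type."""
    subscriptions[event_type].append(handler)


def publish(event: dict) -> None:
    """Deliver the event to every subscriber; the producer knows none of them."""
    for handler in subscriptions.get(event["type"], []):
        handler(event)


subscribe("sea.event.case_created.v1", lambda e: log.append("analytics: update metrics"))
subscribe("sea.event.case_created.v1", lambda e: log.append("notifications: send email"))
subscribe("sea.event.case_created.v1", lambda e: log.append("audit: log action"))

publish({"type": "sea.event.case_created.v1", "data": {"caseId": "case-42"}})
```

Adding a fourth consumer is one more `subscribe` call; the publishing code does not change.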
Each bounded context maintains an event catalog:
# events/case-management.yaml
events:
  case_created:
    version: 1
    schema: ./schemas/case_created.v1.json
    producers: [case-service]
    consumers: [analytics, notifications, audit]
  case_closed:
    version: 1
    schema: ./schemas/case_closed.v1.json
    producers: [case-service]
    consumers: [billing, archive, analytics]
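A catalog like this is useful mainly because it can be queried. The sketch below mirrors the YAML as a plain Python dict to avoid a parser dependency. `events_consumed_by` and `subject_for` are hypothetical helpers, and the `sea.event.<name>.v<version>` subject layout is an assumption inferred from the event types used elsewhere in this document.

```python
# In-memory mirror of events/case-management.yaml (schema paths omitted).
CATALOG = {
    "case_created": {
        "version": 1,
        "producers": ["case-service"],
        "consumers": ["analytics", "notifications", "audit"],
    },
    "case_closed": {
        "version": 1,
        "producers": ["case-service"],
        "consumers": ["billing", "archive", "analytics"],
    },
}


def events_consumed_by(catalog: dict, consumer: str) -> list[str]:
    """List the event names a given consumer depends on, sorted by name."""
    return sorted(name for name, entry in catalog.items()
                  if consumer in entry["consumers"])


def subject_for(name: str, entry: dict) -> str:
    """Derive the published event type (assumed naming convention)."""
    return f"sea.event.{name}.v{entry['version']}"


print(events_consumed_by(CATALOG, "analytics"))
print(subject_for("case_created", CATALOG["case_created"]))
```

A query like `events_consumed_by` answers "who breaks if this event changes?" before a new version is published.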
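For distributed transactions spanning multiple services, each step publishes an event that triggers the next step, and a failure at any step triggers compensation: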
OrderCreated → InventoryReserved → PaymentProcessed → OrderConfirmed
     ↓                 ↓                  ↓
OrderFailed      ReserveFailed      PaymentFailed
     ↓                 ↓                  ↓
Compensate        Compensate         Compensate
@event_handler("order.created.v1")
async def reserve_inventory(event: OrderCreated):
    try:
        await inventory.reserve(event.items)
        await publish(InventoryReserved(event.order_id))
    except InsufficientStock:
        await publish(ReserveFailed(event.order_id, "insufficient_stock"))
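The handler above depends on project code (`event_handler`, `inventory`, `publish`) that is not shown here. For a version that runs on its own, the following sketch simulates the whole flow in memory, including the compensation path. It runs the steps in a single coroutine (orchestration style) rather than through event handlers, and `stock`, `published`, `release_inventory`, and the `InventoryReleased` event are all hypothetical.

```python
import asyncio

published: list[tuple[str, str]] = []  # (event name, order id), in publish order
stock = {"widget": 1}                  # hypothetical in-memory inventory


class InsufficientStock(Exception):
    pass


async def publish(name: str, order_id: str) -> None:
    """Stand-in for a real event bus: just record the event."""
    published.append((name, order_id))


async def reserve_inventory(order_id: str, items: dict[str, int]) -> bool:
    """Forward step: reserve all items or report failure without side effects."""
    try:
        for sku, qty in items.items():
            if stock.get(sku, 0) < qty:
                raise InsufficientStock(sku)
        for sku, qty in items.items():
            stock[sku] -= qty
        await publish("InventoryReserved", order_id)
        return True
    except InsufficientStock:
        await publish("ReserveFailed", order_id)
        return False


async def release_inventory(order_id: str, items: dict[str, int]) -> None:
    """Compensation: undo a reservation made earlier in the saga."""
    for sku, qty in items.items():
        stock[sku] += qty
    await publish("InventoryReleased", order_id)


async def run_order_saga(order_id: str, items: dict[str, int], payment_ok: bool) -> None:
    await publish("OrderCreated", order_id)
    if not await reserve_inventory(order_id, items):
        await publish("OrderFailed", order_id)
        return
    if not payment_ok:  # simulated payment outcome
        await publish("PaymentFailed", order_id)
        await release_inventory(order_id, items)  # compensate the completed step
        await publish("OrderFailed", order_id)
        return
    await publish("PaymentProcessed", order_id)
    await publish("OrderConfirmed", order_id)


asyncio.run(run_order_saga("order-1", {"widget": 1}, payment_ok=False))
```

After the failed payment the reserved stock is back at its original level, which is the property compensation is meant to guarantee.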
| Scenario | Approach |
|---|---|
| Bug fix | Replay from timestamp |
| New consumer | Replay entire stream |
| Recovery | Replay from checkpoint |
# Replay from specific sequence
nats consumer info SEA_EVENTS my_consumer
nats consumer update SEA_EVENTS my_consumer --deliver-last
# Create replay consumer
nats consumer add SEA_EVENTS replay__fix_123 \
  --deliver all \
  --filter "sea.event.case_created.v1"
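The three replay scenarios differ only in where reading starts. The sketch below models that choice over an in-memory list and does not use the NATS client. `StoredEvent` and `select_for_replay` are hypothetical names, and the sample stream is made up for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class StoredEvent:
    sequence: int
    occurred_at: datetime
    type: str


def select_for_replay(stream: list[StoredEvent], *,
                      since: Optional[datetime] = None,
                      after_sequence: Optional[int] = None) -> list[StoredEvent]:
    """Pick the events to redeliver.

    No arguments: entire stream (new consumer).
    since: events at or after a timestamp (bug fix).
    after_sequence: events after a saved checkpoint (recovery).
    """
    selected = []
    for event in stream:
        if since is not None and event.occurred_at < since:
            continue
        if after_sequence is not None and event.sequence <= after_sequence:
            continue
        selected.append(event)
    return selected


# Four sample events, one per day, with sequences 1..4.
stream = [
    StoredEvent(i, datetime(2024, 1, i, tzinfo=timezone.utc), "sea.event.case_created.v1")
    for i in range(1, 5)
]

bug_fix = select_for_replay(stream, since=datetime(2024, 1, 3, tzinfo=timezone.utc))
recovery = select_for_replay(stream, after_sequence=3)
new_consumer = select_for_replay(stream)
```

Because a replay redelivers events a consumer may already have processed, handlers generally need to be idempotent for any of these scenarios to be safe.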