Persistent, ordered, at-least-once event delivery.
SEAâ„¢ uses NATS JetStream for all asynchronous communication per ADR-032:
| Feature | Benefit |
|---|---|
| Persistence | Events survive restarts |
| Ordering | Per-subject message ordering |
| Replay | Historical event access |
| Exactly-once | Deduplication via message IDs |
| Scaling | Horizontal consumer scaling |
1
2
3
4
5
6
7
8
name: SEA_EVENTS
subjects:
- "sea.event.>"
retention: limits
max_age: 7d
max_bytes: 10GB
storage: file
replicas: 3 # production
1
2
3
4
5
6
{bounded_context}.event.{event_name}.v{version}
Examples:
sea.event.case_created.v1
sea.event.task_completed.v1
vibespro.event.vibe_generated.v1
1
2
3
4
5
6
7
stream: SEA_EVENTS
durable: {service}__{purpose}
filter_subject: "sea.event.>"
ack_policy: explicit
ack_wait: 120s
max_deliver: 20
max_ack_pending: 50
1
2
3
4
5
{consuming_service}__{from_context}
Examples:
case_mgmt__from_sea
analytics__from_vibespro
| Header | Purpose |
|---|---|
Nats-Msg-Id |
Deduplication (= outbox event ID) |
X-Correlation-Id |
Request tracing |
X-Causation-Id |
Event chain tracking |
X-Event-Version |
Schema version |
1
2
3
4
5
6
7
8
9
10
11
# Events are written to the outbox, not published directly
async def create_case(cmd: CreateCase) -> Case:
case = Case.create(cmd)
# Commit case AND outbox event atomically
await db.transaction([
cases.insert(case),
outbox.insert(CaseCreatedEvent(case)),
])
return case
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Worker polls outbox and publishes to JetStream
loop {
let events = outbox_repo.find_unpublished().await?;
for event in events {
jetstream.publish(
event.subject(),
event.payload(),
Headers::new()
.add("Nats-Msg-Id", event.id.to_string())
.add("X-Correlation-Id", event.correlation_id)
).await?;
outbox_repo.mark_published(event.id).await?;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let consumer = jetstream.consumer("SEA_EVENTS", "case_mgmt__from_sea").await?;
loop {
let messages = consumer.fetch(10).await?;
for msg in messages {
match handler.handle(&msg).await {
Ok(_) | Err(AlreadyProcessed) => msg.ack().await?,
Err(PoisonMessage(e)) => {
dlq.publish(&msg, e).await?;
msg.ack().await?;
}
Err(TransientError(_)) => {
// Don't ack - will be redelivered
}
}
}
}
1
{context}.dlq.{original_subject}
1
2
3
4
5
# View DLQ messages
nats sub "sea.dlq.>" --count 10
# Republish after fix
nats pub sea.event.case_created.v1 --data @fixed_payload.json
| Metric | Alert Threshold |
|---|---|
stream_messages_pending |
> 1000 |
consumer_ack_pending |
> 100 |
consumer_redelivery_count |
> 50/min |
1
2
3
4
5
# Stream health
nats stream info SEA_EVENTS --json | jq '.state'
# Consumer lag
nats consumer info SEA_EVENTS sea__default --json | jq '.num_pending'