📨 NATS JetStream Messaging

Persistent, ordered, at-least-once event delivery.


Overview

SEAâ„¢ uses NATS JetStream for all asynchronous communication per ADR-032:

Feature Benefit
Persistence Events survive restarts
Ordering Per-subject message ordering
Replay Historical event access
Exactly-once Deduplication via message IDs
Scaling Horizontal consumer scaling

Stream Configuration

SEA_EVENTS Stream

1
2
3
4
5
6
7
8
name: SEA_EVENTS
subjects:
  - "sea.event.>"
retention: limits
max_age: 7d
max_bytes: 10GB
storage: file
replicas: 3  # production

Subject Naming Convention

1
2
3
4
5
6
{bounded_context}.event.{event_name}.v{version}

Examples:
  sea.event.case_created.v1
  sea.event.task_completed.v1
  vibespro.event.vibe_generated.v1

Consumer Patterns

Durable Pull Consumer

1
2
3
4
5
6
7
stream: SEA_EVENTS
durable: {service}__{purpose}
filter_subject: "sea.event.>"
ack_policy: explicit
ack_wait: 120s
max_deliver: 20
max_ack_pending: 50

Consumer Naming

1
2
3
4
5
{consuming_service}__{from_context}

Examples:
  case_mgmt__from_sea
  analytics__from_vibespro

Message Headers

Header Purpose
Nats-Msg-Id Deduplication (= outbox event ID)
X-Correlation-Id Request tracing
X-Causation-Id Event chain tracking
X-Event-Version Schema version

Publishing Events

From Application Code

1
2
3
4
5
6
7
8
9
10
11
# Events are written to the outbox, not published directly
async def create_case(cmd: CreateCase) -> Case:
    case = Case.create(cmd)
    
    # Commit case AND outbox event atomically
    await db.transaction([
        cases.insert(case),
        outbox.insert(CaseCreatedEvent(case)),
    ])
    
    return case

Outbox Worker Publishes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Worker polls outbox and publishes to JetStream
loop {
    let events = outbox_repo.find_unpublished().await?;
    for event in events {
        jetstream.publish(
            event.subject(),
            event.payload(),
            Headers::new()
                .add("Nats-Msg-Id", event.id.to_string())
                .add("X-Correlation-Id", event.correlation_id)
        ).await?;
        outbox_repo.mark_published(event.id).await?;
    }
}

Consuming Events

Pull-Based Consumption

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let consumer = jetstream.consumer("SEA_EVENTS", "case_mgmt__from_sea").await?;

loop {
    let messages = consumer.fetch(10).await?;
    for msg in messages {
        match handler.handle(&msg).await {
            Ok(_) | Err(AlreadyProcessed) => msg.ack().await?,
            Err(PoisonMessage(e)) => {
                dlq.publish(&msg, e).await?;
                msg.ack().await?;
            }
            Err(TransientError(_)) => {
                // Don't ack - will be redelivered
            }
        }
    }
}

Dead Letter Queue

DLQ Subject Pattern

1
{context}.dlq.{original_subject}

DLQ Processing

1
2
3
4
5
# View DLQ messages
nats sub "sea.dlq.>" --count 10

# Republish after fix
nats pub sea.event.case_created.v1 --data @fixed_payload.json

Monitoring

Key Metrics

Metric Alert Threshold
stream_messages_pending > 1000
consumer_ack_pending > 100
consumer_redelivery_count > 50/min

Health Checks

1
2
3
4
5
# Stream health
nats stream info SEA_EVENTS --json | jq '.state'

# Consumer lag
nats consumer info SEA_EVENTS sea__default --json | jq '.num_pending'