📤📥 Outbox/Inbox Pattern

Transactional messaging with effectively-once processing: at-least-once delivery combined with idempotent consumers.


Overview

The Outbox/Inbox pattern ensures reliable event delivery with transactional consistency per SDS-047:

| Component | Purpose |
|---|---|
| Outbox Table | Queues events atomically with domain writes |
| Outbox Publisher | Polls the outbox and publishes to JetStream |
| Inbox Table | Tracks processed messages for idempotency |
| Inbox Consumer | Pulls from JetStream and deduplicates |

Outbox Pattern

Why Outbox?

Without Outbox (dual-write problem):

# DANGEROUS: Not atomic!
await db.save(case)       # DB write
await jetstream.publish(event)  # May fail!

With Outbox (atomic):

# SAFE: Single transaction
await db.transaction([
    cases.insert(case),
    outbox.insert(CaseCreatedEvent(case)),
])
# Worker publishes later
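The atomicity guarantee can be modelled without a database. The std-only Rust sketch below is an illustration, not the service's storage code: `Db`, `Case`, `OutboxRow`, `transaction` and `create_case` are hypothetical names, and the "transaction" is a staged copy that replaces the live state only if every step succeeds, so either both rows exist or neither does.

```rust
/// Hypothetical domain row (illustration only).
#[derive(Clone, Debug, PartialEq)]
pub struct Case {
    pub id: String,
}

/// Hypothetical subset of an outbox_events row.
#[derive(Clone, Debug, PartialEq)]
pub struct OutboxRow {
    pub aggregate_id: String,
    pub event_type: String,
    pub published_at: Option<u64>,
}

/// In-memory stand-in for the database: two tables that must change together.
#[derive(Clone, Debug, Default)]
pub struct Db {
    pub cases: Vec<Case>,
    pub outbox: Vec<OutboxRow>,
}

impl Db {
    /// Runs `body` against a staged copy and commits only if it returns Ok,
    /// mimicking BEGIN ... COMMIT / ROLLBACK.
    pub fn transaction<F>(&mut self, body: F) -> Result<(), String>
    where
        F: FnOnce(&mut Db) -> Result<(), String>,
    {
        let mut staged = self.clone();
        body(&mut staged)?; // on error the staged copy is dropped (rollback)
        *self = staged; // commit: both tables change at once
        Ok(())
    }
}

/// Writes the case and its CaseCreated outbox row in one transaction.
/// `fail_after_case` simulates an error between the two inserts.
pub fn create_case(db: &mut Db, id: &str, fail_after_case: bool) -> Result<(), String> {
    db.transaction(|tx| {
        tx.cases.push(Case { id: id.to_string() });
        if fail_after_case {
            return Err("simulated failure".to_string());
        }
        tx.outbox.push(OutboxRow {
            aggregate_id: id.to_string(),
            event_type: "CaseCreated".to_string(),
            published_at: None,
        });
        Ok(())
    })
}
```

A failure between the two inserts leaves neither row behind, which is exactly what the dual-write version cannot promise.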

Schema

CREATE TABLE outbox_events (
  id UUID PRIMARY KEY,
  aggregate_type TEXT NOT NULL,
  aggregate_id TEXT NOT NULL,
  event_type TEXT NOT NULL,
  event_version INT DEFAULT 1,
  payload JSONB NOT NULL,
  occurred_at TIMESTAMPTZ DEFAULT NOW(),
  correlation_id UUID,
  causation_id UUID,
  published_at TIMESTAMPTZ,
  publish_attempts INT DEFAULT 0,
  publish_error TEXT
);

CREATE INDEX idx_outbox_unpublished 
  ON outbox_events (occurred_at) 
  WHERE published_at IS NULL;
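The partial index covers exactly the rows the publisher polls for. The hypothetical `find_unpublished` below restates that poll over an in-memory slice, roughly `SELECT ... WHERE published_at IS NULL ORDER BY occurred_at LIMIT n`. The `ORDER BY` and `LIMIT` are assumptions about the repository query, which the schema alone does not show; ids are integers instead of UUIDs to keep the sketch std-only.

```rust
/// Subset of an outbox_events row; timestamps are plain seconds for brevity.
#[derive(Clone, Debug, PartialEq)]
pub struct OutboxEvent {
    pub id: u32,
    pub occurred_at: u64,
    pub published_at: Option<u64>,
}

/// In-memory equivalent of:
///   SELECT * FROM outbox_events
///   WHERE published_at IS NULL
///   ORDER BY occurred_at
///   LIMIT $1
pub fn find_unpublished(rows: &[OutboxEvent], limit: usize) -> Vec<OutboxEvent> {
    let mut pending: Vec<OutboxEvent> = rows
        .iter()
        .filter(|r| r.published_at.is_none()) // same predicate as the partial index
        .cloned()
        .collect();
    // Oldest first, so events go out roughly in the order they occurred.
    pending.sort_by_key(|r| r.occurred_at);
    pending.truncate(limit);
    pending
}
```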

Publishing Flow

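1. Application writes the domain entity and the outbox event in a single transaction
2. Outbox worker polls for unpublished events
3. Worker publishes to JetStream with Nats-Msg-Id = event.id
4. JetStream drops any message whose Nats-Msg-Id it has already stored within the stream's duplicate window (2 minutes by default)
5. Once JetStream acknowledges the publish, the worker marks the event as published; if the worker crashes before this step, the event is published again and the duplicate is absorbed by step 4 or by the consumer's inbox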
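Steps 3 to 5 are what make a crash between publishing and marking the row harmless: the worker re-publishes, and the stream discards the copy. The std-only model below is an illustration, not JetStream's implementation; `DedupStream` and its fields are hypothetical, and the 120-second window used in the usage example mirrors JetStream's default two-minute duplicate window (configurable per stream).

```rust
use std::collections::HashMap;

/// Toy model of a stream with message-ID deduplication.
pub struct DedupStream {
    window_secs: u64,
    seen: HashMap<String, u64>, // Nats-Msg-Id -> time it was last stored
    pub stored: Vec<String>,    // payloads actually persisted
}

impl DedupStream {
    pub fn new(window_secs: u64) -> Self {
        DedupStream { window_secs, seen: HashMap::new(), stored: Vec::new() }
    }

    /// Returns true if the message was stored, false if dropped as a duplicate.
    pub fn publish(&mut self, msg_id: &str, payload: &str, now: u64) -> bool {
        if let Some(&first) = self.seen.get(msg_id) {
            if now.saturating_sub(first) < self.window_secs {
                return false; // same ID inside the window: not stored again
            }
        }
        self.seen.insert(msg_id.to_string(), now);
        self.stored.push(payload.to_string());
        true
    }
}
```

A retry within the window is dropped, but one that arrives after the window is stored again, which is why consumers still need the Inbox table.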

Inbox Pattern

Why Inbox?

JetStream provides at-least-once delivery, meaning consumers may receive duplicates:

| Scenario | Solution |
|---|---|
| Consumer ACK lost | Inbox table check |
| Network partition | Inbox table check |
| Redelivery after timeout | Inbox table check |

Schema

CREATE TABLE inbox_messages (
  message_id UUID PRIMARY KEY,
  subject TEXT NOT NULL,
  received_at TIMESTAMPTZ DEFAULT NOW(),
  processed_at TIMESTAMPTZ,
  attempts INT DEFAULT 0,
  last_error TEXT
);

CREATE INDEX idx_inbox_unprocessed 
  ON inbox_messages (received_at) 
  WHERE processed_at IS NULL;

Processing Flow

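1. Consumer fetches a message from JetStream
2. INSERT INTO inbox_messages (message_id, subject) VALUES ($1, $2) ON CONFLICT (message_id) DO NOTHING
3. If the row is new, or already exists with processed_at IS NULL (an earlier attempt failed) → process the message
4. If the row exists with processed_at set (duplicate) → skip, ACK
5. On success → set processed_at, ACK
6. On failure → increment attempts, record last_error, NAK for redelivery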
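The branch in the flow above hinges on whether an existing inbox row was actually processed. A minimal std-only sketch, assuming a `HashMap` in place of `inbox_messages`; `Inbox`, `InboxRow`, `Decision` and their methods are hypothetical names. A row left behind by a failed attempt is retried rather than skipped; otherwise a NAKed message would be silently dropped on redelivery.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory inbox row.
#[derive(Debug, Default)]
pub struct InboxRow {
    pub attempts: u32,
    pub processed: bool, // stands in for processed_at IS NOT NULL
    pub last_error: Option<String>,
}

#[derive(Debug, PartialEq)]
pub enum Decision {
    Process, // first delivery, or a retry after a failed attempt
    Skip,    // already processed: just ACK
}

#[derive(Default)]
pub struct Inbox {
    rows: HashMap<u64, InboxRow>, // message_id -> row
}

impl Inbox {
    /// Like INSERT ... ON CONFLICT DO NOTHING, then decide from the processed flag.
    pub fn claim(&mut self, message_id: u64) -> Decision {
        let row = self.rows.entry(message_id).or_default();
        if row.processed {
            return Decision::Skip;
        }
        row.attempts += 1;
        Decision::Process
    }

    pub fn mark_processed(&mut self, message_id: u64) {
        if let Some(row) = self.rows.get_mut(&message_id) {
            row.processed = true;
        }
    }

    pub fn record_error(&mut self, message_id: u64, error: &str) {
        if let Some(row) = self.rows.get_mut(&message_id) {
            row.last_error = Some(error.to_string());
        }
    }

    pub fn attempts(&self, message_id: u64) -> u32 {
        self.rows.get(&message_id).map_or(0, |r| r.attempts)
    }
}
```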

Handler Response Codes

| Code | Meaning | Consumer Action |
|---|---|---|
| 200 | Success | Mark processed → ACK |
| 409 | Already processed | ACK (idempotent) |
| 422 | Poison message | Route to DLQ → ACK |
| 5xx | Transient failure | NAK (redelivery) |
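The table can be encoded as a single match so that every handler outcome maps to exactly one consumer action. This is a sketch with hypothetical names (`ConsumerAction`, `action_for`); codes the table does not list return `None`, and how to treat them is left to the caller as an assumption rather than guessed here.

```rust
/// One variant per row of the response-code table.
#[derive(Debug, PartialEq)]
pub enum ConsumerAction {
    MarkProcessedAndAck, // 200
    Ack,                 // 409
    DeadLetterAndAck,    // 422
    Nak,                 // 5xx
}

/// Maps a handler status code to the consumer action; unlisted codes return None.
pub fn action_for(status: u16) -> Option<ConsumerAction> {
    match status {
        200 => Some(ConsumerAction::MarkProcessedAndAck),
        409 => Some(ConsumerAction::Ack),
        422 => Some(ConsumerAction::DeadLetterAndAck),
        500..=599 => Some(ConsumerAction::Nak),
        _ => None,
    }
}
```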

Rust Worker Implementation

Outbox Publisher

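use std::{sync::Arc, time::Duration};

use async_nats::{jetstream, HeaderMap};

// OutboxRepository and Result are this service's own types; the JetStream
// calls below are from the async-nats crate.
pub async fn run_outbox_publisher(
    outbox: Arc<dyn OutboxRepository>,
    jetstream: jetstream::Context,
) -> Result<()> {
    loop {
        let events = outbox.find_unpublished(100).await?;

        for event in events {
            // Nats-Msg-Id = event.id lets JetStream drop re-publishes
            let mut headers = HeaderMap::new();
            headers.insert("Nats-Msg-Id", event.id.to_string().as_str());

            // The first await hands the message to the client; the second waits
            // for the stream's PubAck, so "published" means the stream stored it.
            let result = match jetstream
                .publish_with_headers(event.subject(), headers, event.payload())
                .await
            {
                Ok(ack) => ack.await.map(|_| ()).map_err(|e| e.to_string()),
                Err(e) => Err(e.to_string()),
            };

            match result {
                Ok(()) => outbox.mark_published(event.id).await?,
                Err(e) => outbox.record_error(event.id, e).await?,
            }
        }

        // Poll interval between batches
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}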
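The per-event bookkeeping in the publisher loop can be unit-tested without NATS by putting the broker behind a trait. This is a hypothetical refactoring for illustration, not the service's code: `Publisher`, `PendingEvent`, `FlakyPublisher` and `publish_batch` are assumed names, and `publish_attempts` / `publish_error` are modelled as "incremented on every try" and "last failure message", which is what those schema columns suggest but the source does not spell out.

```rust
/// Anything that can publish a payload with a message ID (a JetStream client in production).
pub trait Publisher {
    fn publish(&mut self, subject: &str, msg_id: &str, payload: &[u8]) -> Result<(), String>;
}

/// Hypothetical in-memory view of an outbox row.
#[derive(Debug, Clone)]
pub struct PendingEvent {
    pub id: String,
    pub subject: String,
    pub payload: Vec<u8>,
    pub published: bool,
    pub publish_attempts: u32,
    pub publish_error: Option<String>,
}

/// One polling pass: publish each unpublished event with its id as the message ID,
/// then mark it published or record the error for the next pass.
/// Returns how many events were published in this pass.
pub fn publish_batch<P: Publisher>(publisher: &mut P, events: &mut [PendingEvent]) -> usize {
    let mut published = 0;
    for event in events.iter_mut().filter(|e| !e.published) {
        event.publish_attempts += 1;
        match publisher.publish(&event.subject, &event.id, &event.payload) {
            Ok(()) => {
                event.published = true;
                event.publish_error = None;
                published += 1;
            }
            Err(e) => event.publish_error = Some(e),
        }
    }
    published
}

/// Test double: fails for listed IDs and records every ID it "sends".
#[derive(Default)]
pub struct FlakyPublisher {
    pub fail_ids: Vec<String>,
    pub sent: Vec<String>,
}

impl Publisher for FlakyPublisher {
    fn publish(&mut self, _subject: &str, msg_id: &str, _payload: &[u8]) -> Result<(), String> {
        if self.fail_ids.iter().any(|id| id == msg_id) {
            return Err("nats: timeout".to_string());
        }
        self.sent.push(msg_id.to_string());
        Ok(())
    }
}
```

A failed event simply stays unpublished and is picked up again on the next pass; combined with Nats-Msg-Id deduplication, retrying is always safe.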

Inbox Consumer

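use std::sync::Arc;

use async_nats::jetstream::{consumer::PullConsumer, AckKind};
use futures::StreamExt;
use uuid::Uuid;

// InboxRepository, EventHandler, HandlerError, DeadLetterQueue and Result are
// this service's own types; the JetStream calls are from the async-nats crate.
pub async fn run_inbox_consumer(
    consumer: PullConsumer,
    inbox: Arc<dyn InboxRepository>,
    handler: Arc<dyn EventHandler>,
    dlq: Arc<dyn DeadLetterQueue>,
) -> Result<()> {
    loop {
        let mut messages = consumer.fetch().max_messages(10).messages().await?;

        while let Some(msg) = messages.next().await {
            let msg = msg?;

            // Without a valid Nats-Msg-Id the message cannot be deduplicated
            let msg_id = msg
                .headers
                .as_ref()
                .and_then(|h| h.get("Nats-Msg-Id"))
                .and_then(|v| Uuid::parse_str(v.as_str()).ok());
            let Some(msg_id) = msg_id else {
                dlq.publish(&msg, "missing or invalid Nats-Msg-Id".into()).await?;
                msg.ack().await?;
                continue;
            };

            // Idempotency check: skip only messages already processed. A row
            // left by a failed attempt (processed_at IS NULL) is retried.
            if inbox.is_processed(msg_id).await? {
                msg.ack().await?;
                continue;
            }
            // INSERT ... ON CONFLICT (message_id) DO NOTHING; attempts += 1
            inbox.record_attempt(msg_id, msg.subject.as_str()).await?;

            match handler.handle(&msg).await {
                Ok(_) => {
                    inbox.mark_processed(msg_id).await?;
                    msg.ack().await?;
                }
                Err(HandlerError::Poison(e)) => {
                    // 422: park in the DLQ and ACK so it is not redelivered
                    dlq.publish(&msg, e).await?;
                    msg.ack().await?;
                }
                Err(HandlerError::Transient(e)) => {
                    inbox.record_error(msg_id, e.to_string()).await?;
                    // 5xx: NAK so JetStream redelivers
                    msg.ack_with(AckKind::Nak(None)).await?;
                }
            }
        }
    }
}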

Monitoring

Key Metrics

| Metric | Query |
|---|---|
| Outbox backlog | SELECT COUNT(*) FROM outbox_events WHERE published_at IS NULL |
| Oldest unpublished | SELECT MIN(occurred_at) FROM outbox_events WHERE published_at IS NULL |
| Inbox backlog | SELECT COUNT(*) FROM inbox_messages WHERE processed_at IS NULL |
| Messages with more than 3 attempts | SELECT COUNT(*) FROM inbox_messages WHERE attempts > 3 |
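For unit tests or an in-process health check, the same numbers can be computed over rows already loaded in memory. A std-only sketch with hypothetical types (`OutboxRow`, `InboxRow`, `Backlog`, `backlog`), mirroring each query in the monitoring table; the threshold of 3 attempts comes from the last query, and `oldest_unpublished` is `None` when nothing is pending, just as SQL `MIN` returns NULL.

```rust
/// Hypothetical subset of an outbox_events row (timestamps in seconds).
pub struct OutboxRow {
    pub occurred_at: u64,
    pub published_at: Option<u64>,
}

/// Hypothetical subset of an inbox_messages row.
pub struct InboxRow {
    pub processed_at: Option<u64>,
    pub attempts: u32,
}

#[derive(Debug, PartialEq)]
pub struct Backlog {
    pub outbox_backlog: usize,
    pub oldest_unpublished: Option<u64>,
    pub inbox_backlog: usize,
    pub high_attempt_messages: usize,
}

/// Computes the four metrics from the monitoring table over in-memory rows.
pub fn backlog(outbox: &[OutboxRow], inbox: &[InboxRow]) -> Backlog {
    let unpublished = || outbox.iter().filter(|r| r.published_at.is_none());
    Backlog {
        outbox_backlog: unpublished().count(),
        oldest_unpublished: unpublished().map(|r| r.occurred_at).min(),
        inbox_backlog: inbox.iter().filter(|r| r.processed_at.is_none()).count(),
        high_attempt_messages: inbox.iter().filter(|r| r.attempts > 3).count(),
    }
}
```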