SDS-047: Messaging Infrastructure Service

Status: Draft
Version: 1.0
Date: 2025-12-28
Satisfies: PRD-022, ADR-032
Bounded-Context: shared


0. Isomorphism Declaration

| Spec Section | SEA-DSL Target | Cardinality | Verification |
|---|---|---|---|
| 2.1 OutboxEvent | Entity node | 1:1 | Field names + types match |
| 2.2 InboxMessage | Entity node | 1:1 | Field names + types match |
| 3. Policies | Policy nodes | 1:1 | Boolean expression verbatim |
| 4. Commands | Flow @command | 1:1 | Input/output schema identical |
| 7. Ports | Interface definitions | 1:1 | Method signatures preserved |

1. Domain Glossary

| Term | Definition | Type | Canonical? |
|---|---|---|---|
| OutboxEvent | Transactional event record ensuring guaranteed delivery | Entity | |
| InboxMessage | Idempotent message receipt for exactly-once processing | Entity | |
| OutboxPublisher | Worker that polls the outbox and publishes to JetStream | Service | |
| InboxConsumer | Worker that pulls from JetStream and processes via a handler | Service | |
| JetStreamStream | NATS persistent message storage per bounded context | Infrastructure | |
| DurableConsumer | Pull-based consumer tracking delivery/acks | Infrastructure | |
| DeadLetterQueue | Subject for poison messages bypassing normal processing | Infrastructure | |

2. Domain Model

2.1 Entities

| Entity | Fields | Identity | Aggregate Root | Invariants |
|---|---|---|---|---|
| OutboxEvent | id: UUID, aggregate_type: string, aggregate_id: string, event_type: string, event_version: int, payload: JSON, occurred_at: DateTime, correlation_id: UUID?, causation_id: UUID?, published_at: DateTime?, publish_attempts: int, publish_error: string? | id | yes | POL-047-001, POL-047-002 |
| InboxMessage | message_id: UUID, subject: string, received_at: DateTime, processed_at: DateTime?, attempts: int, last_error: string? | message_id | yes | POL-047-003, POL-047-004 |

2.2 Value Objects

| Value Object | Fields | Constraints |
|---|---|---|
| EventSubject | value: string | Pattern: {context}.event.{name}.v{n} |
| CorrelationContext | correlation_id: UUID, causation_id: UUID | Both optional |
| NatsMsgId | value: string | Must match outbox event UUID |
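
The EventSubject pattern can be checked with a small parser. The following is a minimal sketch using only the Rust standard library. The struct fields, the `parse` function, and the rule that `{context}` and `{name}` contain no dots are assumptions made for illustration; the spec only fixes the overall pattern.

```rust
/// Parsed form of a subject such as `vibespro.event.vibe_created.v1`.
/// Field names are illustrative, not taken from the spec.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventSubject {
    pub context: String,
    pub name: String,
    pub version: u32,
}

impl EventSubject {
    /// Parses `{context}.event.{name}.v{n}` and returns `None` on any mismatch.
    pub fn parse(value: &str) -> Option<Self> {
        let parts: Vec<&str> = value.split('.').collect();
        // Assumption: exactly four dot-separated tokens, so names cannot contain dots.
        if parts.len() != 4 {
            return None;
        }
        let (context, kind, name, version) = (parts[0], parts[1], parts[2], parts[3]);
        if kind != "event" || context.is_empty() || name.is_empty() {
            return None;
        }
        // The version token is the letter `v` followed by decimal digits only.
        let digits = version.strip_prefix('v')?;
        if !digits.chars().all(|c| c.is_ascii_digit()) {
            return None;
        }
        let n: u32 = digits.parse().ok()?;
        Some(Self {
            context: context.to_string(),
            name: name.to_string(),
            version: n,
        })
    }
}
```

Parsing into a struct rather than matching a regular expression keeps the sketch dependency-free and gives the caller the context and version as typed values.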

3. Invariants & Policies

3.1 Outbox Policies

| POL-ID | Applies-To | Rule | Error Code | Satisfies |
|---|---|---|---|---|
| POL-047-001 | OutboxEvent | outbox.id IS NOT NULL AND outbox.event_type IS NOT NULL | INVALID_OUTBOX | REQ-050 |
| POL-047-002 | OutboxEvent | outbox.occurred_at <= NOW() | FUTURE_EVENT | REQ-050 |
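
As an illustration, the two outbox policies could be enforced in application code before the row is written. This sketch assumes a hypothetical `OutboxDraft` type with optional fields and passes the current time in explicitly so the check stays deterministic; none of these names come from the spec.

```rust
use std::time::{Duration, SystemTime};

/// Error codes from the outbox policy table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutboxError {
    InvalidOutbox, // POL-047-001
    FutureEvent,   // POL-047-002
}

/// Hypothetical pre-validation shape of an outbox row.
pub struct OutboxDraft {
    pub id: Option<String>,
    pub event_type: Option<String>,
    pub occurred_at: SystemTime,
}

/// Applies POL-047-001, then POL-047-002, against the supplied clock reading.
pub fn validate_outbox(draft: &OutboxDraft, now: SystemTime) -> Result<(), OutboxError> {
    if draft.id.is_none() || draft.event_type.is_none() {
        return Err(OutboxError::InvalidOutbox);
    }
    if draft.occurred_at > now {
        return Err(OutboxError::FutureEvent);
    }
    Ok(())
}

/// Convenience constructor for examples: a fixed instant after the Unix epoch.
pub fn at_secs(secs: u64) -> SystemTime {
    SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
}
```

For example, `validate_outbox(&draft, at_secs(1_000))` rejects a draft whose `occurred_at` is `at_secs(1_001)` with `FutureEvent`.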

3.2 Inbox Policies

| POL-ID | Applies-To | Rule | Error Code | Satisfies |
|---|---|---|---|---|
| POL-047-003 | InboxMessage | inbox.message_id IS UNIQUE | DUPLICATE_MESSAGE | REQ-051 |
| POL-047-004 | InboxMessage | inbox.processed_at IS NULL OR inbox.processed_at >= inbox.received_at | INVALID_TIMESTAMP | REQ-051 |

3.3 ACK Policies

| POL-ID | Applies-To | Rule | Error Code | Satisfies |
|---|---|---|---|---|
| POL-047-005 | InboxConsumer | handler_response = 200 OR handler_response = 409 → ACK | N/A | REQ-051 |
| POL-047-006 | InboxConsumer | handler_response = 422 → ACK + DLQ | N/A | REQ-053 |
| POL-047-007 | InboxConsumer | handler_response = 5xx OR timeout → NO ACK | N/A | REQ-052 |
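
The three ACK policies amount to a pure mapping from handler outcome to consumer action, which could be sketched as follows. `HandlerOutcome`, `AckDecision`, and `decide` are hypothetical names. The policies do not say what to do with status codes outside 200, 409, 422, and 5xx; the sketch assumes they are left unacknowledged.

```rust
/// What came back from the handler call (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandlerOutcome {
    Status(u16),
    Timeout,
}

/// Action the InboxConsumer takes on the JetStream message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckDecision {
    Ack,              // POL-047-005
    AckAndDeadLetter, // POL-047-006
    NoAck,            // POL-047-007
}

pub fn decide(outcome: HandlerOutcome) -> AckDecision {
    match outcome {
        HandlerOutcome::Status(200) | HandlerOutcome::Status(409) => AckDecision::Ack,
        HandlerOutcome::Status(422) => AckDecision::AckAndDeadLetter,
        HandlerOutcome::Status(500..=599) | HandlerOutcome::Timeout => AckDecision::NoAck,
        // Assumption: codes the policies do not mention are treated like
        // transient failures, so the message is redelivered rather than lost.
        HandlerOutcome::Status(_) => AckDecision::NoAck,
    }
}
```

Keeping the decision separate from the NATS calls makes the policy table testable without a broker.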

4. CQRS: Commands

CMD-050: PublishOutboxEvent

| Field | Value |
|---|---|
| CMD-ID | CMD-050 |
| Input | outbox_event_id: UUID |
| Preconditions | outbox.published_at IS NULL |
| State Changes | Sets published_at, increments publish_attempts |
| Emits Events | N/A (this IS the event publishing) |
| Satisfies | REQ-050, REQ-054 |

Idempotency Specification:

| Idempotent? | Key Expression | Strategy | Behavior on Duplicate |
|---|---|---|---|
| yes | Nats-Msg-Id = outbox.id | JetStream dedupe | No duplicate in stream |
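
JetStream deduplicates on the Nats-Msg-Id header only within the stream's duplicate window (two minutes by default), so the guarantee above holds for retries that arrive inside that window. The sketch below is an in-memory model of that behavior, not the async-nats API: `DedupeStream` and its methods are hypothetical, and time is passed in as an offset from an arbitrary start so the model stays deterministic.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Toy model of a stream that drops repeated message IDs inside a window.
pub struct DedupeStream {
    window: Duration,
    /// Message ID mapped to the time it was last stored.
    seen: HashMap<String, Duration>,
    /// IDs of the messages actually stored, in order.
    pub stored: Vec<String>,
}

impl DedupeStream {
    pub fn new(window: Duration) -> Self {
        Self { window, seen: HashMap::new(), stored: Vec::new() }
    }

    /// Returns `true` if the message was stored, `false` if it was dropped
    /// as a duplicate of an ID seen less than `window` ago.
    pub fn publish(&mut self, msg_id: &str, now: Duration) -> bool {
        if let Some(first) = self.seen.get(msg_id) {
            if now.saturating_sub(*first) < self.window {
                return false;
            }
        }
        self.seen.insert(msg_id.to_string(), now);
        self.stored.push(msg_id.to_string());
        true
    }
}
```

In this model a retry of the same outbox ID 30 seconds after the first publish is dropped, while one that arrives after the window has elapsed is stored again. That suggests keeping the publisher's retry interval well inside the configured window.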

CMD-051: ProcessInboxMessage

| Field | Value |
|---|---|
| CMD-ID | CMD-051 |
| Input | message_id: UUID, subject: string, payload: JSON |
| Preconditions | POL-047-003 (unique PK) |
| State Changes | Inserts inbox, calls handler, sets processed_at |
| Emits Events | Handler-specific |
| Satisfies | REQ-051 |

Idempotency Specification:

| Idempotent? | Key Expression | Strategy | Behavior on Duplicate |
|---|---|---|---|
| yes | message_id | Inbox PK conflict | Skip processing |
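
A minimal in-memory sketch of the inbox flow follows, with a `HashMap` standing in for the inbox table and a closure standing in for the handler. All names are hypothetical. One point the table leaves open is a redelivery whose inbox row exists but was never marked processed (for example after a 5xx). The sketch assumes such a message is retried and its `attempts` counter incremented, and that only rows already marked processed are skipped.

```rust
use std::collections::HashMap;

/// Minimal stand-in for an inbox row.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct InboxRow {
    pub processed: bool,
    pub attempts: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Receipt {
    Processed,
    SkippedDuplicate,
    Failed,
}

/// In-memory inbox keyed by message_id, mirroring the primary key.
#[derive(Default)]
pub struct Inbox {
    rows: HashMap<String, InboxRow>,
}

impl Inbox {
    /// Inserts the row if absent, then runs `handler` unless the message
    /// was already processed. `handler` returns `true` on success.
    pub fn process<F>(&mut self, message_id: &str, handler: F) -> Receipt
    where
        F: FnOnce() -> bool,
    {
        let row = self.rows.entry(message_id.to_string()).or_default();
        if row.processed {
            return Receipt::SkippedDuplicate;
        }
        // Assumption: an existing but unprocessed row is retried.
        row.attempts += 1;
        if handler() {
            row.processed = true;
            Receipt::Processed
        } else {
            Receipt::Failed
        }
    }

    pub fn attempts(&self, message_id: &str) -> u32 {
        self.rows.get(message_id).map_or(0, |r| r.attempts)
    }
}
```

With a real database the same effect would typically come from an insert that tolerates a primary-key conflict followed by a check of processed_at.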

CMD-052: RouteToDeadLetter

| Field | Value |
|---|---|
| CMD-ID | CMD-052 |
| Input | message_id: UUID, reason: string, original_subject: string |
| Preconditions | Handler returned 422 |
| State Changes | Publishes to DLQ subject |
| Emits Events | EVT-047-002 (MessageDeadLettered) |
| Satisfies | REQ-053 |

Idempotency Specification:

| Idempotent? | Key Expression | Strategy | Behavior on Duplicate |
|---|---|---|---|
| yes | message_id | DLQ message ID | No duplicate in DLQ |


5. CQRS: Queries

QRY-050: GetOutboxBacklog

| Field | Value |
|---|---|
| QRY-ID | QRY-050 |
| Input | context: string |
| Output | { count: int, oldest_at: DateTime? } |
| Read Model | outbox_events WHERE published_at IS NULL |
| Satisfies | Observability |
| Idempotent? | yes |

QRY-051: GetInboxBacklog

| Field | Value |
|---|---|
| QRY-ID | QRY-051 |
| Input | context: string |
| Output | { count: int, oldest_at: DateTime? } |
| Read Model | inbox_messages WHERE processed_at IS NULL |
| Satisfies | Observability |
| Idempotent? | yes |

6. Domain Events

EVT-047-001: OutboxEventPublished

| Field | Value |
|---|---|
| EVT-ID | EVT-047-001 |
| Payload | outbox_id, event_type, subject, published_at |
| Published To | Internal metrics only |
| Satisfies | Observability |

EVT-047-002: MessageDeadLettered

| Field | Value |
|---|---|
| EVT-ID | EVT-047-002 |
| Payload | message_id, original_subject, reason, attempts |
| Published To | {context}.dlq.> |
| Consumers | Alerting, manual intervention |
| Satisfies | REQ-053 |

7. Ports

| PORT-ID | Port Name | Direction | Methods |
|---|---|---|---|
| PORT-047-001 | OutboxRepository | outbound | findUnpublished(), markPublished(id), incrementAttempts(id, error) |
| PORT-047-002 | InboxRepository | outbound | insert(msg), markProcessed(id), recordError(id, error) |
| PORT-047-003 | JetStreamPublisher | outbound | publish(subject, payload, headers) |
| PORT-047-004 | JetStreamConsumer | outbound | fetch(batch), ack(msg), nak(msg) |
| PORT-047-005 | EventHandler | outbound | handle(event_type, payload): HandlerResult |
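
One way the ports might look as Rust traits is sketched below. This is a simplified, synchronous rendering under several assumptions: method names are converted to snake_case, `String` stands in for UUID and JSON, errors are plain strings, and the `HandlerResult` variants are guessed from the handler response codes. A worker built on tokio would more likely use async methods. `RecordingPublisher` exists only to show that a port can be implemented without NATS.

```rust
/// Stand-in for a UUID in this sketch.
pub type Id = String;
pub type PortResult<T> = Result<T, String>;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutboxEvent { pub id: Id, pub event_type: String, pub payload: String }

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InboxMessage { pub message_id: Id, pub subject: String }

/// Assumed variants, mirroring the 200 / 409 / 422 / 5xx handler responses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandlerResult { Processed, AlreadyProcessed, Poison, Transient }

pub trait OutboxRepository {
    fn find_unpublished(&self) -> PortResult<Vec<OutboxEvent>>;
    fn mark_published(&mut self, id: &Id) -> PortResult<()>;
    fn increment_attempts(&mut self, id: &Id, error: &str) -> PortResult<()>;
}

pub trait InboxRepository {
    fn insert(&mut self, msg: &InboxMessage) -> PortResult<()>;
    fn mark_processed(&mut self, id: &Id) -> PortResult<()>;
    fn record_error(&mut self, id: &Id, error: &str) -> PortResult<()>;
}

pub trait JetStreamPublisher {
    fn publish(&mut self, subject: &str, payload: &str, headers: &[(&str, &str)]) -> PortResult<()>;
}

pub trait JetStreamConsumer {
    /// Broker-specific message handle, needed later for ack/nak.
    type Msg;
    fn fetch(&mut self, batch: usize) -> PortResult<Vec<Self::Msg>>;
    fn ack(&mut self, msg: &Self::Msg) -> PortResult<()>;
    fn nak(&mut self, msg: &Self::Msg) -> PortResult<()>;
}

pub trait EventHandler {
    fn handle(&self, event_type: &str, payload: &str) -> HandlerResult;
}

/// In-memory publisher that records (subject, payload) pairs.
#[derive(Default)]
pub struct RecordingPublisher { pub sent: Vec<(String, String)> }

impl JetStreamPublisher for RecordingPublisher {
    fn publish(&mut self, subject: &str, payload: &str, _headers: &[(&str, &str)]) -> PortResult<()> {
        self.sent.push((subject.to_string(), payload.to_string()));
        Ok(())
    }
}
```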

8. Adapters

| Category | Choice | Notes |
|---|---|---|
| NATS Client | async-nats (Rust) | JetStream support |
| Database | sqlx (Rust) | Compile-time checked queries |
| Runtime | tokio | Async runtime |
| HTTP Handler | reqwest (Rust) | For calling FastAPI |
| Observability | tracing + tracing-subscriber | Structured logging |

9. Database Schema

Outbox Table (per context DB)

CREATE TABLE outbox_events (
  id UUID PRIMARY KEY,
  aggregate_type TEXT NOT NULL,
  aggregate_id TEXT NOT NULL,
  event_type TEXT NOT NULL,
  event_version INT NOT NULL DEFAULT 1,
  payload JSONB NOT NULL,
  occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  correlation_id UUID,
  causation_id UUID,
  published_at TIMESTAMPTZ,
  publish_attempts INT NOT NULL DEFAULT 0,
  publish_error TEXT,

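  -- Enforces POL-047-002 (FUTURE_EVENT), with a 1-minute tolerance beyond the strict occurred_at <= NOW() rule.
  CONSTRAINT chk_occurred_not_future CHECK (occurred_at <= NOW() + INTERVAL '1 minute')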
);

CREATE INDEX idx_outbox_unpublished ON outbox_events (occurred_at)
  WHERE published_at IS NULL;
CREATE INDEX idx_outbox_correlation ON outbox_events (correlation_id);

Inbox Table (per context DB)

CREATE TABLE inbox_messages (
  message_id UUID PRIMARY KEY,
  subject TEXT NOT NULL,
  received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  processed_at TIMESTAMPTZ,
  attempts INT NOT NULL DEFAULT 0,
  last_error TEXT
);

CREATE INDEX idx_inbox_unprocessed ON inbox_messages (received_at)
  WHERE processed_at IS NULL;

10. JetStream Configuration

Streams

# SEA_EVENTS stream
name: SEA_EVENTS
subjects: ["sea.event.>"]
retention: limits
max_age: 7d
max_bytes: 10GB
storage: file
replicas: 3  # production

---
# VIBESPRO_EVENTS stream
name: VIBESPRO_EVENTS
subjects: ["vibespro.event.>"]
retention: limits
max_age: 7d
max_bytes: 10GB
storage: file
replicas: 3

Consumers

# SEA™ consuming from VIBESPRO_EVENTS
stream: VIBESPRO_EVENTS
durable: sea__from_vibespro
filter_subject: "vibespro.event.>"
ack_policy: explicit
ack_wait: 120s
max_deliver: 20
max_ack_pending: 50

11. Rust Worker Architecture

┌─────────────────────────────────────────────────────────────┐
│                    sea-mq-worker (Rust)                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────┐    ┌──────────────────────────┐   │
│  │   OutboxPublisher    │    │    InboxConsumer         │   │
│  │   - poll outbox      │    │    - pull from JetStream │   │
│  │   - publish w/ msgid │    │    - insert inbox        │   │
│  │   - mark published   │    │    - call handler        │   │
│  └──────────┬───────────┘    │    - ACK/NAK/DLQ         │   │
│             │                └────────────┬─────────────┘   │
│             ▼                             ▼                  │
│  ┌──────────────────────────────────────────────────────┐   │
│  │            PostgreSQL (sea_db)                        │   │
│  │   outbox_events  │  inbox_messages                    │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│             │                             │                  │
│             ▼                             ▼                  │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                  NATS JetStream                       │   │
│  │   SEA_EVENTS (publish)  │  VIBESPRO_EVENTS (consume)  │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│                                           │                  │
│                                           ▼                  │
│  ┌──────────────────────────────────────────────────────┐   │
│  │            sea-handlers (FastAPI)                     │   │
│  │   POST /handle { event_type, payload, ... }           │   │
│  │   Returns: 200 | 409 | 422 | 5xx                      │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘
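
The OutboxPublisher steps in the diagram (poll outbox, publish with message ID, mark published) could be sketched as one polling pass over in-memory rows. `OutboxRow`, `pending`, and `publish_pending` are hypothetical names; the `publish` closure stands in for the JetStream publish call and receives the subject, the payload, and the outbox ID to send as Nats-Msg-Id. How the subject is derived from the event is not shown in this spec, so the sketch simply stores it on the row.

```rust
/// In-memory stand-in for an outbox_events row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutboxRow {
    pub id: String,
    pub subject: String,
    pub payload: String,
    pub published: bool,
    pub publish_attempts: u32,
    pub publish_error: Option<String>,
}

/// Builds an unpublished row with zeroed counters.
pub fn pending(id: &str, subject: &str, payload: &str) -> OutboxRow {
    OutboxRow {
        id: id.to_string(),
        subject: subject.to_string(),
        payload: payload.to_string(),
        published: false,
        publish_attempts: 0,
        publish_error: None,
    }
}

/// Runs one pass over the unpublished rows and returns how many were published.
/// `publish` is called with (subject, payload, msg_id).
pub fn publish_pending<P>(outbox: &mut [OutboxRow], mut publish: P) -> usize
where
    P: FnMut(&str, &str, &str) -> Result<(), String>,
{
    let mut published = 0;
    for row in outbox.iter_mut().filter(|r| !r.published) {
        row.publish_attempts += 1;
        match publish(&row.subject, &row.payload, &row.id) {
            Ok(()) => {
                // Marked only after the broker accepted the message.
                row.published = true;
                row.publish_error = None;
                published += 1;
            }
            // Left unpublished so the next pass retries with the same ID.
            Err(e) => row.publish_error = Some(e),
        }
    }
    published
}
```

Marking a row published only after a successful publish means a crash between the two steps leads to a re-publish on the next pass. Because the outbox ID is reused as the message ID, that re-publish is a candidate for JetStream deduplication rather than a second event.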

12. Handler Contract (FastAPI)

Request

POST /handle
{
  "message_id": "550e8400-e29b-41d4-a716-446655440000",
  "subject": "vibespro.event.vibe_created.v1",
  "event_type": "vibe_created",
  "event_version": 1,
  "occurred_at": "2025-12-28T22:00:00Z",
  "correlation_id": "...",
  "causation_id": "...",
  "payload": { ... }
}

Response Codes

| Code | Meaning | Worker Action |
|---|---|---|
| 200 | Processed successfully | Mark processed → ACK |
| 409 | Already processed (idempotent) | Mark processed → ACK |
| 422 | Poison message (bad payload) | Route to DLQ → ACK |
| 5xx | Transient failure | DO NOT ACK (redeliver) |

13. DI / Wiring

| Port | Adapter | Lifetime |
|---|---|---|
| OutboxRepository | PostgresOutboxAdapter | request |
| InboxRepository | PostgresInboxAdapter | request |
| JetStreamPublisher | NatsJetStreamPublisher | singleton |
| JetStreamConsumer | NatsJetStreamConsumer | singleton |
| EventHandler | HttpEventHandler | singleton |