1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| -- Outbox table: one row per domain event, written in the same transaction as
-- the domain entity and later published to JetStream by the outbox worker.
CREATE TABLE outbox_events (
-- Event identifier; the worker also sends it as the Nats-Msg-Id header.
id UUID PRIMARY KEY,
-- presumably the kind and id of the domain entity the event belongs to — confirm
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
-- NOTE(review): DEFAULT without NOT NULL still allows an explicit NULL here
-- (same for occurred_at and publish_attempts) — confirm that is intended.
event_version INT DEFAULT 1,
payload JSONB NOT NULL,
-- Defaults to NOW() when the insert does not supply a value.
occurred_at TIMESTAMPTZ DEFAULT NOW(),
-- Optional (nullable) identifiers; presumably used for tracing — confirm.
correlation_id UUID,
causation_id UUID,
-- NULL while the event is still unpublished (see the partial index below).
published_at TIMESTAMPTZ,
-- presumably maintained by the worker when a publish attempt fails — confirm
publish_attempts INT DEFAULT 0,
publish_error TEXT
);
-- Partial index that contains only rows with published_at IS NULL, keyed by
-- occurred_at; presumably serves the worker's poll for unpublished events.
CREATE INDEX idx_outbox_unpublished
ON outbox_events (occurred_at)
WHERE published_at IS NULL;
|
1
2
3
4
5
| 1. Application writes domain entity + outbox event (single TX)
2. Outbox worker polls for unpublished events
3. Worker publishes to JetStream with Nats-Msg-Id = event.id
4. JetStream deduplicates by Nats-Msg-Id
5. Worker marks event as published
|
JetStream provides at-least-once delivery, meaning consumers may receive duplicates:
1
2
3
4
5
6
7
8
9
10
11
12
| -- Inbox table: one row per message id seen by the consumer, used to detect
-- duplicate deliveries.
CREATE TABLE inbox_messages (
-- The consumer stores the UUID parsed from the message's Nats-Msg-Id header here;
-- the primary key is what makes a second insert of the same id conflict.
message_id UUID PRIMARY KEY,
-- Subject the message was received on.
subject TEXT NOT NULL,
-- Defaults to NOW() when the insert does not supply a value.
-- NOTE(review): DEFAULT without NOT NULL still allows an explicit NULL
-- (same for attempts) — confirm that is intended.
received_at TIMESTAMPTZ DEFAULT NOW(),
-- NULL while the message is still unprocessed (see the partial index below).
processed_at TIMESTAMPTZ,
-- presumably maintained by the consumer when handling fails — confirm
attempts INT DEFAULT 0,
last_error TEXT
);
-- Partial index that contains only rows with processed_at IS NULL, keyed by
-- received_at.
CREATE INDEX idx_inbox_unprocessed
ON inbox_messages (received_at)
WHERE processed_at IS NULL;
|
1
2
3
4
5
6
| 1. Consumer fetches message from JetStream
2. INSERT INTO inbox_messages (message_id, subject) ON CONFLICT DO NOTHING
3. If insert succeeded → Process message
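4. If conflict and the existing row is already processed (duplicate) → Skip, ACK message; if it is still unprocessed (redelivery after a failure) → Process message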
5. On success → Mark processed, ACK
6. On failure → Record error, NAK for retry
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| /// Outbox relay loop: repeatedly loads unpublished events from `outbox` and
/// publishes each one to JetStream.
///
/// Every publish carries the event id in the `Nats-Msg-Id` header. A successful
/// publish marks the event as published; a failed publish is recorded against
/// the event and the loop moves on to the next one. After each batch the task
/// sleeps for 100 ms before polling again.
///
/// # Errors
///
/// Errors returned by the repository (`find_unpublished`, `mark_published`,
/// `record_error`) are propagated with `?` and end the loop. The function
/// never returns `Ok`, because the loop has no other exit.
pub async fn run_outbox_publisher(
    outbox: Arc<dyn OutboxRepository>,
    jetstream: Arc<JetStreamContext>,
) -> Result<()> {
    loop {
        // One polling round: ask the repository for up to 100 pending events.
        let batch = outbox.find_unpublished(100).await?;

        for pending in batch {
            let outcome = jetstream
                .publish(
                    pending.subject(),
                    pending.payload(),
                    // The event id doubles as the message id of the publish.
                    Headers::new().add("Nats-Msg-Id", pending.id.to_string()),
                )
                .await;

            if let Err(publish_err) = outcome {
                // Keep the failure on the row; the event stays unpublished.
                outbox
                    .record_error(pending.id, publish_err.to_string())
                    .await?;
            } else {
                outbox.mark_published(pending.id).await?;
            }
        }

        // Pause between polling rounds.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| /// Inbox consumer loop: pulls messages from `consumer`, uses `inbox` to skip
/// message ids that were already recorded, and passes the rest to `handler`.
///
/// For every message in a batch obtained from `consumer.fetch(10)`:
/// 1. The `Nats-Msg-Id` header is parsed as a `Uuid`.
/// 2. If `inbox.exists(msg_id)` is true, the message is ACKed and skipped.
/// 3. Otherwise the id and subject are inserted and `handler.handle` is called:
///    - `Ok`: the inbox row is marked processed and the message is ACKed.
///    - `Err(PoisonMessage(_))`: the message is published to `dlq` and ACKed.
///    - any other `Err`: the error is recorded and the message is not ACKed.
///
/// # Errors
///
/// Errors from `fetch`, from parsing the header, from the inbox repository,
/// from the DLQ publish and from `ack` are propagated with `?` and end the
/// loop. The function never returns `Ok`, because the loop has no other exit.
///
/// NOTE(review): `dlq` is neither a parameter nor a local of this function;
/// it has to come from somewhere outside this block — confirm.
pub async fn run_inbox_consumer(
consumer: Arc<PullConsumer>,
inbox: Arc<dyn InboxRepository>,
handler: Arc<dyn EventHandler>,
) -> Result<()> {
loop {
let messages = consumer.fetch(10).await?;
for msg in messages {
// The outbox publisher sets `Nats-Msg-Id` to the event id, so the header is
// parsed as a `Uuid`. NOTE(review): a header that fails to parse makes `?`
// return from the function, which stops the whole consumer — confirm that is
// intended.
let msg_id: Uuid = msg.headers()
.get("Nats-Msg-Id")
.parse()?;
// Idempotency check: an id that is already in the inbox is ACKed and skipped.
// NOTE(review): `exists` and `insert` are two separate calls, while the
// documented flow uses a single INSERT ... ON CONFLICT DO NOTHING; two
// consumers could both pass this check for the same id — confirm.
if inbox.exists(msg_id).await? {
msg.ack().await?;
continue;
}
// The row is written before the handler runs. NOTE(review): if `exists` is a
// plain row lookup, a message whose handler failed would be ACKed and skipped
// on redelivery instead of retried — confirm what `exists` checks.
inbox.insert(msg_id, msg.subject()).await?;
match handler.handle(&msg).await {
Ok(_) => {
inbox.mark_processed(msg_id).await?;
msg.ack().await?;
}
// Poison messages are handed to the DLQ and ACKed so they are not redelivered.
Err(PoisonMessage(e)) => {
dlq.publish(&msg, e).await?;
msg.ack().await?;
}
Err(e) => {
inbox.record_error(msg_id, e.to_string()).await?;
// Don't ACK - will be redelivered.
// NOTE(review): the documented flow says "NAK for retry", but no NAK is sent
// here; redelivery presumably waits for the consumer's ack timeout — confirm.
}
}
}
}
}
|