Handle poison messages and failed deliveries.
Messages are routed to the Dead Letter Queue (DLQ) when:
DLQ subject pattern:

```text
{context}.dlq.{original_subject}
```

Example:

```text
sea.dlq.sea.event.case_created.v1
```
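Tooling that reprocesses DLQ messages has to map between the two forms of the subject. Below is a minimal sketch of that mapping, assuming only the `{context}.dlq.{original_subject}` pattern. The helper names `to_dlq_subject` and `from_dlq_subject` are hypothetical. The split happens on the first `.dlq.` token, so an original subject that contains dots survives intact.

```python
DLQ_MARKER = ".dlq."  # separator taken from the {context}.dlq.{original_subject} pattern


def to_dlq_subject(context: str, original_subject: str) -> str:
    """Build a DLQ subject (hypothetical helper)."""
    return f"{context}{DLQ_MARKER}{original_subject}"


def from_dlq_subject(dlq_subject: str) -> tuple[str, str]:
    """Split a DLQ subject back into (context, original_subject).

    Splits on the first '.dlq.' only, so dotted original subjects
    such as 'sea.event.case_created.v1' are preserved.
    """
    context, sep, original = dlq_subject.partition(DLQ_MARKER)
    if not sep or not context or not original:
        raise ValueError(f"not a DLQ subject: {dlq_subject!r}")
    return context, original


if __name__ == "__main__":
    subject = to_dlq_subject("sea", "sea.event.case_created.v1")
    print(subject)                    # sea.dlq.sea.event.case_created.v1
    print(from_dlq_subject(subject))  # ('sea', 'sea.event.case_created.v1')
```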
```bash
# List DLQ subjects with messages
nats stream info SEA_EVENTS --json | \
  jq '.state.subjects // {} | to_entries |
      map(select(.key | contains(".dlq."))) |
      from_entries'

# Subscribe to DLQ (live)
nats sub "sea.dlq.>" --count 10

# View the latest stored message on a specific DLQ subject
nats stream get SEA_EVENTS --last-for "sea.dlq.sea.event.case_created.v1"
```
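The jq filter keeps only the `.state.subjects` entries whose key contains `.dlq.`. The same check can be scripted in Python. This is a minimal sketch that assumes the stream-info JSON carries a subject-to-count map under `state.subjects`, which is what the jq expression reads. The map may be absent if the server did not return per-subject counts. `dlq_subject_counts` and `format_report` are hypothetical names.

```python
def dlq_subject_counts(stream_info: dict) -> dict[str, int]:
    """Return {subject: message_count} for DLQ subjects only.

    Mirrors the jq filter: read .state.subjects (assumed subject -> count)
    and keep the keys that contain '.dlq.'. Missing data yields {}.
    """
    subjects = (stream_info.get("state") or {}).get("subjects") or {}
    return {subj: count for subj, count in subjects.items() if ".dlq." in subj}


def format_report(counts: dict[str, int]) -> str:
    """One line per DLQ subject, count right-aligned, sorted by subject."""
    return "\n".join(f"{count:>6}  {subject}" for subject, count in sorted(counts.items()))
```

To use it, pipe `nats stream info SEA_EVENTS --json` into a small script that calls `format_report(dlq_subject_counts(json.load(sys.stdin)))`.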
```bash
# Create a pull consumer over all DLQ subjects
nats consumer add SEA_EVENTS dlq_export \
  --filter "sea.dlq.>" \
  --deliver all \
  --pull

# Export DLQ messages to file
nats consumer next SEA_EVENTS dlq_export --count 100 > dlq_messages.json

# Analyze common errors (most frequent reason first)
jq -r '.headers["X-DLQ-Reason"]' dlq_messages.json | sort | uniq -c | sort -rn
```
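The `sort | uniq -c` tally can also be done in Python. The sketch below makes the same assumption as the jq command: the export is a stream of JSON objects, each with a `headers` map. Like jq, it reads concatenated JSON values rather than requiring one object per line. `iter_json_values` and `count_dlq_reasons` are hypothetical names. Messages without the header are counted under `None`.

```python
import json
from collections import Counter


def iter_json_values(text: str):
    """Yield each JSON value from a stream of concatenated values (as jq reads input)."""
    decoder = json.JSONDecoder()
    idx = 0
    while True:
        # Skip whitespace between values; raw_decode does not accept leading whitespace
        while idx < len(text) and text[idx].isspace():
            idx += 1
        if idx >= len(text):
            return
        value, consumed = decoder.raw_decode(text[idx:])
        idx += consumed
        yield value


def count_dlq_reasons(text: str) -> Counter:
    """Count messages per X-DLQ-Reason header value (None when the header is missing)."""
    return Counter(
        (msg.get("headers") or {}).get("X-DLQ-Reason")
        for msg in iter_json_values(text)
    )
```

Calling `count_dlq_reasons(open("dlq_messages.json").read()).most_common()` lists the reasons from most to least frequent.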
For fixable data issues:
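```bash
# 1. Export the failed message payload (--raw prints only the message body)
nats consumer next SEA_EVENTS dlq_export --count 1 --raw > failed.json

# 2. Fix the payload
vim failed.json

# 3. Republish to the original subject (the DLQ subject without its "sea.dlq." prefix)
nats pub sea.event.case_created.v1 "$(cat failed.json)"
```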
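Manual edits are easy to get wrong, so it can help to check the edited file before step 3. Below is a minimal sketch that assumes payloads are JSON, as the use of `jq` and `failed.json` suggests. `prepare_republish` is a hypothetical helper, and the `case_id`/`status` fields in the usage example are made up. The helper rejects malformed JSON and derives the republish subject from the `{context}.dlq.{original_subject}` pattern.

```python
import json


def prepare_republish(dlq_subject: str, edited_payload: str) -> tuple[str, bytes]:
    """Validate an edited DLQ payload and work out where to republish it.

    Returns (original_subject, payload_bytes). Raises ValueError if the
    subject is not a DLQ subject or the edited payload is not valid JSON,
    so a typo from the manual edit never reaches the live subject.
    """
    _context, sep, original_subject = dlq_subject.partition(".dlq.")
    if not sep or not original_subject:
        raise ValueError(f"not a DLQ subject: {dlq_subject!r}")
    # json.loads raises json.JSONDecodeError (a ValueError subclass) on bad input
    parsed = json.loads(edited_payload)
    # Re-serialize compactly so the republished body is normalized JSON
    return original_subject, json.dumps(parsed, separators=(",", ":")).encode("utf-8")
```

For example, `prepare_republish("sea.dlq.sea.event.case_created.v1", '{"case_id": "c-1", "status": "open"}')` returns `("sea.event.case_created.v1", b'{"case_id":"c-1","status":"open"}')`.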
For recurring failure reasons, automate the fix and escalate everything else for human intervention:
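```python
# dlq_processor.py
# get_dlq_messages, transform_payload, republish, create_missing_reference and
# create_support_ticket are project-specific helpers (not shown here).


def original_subject(dlq_subject: str) -> str:
    """Strip the '{context}.dlq.' prefix to recover the original subject."""
    return dlq_subject.split(".dlq.", 1)[1]


async def process_dlq():
    messages = await get_dlq_messages()
    for msg in messages:
        reason = (msg.headers or {}).get("X-DLQ-Reason")
        # Republish to the original subject; msg.subject is the DLQ subject itself
        target = original_subject(msg.subject)
        if reason == "INVALID_SCHEMA":
            # Transform to a valid schema, then retry
            fixed = transform_payload(msg.data)
            await republish(target, fixed)
        elif reason == "MISSING_REFERENCE":
            # Create the missing reference first, then retry unchanged
            await create_missing_reference(msg.data)
            await republish(target, msg.data)
        else:
            # Unknown reason: flag for manual review
            await create_support_ticket(msg)
```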
For genuinely invalid messages:
```bash
# Discard without reprocessing: remove the message from the stream by sequence number
nats stream rmm SEA_EVENTS <seq_number>
```
| Metric | Threshold | Action |
|---|---|---|
| DLQ message count | > 10 | Investigate |
| DLQ growth rate | > 5/hour | Check handlers |
| Same message repeated | > 3 times | Fix handler bug |
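The thresholds in the table can be checked in code, for example in a periodic job. This is a minimal sketch. The function name `dlq_alerts` and the `message_ids` input are assumptions: `message_ids` holds one identifier per DLQ arrival, and the caller decides how a "same message" is keyed (for example, from a message-ID header). All comparisons are strict, matching the `>` in the table.

```python
from collections import Counter

# Thresholds from the DLQ monitoring table
MAX_DLQ_COUNT = 10        # DLQ message count
MAX_GROWTH_PER_HOUR = 5   # DLQ growth rate (messages/hour)
MAX_REPEATS = 3           # same message repeated


def dlq_alerts(dlq_count: int, growth_per_hour: float, message_ids: list[str]) -> list[str]:
    """Return the actions triggered by the current DLQ state, in table order."""
    actions = []
    if dlq_count > MAX_DLQ_COUNT:
        actions.append("Investigate")
    if growth_per_hour > MAX_GROWTH_PER_HOUR:
        actions.append("Check handlers")
    # Any single message that landed in the DLQ more than MAX_REPEATS times
    if any(n > MAX_REPEATS for n in Counter(message_ids).values()):
        actions.append("Fix handler bug")
    return actions
```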
```promql
# DLQ message count
nats_consumer_num_pending{consumer=~".*dlq.*"}

# DLQ growth rate (messages/hour, to match the threshold above)
rate(nats_consumer_num_delivered{consumer=~".*dlq.*"}[5m]) * 3600
```