🔄 Message Replay Procedures
Replay events for recovery, bug fixes, and new consumers.
When to Replay
| Scenario |
Approach |
| Consumer bug fix |
Replay affected time range |
| New consumer onboarding |
Replay from stream start |
| Data recovery |
Replay after restoring state |
| Testing |
Replay in isolated environment |
Pre-Replay Checklist
Replay Methods
Method 1: New Consumer (Recommended)
Create a temporary consumer starting from a specific point:
1
2
3
4
5
6
7
8
9
10
11
12
| # Replay from timestamp
nats consumer add SEA_EVENTS replay__$(date +%s) \
--deliver-by-start-time "2026-01-01T00:00:00Z" \
--filter "sea.event.case_created.v1" \
--ack explicit \
--pull
# Process messages
nats consumer next SEA_EVENTS replay__1234567890 --count 100
# Delete replay consumer when done
nats consumer rm SEA_EVENTS replay__1234567890
|
Method 2: Reset Existing Consumer
⚠️ Caution: This affects production processing
1
2
3
4
5
6
7
8
9
10
11
12
13
| # Stop consumer service first
docker-compose stop mq-worker
# Reset to specific sequence
nats consumer update SEA_EVENTS sea__default \
--deliver-seq 12345
# Or reset to timestamp
nats consumer update SEA_EVENTS sea__default \
--deliver-by-start-time "2026-01-01T00:00:00Z"
# Restart consumer
docker-compose start mq-worker
|
Selective Replay
Filter by Subject
1
2
3
4
| nats consumer add SEA_EVENTS replay__cases \
--filter "sea.event.case_*.v1" \
--deliver-all \
--pull
|
Filter by Time Range
1
2
3
4
5
6
| # Start time
nats consumer add SEA_EVENTS replay__window \
--deliver-by-start-time "2026-01-01T00:00:00Z" \
--pull
# Then stop after reaching end time (manual check)
|
Replay Script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| #!/bin/bash
# replay-events.sh
STREAM=$1
FILTER=$2
START_TIME=$3
BATCH_SIZE=${4:-100}
CONSUMER="replay__$(date +%s)"
echo "Creating replay consumer: $CONSUMER"
nats consumer add $STREAM $CONSUMER \
--deliver-by-start-time "$START_TIME" \
--filter "$FILTER" \
--ack explicit \
--pull \
--max-deliver 1
echo "Fetching messages..."
while true; do
COUNT=$(nats consumer next $STREAM $CONSUMER --count $BATCH_SIZE 2>&1 | wc -l)
if [ "$COUNT" -lt "$BATCH_SIZE" ]; then
echo "Replay complete"
break
fi
done
echo "Cleaning up consumer"
nats consumer rm $STREAM $CONSUMER -f
|
Usage:
1
| ./replay-events.sh SEA_EVENTS "sea.event.case_created.v1" "2026-01-01T00:00:00Z"
|
Post-Replay Verification
1
2
3
4
5
6
7
8
9
10
| # Check consumer caught up
nats consumer info SEA_EVENTS sea__default --json | jq '.num_pending'
# Verify inbox processed
psql $DATABASE_URL -c "
SELECT COUNT(*) as total,
COUNT(processed_at) as processed
FROM inbox_messages
WHERE received_at > '2026-01-01 00:00:00';
"
|