🔄 Message Replay Procedures

Replay events for recovery, bug fixes, and new consumers.


When to Replay

Scenario Approach
Consumer bug fix Replay affected time range
New consumer onboarding Replay from stream start
Data recovery Replay after restoring state
Testing Replay in isolated environment

Pre-Replay Checklist


Replay Methods

Create a temporary consumer starting from a specific point:

1
2
3
4
5
6
7
8
9
10
11
12
# Replay from timestamp
nats consumer add SEA_EVENTS replay__$(date +%s) \
  --deliver-by-start-time "2026-01-01T00:00:00Z" \
  --filter "sea.event.case_created.v1" \
  --ack explicit \
  --pull

# Process messages
nats consumer next SEA_EVENTS replay__1234567890 --count 100

# Delete replay consumer when done
nats consumer rm SEA_EVENTS replay__1234567890

Method 2: Reset Existing Consumer

⚠️ Caution: This affects production processing

1
2
3
4
5
6
7
8
9
10
11
12
13
# Stop consumer service first
docker-compose stop mq-worker

# Reset to specific sequence
nats consumer update SEA_EVENTS sea__default \
  --deliver-seq 12345

# Or reset to timestamp
nats consumer update SEA_EVENTS sea__default \
  --deliver-by-start-time "2026-01-01T00:00:00Z"

# Restart consumer
docker-compose start mq-worker

Selective Replay

Filter by Subject

1
2
3
4
nats consumer add SEA_EVENTS replay__cases \
  --filter "sea.event.case_*.v1" \
  --deliver-all \
  --pull

Filter by Time Range

1
2
3
4
5
6
# Start time
nats consumer add SEA_EVENTS replay__window \
  --deliver-by-start-time "2026-01-01T00:00:00Z" \
  --pull

# Then stop after reaching end time (manual check)

Replay Script

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/bin/bash
# replay-events.sh

STREAM=$1
FILTER=$2
START_TIME=$3
BATCH_SIZE=${4:-100}

CONSUMER="replay__$(date +%s)"

echo "Creating replay consumer: $CONSUMER"
nats consumer add $STREAM $CONSUMER \
  --deliver-by-start-time "$START_TIME" \
  --filter "$FILTER" \
  --ack explicit \
  --pull \
  --max-deliver 1

echo "Fetching messages..."
while true; do
  COUNT=$(nats consumer next $STREAM $CONSUMER --count $BATCH_SIZE 2>&1 | wc -l)
  if [ "$COUNT" -lt "$BATCH_SIZE" ]; then
    echo "Replay complete"
    break
  fi
done

echo "Cleaning up consumer"
nats consumer rm $STREAM $CONSUMER -f

Usage:

1
./replay-events.sh SEA_EVENTS "sea.event.case_created.v1" "2026-01-01T00:00:00Z"

Post-Replay Verification

1
2
3
4
5
6
7
8
9
10
# Check consumer caught up
nats consumer info SEA_EVENTS sea__default --json | jq '.num_pending'

# Verify inbox processed
psql $DATABASE_URL -c "
SELECT COUNT(*) as total,
       COUNT(processed_at) as processed
FROM inbox_messages
WHERE received_at > '2026-01-01 00:00:00';
"