Proven patterns for common integration scenarios.
When: Guaranteeing event delivery with transactional consistency
# ✅ Correct
async with db.transaction():
await cases.insert(case)
await outbox.insert(CaseCreatedEvent(case))
# ❌ Wrong - dual write problem
await db.save(case)
await jetstream.publish(event) # May fail!
When: Processing events exactly once
async def handle(msg: Message):
    """Process ``msg`` unless its id is already recorded in the inbox.

    The message id is inserted into the inbox before any work is done. A
    falsy result from ``inbox.insert`` is treated as "already seen" and the
    message is skipped without processing.

    Args:
        msg: Incoming message; only ``msg.id`` and ``msg.data`` are read.
    """
    # Try insert first
    inserted = await inbox.insert(msg.id)
    if not inserted:
        return  # Duplicate, skip

    # Process with confidence
    # NOTE(review): if process() raises, the id stays inserted but is never
    # marked processed; a redelivery would then hit the duplicate branch
    # above. Confirm how the inbox treats inserted-but-unprocessed ids.
    await process(msg.data)
    await inbox.mark_processed(msg.id)
When: Tracing requests across services
correlation_id = request.headers.get("X-Correlation-Id") or str(uuid4())
# Pass to all downstream calls
await http_client.post(url, headers={
"X-Correlation-Id": correlation_id,
"X-Causation-Id": current_event_id,
})
# Include in published events
await outbox.insert(Event(
correlation_id=correlation_id,
causation_id=triggering_event_id,
))
When: Preventing duplicate mutations
@router.post("/cases")
async def create_case(
    request: CreateCaseRequest,
    idempotency_key: str = Header(alias="X-Idempotency-Key"),
):
    """Create a case, replaying the stored result for a repeated idempotency key.

    Args:
        request: Body of the create-case request.
        idempotency_key: Value of the ``X-Idempotency-Key`` request header.

    Returns:
        The cached result when one exists for this key, otherwise the result
        of ``create(request)``.
    """
    cache_key = f"idem:{idempotency_key}"

    # Check cache
    cached = await cache.get(cache_key)
    # Compare against None rather than truthiness so a legitimately falsy
    # cached result is still replayed instead of re-running the mutation.
    # Assumes cache.get returns None on a miss - confirm against the client.
    if cached is not None:
        return cached

    # Process and cache
    result = await create(request)
    # ttl is presumably in seconds (86400 s = 24 h) - confirm against the client.
    await cache.set(cache_key, result, ttl=86400)
    return result
When: Listing large collections
@router.get("/cases")
async def list_cases(
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
):
    """Return one page of cases together with pagination metadata.

    Args:
        page: 1-based page number (minimum 1, default 1).
        limit: Page size (1 to 100, default 20).

    Returns:
        A dict with the page's items under ``"data"`` and ``page``, ``limit``,
        ``total`` and ``totalPages`` under ``"pagination"``.
    """
    # Pages are 1-based, so page 1 starts at offset 0.
    offset = (page - 1) * limit
    items = await cases.list(offset=offset, limit=limit)
    total = await cases.count()

    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (total + limit - 1) // limit

    return {
        "data": items,
        "pagination": {
            "page": page,
            "limit": limit,
            "total": total,
            "totalPages": total_pages,
        },
    }
When: Protecting against cascading failures
from circuitbreaker import circuit
@circuit(failure_threshold=5, recovery_timeout=30)
async def call_external_api(request):
return await http_client.post(EXTERNAL_URL, json=request)
# Usage with fallback
try:
result = await call_external_api(data)
except CircuitBreakerError:
result = await get_cached_or_default(data)
When: Isolating external models from domain
class ExternalCRMAdapter:
    """Translates CRM API to domain."""

    async def get_customer(self, id: str) -> Customer:
        """Fetch a customer from the CRM and return it as a domain ``Customer``.

        Args:
            id: Customer identifier, interpolated into the ``/customers/{id}``
                path. The name shadows the builtin but is kept so existing
                keyword callers continue to work.
        """
        response = await self.client.get(f"/customers/{id}")
        return self._to_customer(response.json())

    def _to_customer(self, data: dict) -> Customer:
        """Map a CRM payload onto the domain model.

        Reads the ``cust_id``, ``full_name`` and ``email_address`` keys of
        ``data``; a missing key raises ``KeyError``.
        """
        # Transform external model to domain model
        return Customer(
            id=CustomerId(data["cust_id"]),
            name=data["full_name"],
            email=Email(data["email_address"]),
        )
When: Receiving events from external systems
@router.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Verify a Stripe webhook, persist it to the outbox, and acknowledge it.

    Args:
        request: Incoming HTTP request; the raw body and the
            ``Stripe-Signature`` header are read from it.

    Returns:
        ``{"received": True}`` once the event has been stored.

    Raises:
        HTTPException: 400 when the payload is invalid or the signature
            does not verify.
    """
    # 1. Verify signature
    signature = request.headers.get("Stripe-Signature")
    body = await request.body()
    try:
        event = stripe.Webhook.construct_event(
            body, signature, WEBHOOK_SECRET
        )
    except ValueError:
        raise HTTPException(400, "Invalid payload")
    except stripe.error.SignatureVerificationError:
        # A failed signature check is reported with this exception, not
        # ValueError; without this branch it would surface as a 500.
        raise HTTPException(400, "Invalid signature")

    # 2. Store in outbox for reliable processing
    await outbox.insert(WebhookReceived(
        source="stripe",
        event_type=event["type"],
        payload=event["data"],
    ))

    # 3. Return 200 quickly (process async)
    return {"received": True}
When: Scaling message processing
# Multiple instances share load
consumer_1: sea_service__context (instance 1)
consumer_2: sea_service__context (instance 2)
# Same durable name = shared queue semantics
When: Multiple consumers need all messages
# Each service has unique consumer
analytics: analytics__from_sea
notifications: notifications__from_sea
audit: audit__from_sea
# Different durable names = each gets all messages