🧩 Integration Patterns Catalog

Proven patterns for common integration scenarios.


Event Patterns

Pattern: Outbox + Publish

When: Guaranteeing event delivery with transactional consistency

1
2
3
4
5
6
7
8
# ✅ Correct - the case row and its outbox event are inserted inside the
# same db.transaction() block.
# NOTE(review): presumably a separate relay publishes the outbox rows to the
# broker afterwards; that component is not shown here - confirm.
async with db.transaction():
    await cases.insert(case)
    await outbox.insert(CaseCreatedEvent(case))

# ❌ Wrong - dual write problem: the save and the publish are two independent
# operations, so the case can be persisted without its event being sent.
await db.save(case)
await jetstream.publish(event)  # May fail after the save already succeeded!

Pattern: Inbox + Idempotency

When: Processing each event only once, even if the broker delivers it more than once

1
2
3
4
5
6
7
8
9
async def handle(msg: Message):
    """Process a message once, skipping ids already recorded in the inbox.

    The inbox insert, the processing step and the processed marker run inside
    a single ``db.transaction()``. Without it, a failure in ``process`` would
    leave the id recorded in the inbox, and the redelivered message would then
    be dropped as a duplicate without ever having been processed.

    NOTE(review): assumes ``inbox`` writes take part in ``db.transaction()``
    the same way the outbox insert does in the Outbox + Publish pattern, and
    that the transaction rolls back when an exception escapes - confirm.

    Args:
        msg: Incoming message; ``msg.id`` is the deduplication key and
            ``msg.data`` is handed to ``process``.
    """
    async with db.transaction():
        # Try insert first; a falsy result means the id was already recorded.
        inserted = await inbox.insert(msg.id)
        if not inserted:
            return  # Duplicate, skip

        # Process with confidence: an exception here undoes the inbox insert,
        # so the message can be retried on redelivery.
        await process(msg.data)
        await inbox.mark_processed(msg.id)

Pattern: Correlation Tracking

When: Tracing requests across services

1
2
3
4
5
6
7
8
9
10
11
12
13
# Reuse the caller's correlation id when the header is present and non-empty;
# otherwise mint a fresh UUID4 string so the chain still has an id.
correlation_id = request.headers.get("X-Correlation-Id") or str(uuid4())

# Pass to all downstream calls
await http_client.post(url, headers={
    "X-Correlation-Id": correlation_id,
    # NOTE(review): the causation id is called current_event_id here and
    # triggering_event_id below - presumably the same event; confirm.
    "X-Causation-Id": current_event_id,
})

# Include in published events
await outbox.insert(Event(
    correlation_id=correlation_id,
    causation_id=triggering_event_id,
))

API Patterns

Pattern: Idempotency Key

When: Preventing duplicate mutations

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@router.post("/cases")
async def create_case(
    request: CreateCaseRequest,
    idempotency_key: str = Header(alias="X-Idempotency-Key"),
):
    """Create a case, replaying the stored result for a repeated key.

    Args:
        request: Payload forwarded unchanged to ``create``.
        idempotency_key: Value of the ``X-Idempotency-Key`` request header.

    Returns:
        The cached result when one exists for the key, otherwise the fresh
        result of ``create(request)``.

    NOTE(review): the lookup and the store are separate awaits, so two
    concurrent requests carrying the same key can both miss the cache and
    both call ``create``. Closing that gap needs an atomic set-if-absent on
    the cache, which is not visible here.
    """
    cache_key = f"idem:{idempotency_key}"

    # Check cache. Compare against None rather than truthiness so that a
    # stored falsy result (empty dict, 0, "") is replayed instead of causing
    # the mutation to run again.
    # Assumes cache.get returns None on a miss - TODO confirm.
    cached = await cache.get(cache_key)
    if cached is not None:
        return cached

    # Process and cache
    result = await create(request)
    # ttl is presumably in seconds (86400 s = 24 h) - confirm with the cache API.
    await cache.set(cache_key, result, ttl=86400)
    return result

Pattern: Pagination

When: Listing large collections

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@router.get("/cases")
async def list_cases(
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
):
    """Return one page of cases together with pagination metadata.

    Args:
        page: 1-based page number (declared with ``ge=1``, default 1).
        limit: Page size (declared with ``ge=1, le=100``, default 20).

    Returns:
        A dict with the page items under ``"data"`` and the page number,
        page size, total item count and total page count under
        ``"pagination"``.
    """
    # Items on earlier pages are skipped.
    skip = limit * (page - 1)
    rows = await cases.list(offset=skip, limit=limit)
    row_count = await cases.count()

    # Ceiling division: a partially filled last page still counts as a page.
    full_pages, leftover = divmod(row_count, limit)
    page_count = full_pages + 1 if leftover else full_pages

    pagination = {
        "page": page,
        "limit": limit,
        "total": row_count,
        "totalPages": page_count,
    }
    return {"data": rows, "pagination": pagination}

External Integration Patterns

Pattern: Circuit Breaker

When: Protecting against cascading failures

1
2
3
4
5
6
7
8
9
10
11
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
async def call_external_api(request):
    return await http_client.post(EXTERNAL_URL, json=request)

# Usage with fallback
try:
    result = await call_external_api(data)
except CircuitBreakerError:
    result = await get_cached_or_default(data)

Pattern: Anti-Corruption Layer

When: Isolating external models from domain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class ExternalCRMAdapter:
    """Adapter that keeps the CRM's wire format out of the domain model."""

    async def get_customer(self, id: str) -> Customer:
        """Fetch ``/customers/{id}`` from the CRM and return it as a Customer."""
        crm_reply = await self.client.get(f"/customers/{id}")
        payload = crm_reply.json()
        return self._to_customer(payload)

    def _to_customer(self, data: dict) -> Customer:
        """Map the CRM field names onto the domain ``Customer``.

        Reads ``cust_id``, ``full_name`` and ``email_address`` from ``data``.
        """
        # Fields are read in the same order as the constructor arguments.
        customer_id = CustomerId(data["cust_id"])
        full_name = data["full_name"]
        email = Email(data["email_address"])
        return Customer(id=customer_id, name=full_name, email=email)

Pattern: Webhook Receiver

When: Receiving events from external systems

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@router.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    """Verify a Stripe webhook and queue it in the outbox for later processing.

    Args:
        request: Incoming HTTP request; the raw body and the
            ``Stripe-Signature`` header are used for verification.

    Returns:
        ``{"received": True}`` once the event has been stored.

    Raises:
        HTTPException: 400 when the signature header is missing, the payload
            cannot be parsed, or the signature does not verify.
    """
    # 1. Verify signature
    signature = request.headers.get("Stripe-Signature")
    if not signature:
        # Reject up front instead of handing None to the verifier.
        raise HTTPException(400, "Missing signature")
    # The raw bytes are passed on as received, not a re-serialised form.
    body = await request.body()

    try:
        event = stripe.Webhook.construct_event(
            body, signature, WEBHOOK_SECRET
        )
    except ValueError:
        raise HTTPException(400, "Invalid payload") from None
    except stripe.SignatureVerificationError:
        # A failed signature check is not a ValueError; without this clause
        # it would surface as a 500 instead of a client error.
        raise HTTPException(400, "Invalid signature") from None

    # 2. Store in outbox for reliable processing
    await outbox.insert(WebhookReceived(
        source="stripe",
        event_type=event["type"],
        payload=event["data"],
    ))

    # 3. Return 200 quickly (process async)
    return {"received": True}

Messaging Patterns

Pattern: Consumer Groups

When: Scaling message processing

1
2
3
4
# Multiple instances share load
consumer_1: sea_service__context (instance 1)
consumer_2: sea_service__context (instance 2)
# Same durable name = shared queue semantics

Pattern: Fan-out

When: Multiple consumers need all messages

1
2
3
4
5
# Each service has unique consumer
analytics: analytics__from_sea
notifications: notifications__from_sea
audit: audit__from_sea
# Different durable names = each gets all messages