🔌 External Systems Integration

Patterns for connecting SEA™ to third-party services.


Overview

SEA™ integrates with external systems through well-defined patterns:

| Integration Type | Pattern | Example |
| --- | --- | --- |
| Sync request/response | HTTP Client + Circuit Breaker | Payment gateway |
| Async events | Webhook receiver + Outbox | GitHub webhooks |
| Data sync | CDC + Event bridge | CRM sync |
| File exchange | Blob storage + events | Document import |

HTTP Client Pattern

Circuit Breaker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from circuitbreaker import circuit

@circuit(
    failure_threshold=5,
    recovery_timeout=30,
    expected_exception=ExternalServiceError
)
async def call_payment_gateway(request: PaymentRequest) -> PaymentResponse:
    """Charge a payment through the external gateway behind a circuit breaker.

    POSTs the serialized request to ``{PAYMENT_URL}/charge`` with a bearer
    token and validates the JSON reply into a ``PaymentResponse``.

    The breaker only counts ``ExternalServiceError``, so gateway outages
    (transport failures and 5xx replies) are translated into that type here;
    otherwise the raw httpx exceptions would bypass the breaker and the
    circuit would never open.

    Args:
        request: The payment request; sent as the JSON body via ``model_dump()``.

    Returns:
        The gateway reply validated as a ``PaymentResponse``.

    Raises:
        ExternalServiceError: The gateway was unreachable, timed out, or
            answered with a 5xx status.
        httpx.HTTPStatusError: The gateway answered with a 4xx status. This is
            a problem with the request, not the service, so it is propagated
            unchanged and does not count toward opening the circuit.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.post(
                f"{PAYMENT_URL}/charge",
                json=request.model_dump(),
                headers={"Authorization": f"Bearer {API_KEY}"}
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code < 500:
                raise
            # NOTE(review): assumes ExternalServiceError accepts a message
            # string like a regular Exception — confirm.
            raise ExternalServiceError(
                f"Payment gateway returned {exc.response.status_code}"
            ) from exc
        except httpx.RequestError as exc:
            # Connection errors, timeouts and other transport-level failures.
            raise ExternalServiceError(
                f"Payment gateway request failed: {exc}"
            ) from exc
        return PaymentResponse.model_validate(response.json())

Retry with Backoff

1
2
3
4
5
6
7
8
9
10
11
from tenacity import retry, wait_exponential, stop_after_attempt

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60),
)
async def fetch_with_retry(url: str) -> dict:
    """GET *url* and return its decoded JSON body, retrying on failure.

    The call is attempted at most 5 times, with an exponentially growing wait
    bounded between 1 and 60 between attempts. No ``retry=`` predicate is
    configured, so tenacity's default retry condition applies (presumably any
    raised exception, including HTTP error statuses — confirm).

    Args:
        url: Address to fetch.

    Returns:
        The parsed JSON payload of the response.
    """
    async with httpx.AsyncClient() as http:
        reply = await http.get(url)
        # Turn 4xx/5xx replies into exceptions so they reach the retry logic.
        reply.raise_for_status()
        payload = reply.json()
    return payload

Webhook Receiver Pattern

Endpoint Design

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@router.post("/webhooks/{provider}")
async def receive_webhook(
    provider: str,
    request: Request,
    background_tasks: BackgroundTasks,
):
    """Accept a provider webhook, authenticate it, and queue it in the outbox.

    Args:
        provider: Provider name taken from the URL path.
        request: Incoming request; its ``X-Signature`` header and raw body are
            used for signature verification.
        background_tasks: Accepted by the endpoint but not used in this body.

    Returns:
        ``{"status": "accepted"}`` once the event has been written to the outbox.

    Raises:
        HTTPException: 401 when ``verify_signature`` rejects the request.
    """
    # Step 1: authenticate the sender against the raw, unparsed payload.
    provided_signature = request.headers.get("X-Signature")
    raw_body = await request.body()
    signature_ok = verify_signature(provider, provided_signature, raw_body)
    if not signature_ok:
        raise HTTPException(401, "Invalid signature")

    # Step 2: turn the raw payload into a provider event.
    parsed = parse_webhook(provider, raw_body)

    # Step 3: persist to the outbox so processing happens outside this request.
    record = WebhookReceived(
        provider=provider,
        event_type=parsed.type,
        payload=parsed.data,
    )
    await outbox.insert(record)

    # Step 4: acknowledge right away.
    return {"status": "accepted"}

Idempotency

1
2
3
4
5
6
7
8
9
@router.post("/webhooks/github")
async def github_webhook(request: Request):
    """Handle a GitHub webhook, short-circuiting deliveries already in the inbox.

    The ``X-GitHub-Delivery`` header is used as the idempotency key.

    Returns:
        ``{"status": "duplicate"}`` when the inbox already knows the delivery;
        otherwise falls through to the processing placeholder below.
    """
    delivery_key = request.headers.get("X-GitHub-Delivery")

    # Skip work for a delivery that has been seen before.
    already_seen = await inbox.exists(delivery_key)
    if already_seen:
        return {"status": "duplicate"}

    # Process...

Anti-Corruption Layer

Purpose

Translate external models to internal domain models:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class CRMAntiCorruptionLayer:
    """Translates CRM concepts to SEA™ domain.

    Keeps CRM field names and vocabulary ("subject", "urgency", "state") from
    leaking into the domain model by mapping them onto ``Case`` fields.
    """

    def to_case(self, crm_ticket: dict) -> Case:
        """Build a ``Case`` from a raw CRM ticket.

        Args:
            crm_ticket: Ticket mapping; the keys ``"id"``, ``"subject"``,
                ``"urgency"`` and ``"state"`` are read.

        Returns:
            A ``Case`` whose metadata records the source system and the
            original ticket.

        Raises:
            KeyError: If one of the keys read above is missing.
        """
        # NOTE(review): `_map_status` is not defined in the visible part of
        # this class — confirm it is provided elsewhere.
        return Case(
            external_id=crm_ticket["id"],
            title=crm_ticket["subject"],
            priority=self._map_priority(crm_ticket["urgency"]),
            status=self._map_status(crm_ticket["state"]),
            metadata={
                "source": "crm",
                "original": crm_ticket,
            }
        )

    def _map_priority(self, urgency: str) -> Priority:
        """Map a CRM urgency label onto a domain ``Priority``.

        Matching ignores letter case and surrounding whitespace, so "High" or
        " critical " map like their canonical spellings. Unknown or non-string
        values fall back to ``Priority.MEDIUM``.
        """
        mapping = {
            "critical": Priority.HIGH,
            "high": Priority.HIGH,
            "normal": Priority.MEDIUM,
            "low": Priority.LOW,
        }
        # External systems are not consistent about casing or padding.
        # Non-string values are looked up as-is and hit the default.
        key = urgency.strip().lower() if isinstance(urgency, str) else urgency
        return mapping.get(key, Priority.MEDIUM)

Configuration Management

Environment Variables

1
2
3
4
5
6
7
8
9
10
# External service configuration
# The `xxx` / `sk_live_xxx` values below are placeholders, not working credentials.

# Payment gateway: base URL and API key
PAYMENT_GATEWAY_URL=https://api.stripe.com
PAYMENT_GATEWAY_API_KEY=sk_live_xxx

# CRM: API base URL and client credentials
CRM_API_URL=https://api.salesforce.com
CRM_CLIENT_ID=xxx
CRM_CLIENT_SECRET=xxx

# Document storage: endpoint URL and bucket name
DOCUMENT_STORAGE_URL=https://s3.amazonaws.com
DOCUMENT_STORAGE_BUCKET=sea-documents

Secrets Management

1
2
3
4
5
6
7
8
9
from pydantic import Field
from pydantic_settings import BaseSettings

class ExternalConfig(BaseSettings):
    """Settings for the external payment gateway.

    Values are loaded from the environment variables ``PAYMENT_GATEWAY_URL``
    and ``PAYMENT_GATEWAY_API_KEY`` (field names matched against the
    environment), or from same-named files under ``/run/secrets``.
    """

    # Base URL of the payment gateway.
    payment_gateway_url: str
    # API key; excluded from the model repr so it does not show up in logs.
    payment_gateway_api_key: str = Field(..., repr=False)  # Hide in logs

    class Config:
        # No env_prefix on purpose: the field names already carry the
        # "payment_gateway_" prefix. Adding env_prefix="PAYMENT_GATEWAY_" on
        # top would make the settings look for
        # PAYMENT_GATEWAY_PAYMENT_GATEWAY_URL instead of PAYMENT_GATEWAY_URL.
        secrets_dir = "/run/secrets"

Health Checks

External Dependency Check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@router.get("/health/dependencies")
async def check_dependencies():
    """Report the health of every external dependency.

    The individual probes are independent, so they run concurrently: the
    endpoint takes about as long as the slowest probe, not the sum of all
    of them.

    Returns:
        A dict with ``"status"`` (``"healthy"`` when every probe reports
        healthy, otherwise ``"degraded"``) and ``"dependencies"`` holding each
        probe's result keyed by dependency name.
    """
    import asyncio  # local import: keeps the snippet self-contained

    probes = {
        "payment_gateway": check_payment_gateway(),
        "crm": check_crm(),
        "document_storage": check_storage(),
    }
    # gather() preserves argument order, so results line up with the keys.
    results = await asyncio.gather(*probes.values())
    checks = dict(zip(probes, results))

    all_healthy = all(c["healthy"] for c in checks.values())

    return {
        "status": "healthy" if all_healthy else "degraded",
        "dependencies": checks,
    }

async def check_payment_gateway() -> dict:
    """Probe the payment gateway's health endpoint.

    Returns:
        ``{"healthy": bool, "latency_ms": float}`` when a response was
        received (healthy only for HTTP 200), or
        ``{"healthy": False, "error": str}`` when the request failed.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(f"{PAYMENT_URL}/health")
            return {
                "healthy": r.status_code == 200,
                # total_seconds() covers the whole duration; `.microseconds`
                # is only the sub-second component and would misreport any
                # response that took one second or longer.
                "latency_ms": r.elapsed.total_seconds() * 1000,
            }
    except Exception as e:
        # Best-effort probe: any failure is reported as unhealthy, not raised.
        return {"healthy": False, "error": str(e)}

Rate Limiting External Calls

Moving Window

1
2
3
4
5
6
7
8
9
10
from limits import parse, storage, strategies

# 100 requests per minute
rate_limit = parse("100/minute")
# Moving-window limiter backed by in-memory storage.
# NOTE(review): in-memory counters are presumably per process and lost on
# restart — confirm this is acceptable for multi-worker deployments.
limiter = strategies.MovingWindowRateLimiter(storage.MemoryStorage())

async def call_with_rate_limit(key: str, fn: Callable) -> Any:
    """Await ``fn()`` only if the rate limit for *key* still has capacity.

    Args:
        key: Identifier the limit is tracked under.
        fn: Zero-argument callable whose result is awaited.

    Returns:
        Whatever awaiting ``fn()`` produces.

    Raises:
        RateLimitExceeded: When the limiter refuses the hit for *key*.
    """
    # hit() both records the attempt and reports whether it was allowed.
    allowed = limiter.hit(rate_limit, key)
    if allowed:
        return await fn()
    raise RateLimitExceeded(f"Rate limit exceeded for {key}")