Patterns for connecting SEA™ to third-party services.
SEA™ integrates with external systems through well-defined patterns:
| Integration Type | Pattern | Example |
|---|---|---|
| Sync request/response | HTTP Client + Circuit Breaker | Payment gateway |
| Async events | Webhook receiver + Outbox | GitHub webhooks |
| Data sync | CDC + Event bridge | CRM sync |
| File exchange | Blob storage + events | Document import |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from circuitbreaker import circuit
@circuit(
failure_threshold=5,
recovery_timeout=30,
expected_exception=ExternalServiceError
)
async def call_payment_gateway(request: PaymentRequest) -> PaymentResponse:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{PAYMENT_URL}/charge",
json=request.model_dump(),
headers={"Authorization": f"Bearer {API_KEY}"}
)
response.raise_for_status()
return PaymentResponse.model_validate(response.json())
1
2
3
4
5
6
7
8
9
10
11
from tenacity import retry, wait_exponential, stop_after_attempt
@retry(
wait=wait_exponential(multiplier=1, min=1, max=60),
stop=stop_after_attempt(5)
)
async def fetch_with_retry(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@router.post("/webhooks/{provider}")
async def receive_webhook(
provider: str,
request: Request,
background_tasks: BackgroundTasks,
):
# 1. Verify signature
signature = request.headers.get("X-Signature")
body = await request.body()
if not verify_signature(provider, signature, body):
raise HTTPException(401, "Invalid signature")
# 2. Parse and validate
event = parse_webhook(provider, body)
# 3. Store in outbox for reliable processing
await outbox.insert(WebhookReceived(
provider=provider,
event_type=event.type,
payload=event.data,
))
# 4. Return 200 quickly
return {"status": "accepted"}
1
2
3
4
5
6
7
8
9
@router.post("/webhooks/github")
async def github_webhook(request: Request):
delivery_id = request.headers.get("X-GitHub-Delivery")
# Check if already processed
if await inbox.exists(delivery_id):
return {"status": "duplicate"}
# Process...
Translate external models to internal domain models:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class CRMAntiCorruptionLayer:
"""Translates CRM concepts to SEA™ domain."""
def to_case(self, crm_ticket: dict) -> Case:
return Case(
external_id=crm_ticket["id"],
title=crm_ticket["subject"],
priority=self._map_priority(crm_ticket["urgency"]),
status=self._map_status(crm_ticket["state"]),
metadata={
"source": "crm",
"original": crm_ticket,
}
)
def _map_priority(self, urgency: str) -> Priority:
mapping = {
"critical": Priority.HIGH,
"high": Priority.HIGH,
"normal": Priority.MEDIUM,
"low": Priority.LOW,
}
return mapping.get(urgency, Priority.MEDIUM)
1
2
3
4
5
6
7
8
9
10
# External service configuration
PAYMENT_GATEWAY_URL=https://api.stripe.com
PAYMENT_GATEWAY_API_KEY=sk_live_xxx
CRM_API_URL=https://api.salesforce.com
CRM_CLIENT_ID=xxx
CRM_CLIENT_SECRET=xxx
DOCUMENT_STORAGE_URL=https://s3.amazonaws.com
DOCUMENT_STORAGE_BUCKET=sea-documents
1
2
3
4
5
6
7
8
9
from pydantic_settings import BaseSettings
class ExternalConfig(BaseSettings):
payment_gateway_url: str
payment_gateway_api_key: str = Field(..., repr=False) # Hide in logs
class Config:
env_prefix = "PAYMENT_GATEWAY_"
secrets_dir = "/run/secrets"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@router.get("/health/dependencies")
async def check_dependencies():
checks = {
"payment_gateway": await check_payment_gateway(),
"crm": await check_crm(),
"document_storage": await check_storage(),
}
all_healthy = all(c["healthy"] for c in checks.values())
return {
"status": "healthy" if all_healthy else "degraded",
"dependencies": checks,
}
async def check_payment_gateway() -> dict:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.get(f"{PAYMENT_URL}/health")
return {"healthy": r.status_code == 200, "latency_ms": r.elapsed.microseconds / 1000}
except Exception as e:
return {"healthy": False, "error": str(e)}
1
2
3
4
5
6
7
8
9
10
from limits import parse, storage, strategies
# 100 requests per minute
rate_limit = parse("100/minute")
limiter = strategies.MovingWindowRateLimiter(storage.MemoryStorage())
async def call_with_rate_limit(key: str, fn: Callable) -> Any:
if not limiter.hit(rate_limit, key):
raise RateLimitExceeded(f"Rate limit exceeded for {key}")
return await fn()