For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Complete the remaining items flagged as “not to rely on,” “cautious about,” and “missing/unclear” in docs/workdocs/capability-report.md (excluding Zed/WASM) so the platform is production-ready.
Architecture: We will close gaps across four areas: production auth, drift governance, reasoning integration, and resilience/performance (perf + chaos + runbooks). Each task adds a minimal test first, implements the smallest viable change, and adds documentation where required.
Tech Stack: FastAPI (Python), React (Vite), jose/jwks, pytest, Playwright, just, Docker Compose, Markdown.
Files:
- services/workbench-bff/src/api/auth.py
- services/workbench-bff/src/api/jwks_cache.py
- services/workbench-bff/tests/test_auth_rbac.py
- services/workbench-bff/pyproject.toml
- docs/howto/use-identity-library.md (or create docs/howto/workbench-auth.md)

Step 1: Write failing tests for JWT signature verification
```python
def test_oidc_provider_rejects_invalid_signature():
    # given a JWT signed with a different key
    # when validate_token is called
    # then it raises 401 / returns False
    ...
```
Step 2: Run test to verify it fails
Run: pytest services/workbench-bff/tests/test_auth_rbac.py -v
Expected: FAIL (currently accepts unsigned tokens).
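For intuition, here is a stdlib-only sketch of what signature verification buys. It hand-rolls HS256 for brevity; the real implementation should verify RS256 tokens with `jose` against keys fetched from the JWKS endpoint, not this toy helper.

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build a minimal HS256 JWT (illustration only)."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"

def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the MAC and compare; a token signed with another key fails."""
    header, payload, sig = token.split(".")
    signing_input = f"{header}.{payload}".encode()
    expected = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return hmac.compare_digest(sig, expected)

token = sign_hs256({"sub": "alice"}, b"key-a")
assert verify_hs256(token, b"key-a")      # correct key: accepted
assert not verify_hs256(token, b"key-b")  # different key: rejected
```

A validator that skips this comparison accepts any payload, which is exactly the failure the test above pins down.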
Step 3: Implement JWKS fetch + signature verification
```python
# services/workbench-bff/src/api/jwks_cache.py
class JwksCache:
    def get_signing_key(self, jwks_url: str, kid: str) -> dict:
        ...
```
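A minimal sketch of how JwksCache could implement the TTL caching and retry behavior this plan calls for. The TTL env var and backoff schedule come from the configuration notes in this task; the plain-urllib fetch is an illustrative stand-in for whatever HTTP client the service already uses.

```python
import json
import os
import time
import urllib.request
from typing import Optional

class JwksCache:
    """TTL cache for JWKS signing keys with retrying fetches (sketch)."""

    def __init__(self, ttl_seconds: Optional[int] = None):
        self.ttl = ttl_seconds if ttl_seconds is not None else int(os.getenv("JWKS_CACHE_TTL", "3600"))
        self._cache = {}  # jwks_url -> (fetched_at_monotonic, jwks_document)

    def get_signing_key(self, jwks_url: str, kid: str) -> dict:
        for key in self._get_jwks(jwks_url).get("keys", []):
            if key.get("kid") == kid:
                return key
        raise KeyError(f"no signing key with kid={kid!r} in JWKS")

    def _get_jwks(self, jwks_url: str) -> dict:
        cached = self._cache.get(jwks_url)
        if cached and time.monotonic() - cached[0] < self.ttl:
            return cached[1]  # still fresh: serve from cache
        jwks = self._fetch_with_backoff(jwks_url)
        self._cache[jwks_url] = (time.monotonic(), jwks)
        return jwks

    def _fetch_with_backoff(self, jwks_url: str, max_attempts: int = 3) -> dict:
        """Exponential backoff: 2^n * 100ms between attempts, max 3 tries."""
        for attempt in range(max_attempts):
            try:
                with urllib.request.urlopen(jwks_url, timeout=5) as resp:
                    return json.load(resp)
            except OSError:
                if attempt == max_attempts - 1:
                    raise
                time.sleep((2 ** attempt) * 0.1)
        raise RuntimeError("unreachable")
```

The key design point is caching by URL rather than by kid, so a key rotation that adds a new kid only costs one refetch once the TTL expires.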
Update OIDCProvider.authenticate() to:
- extract the `kid` from the token header
- fetch the matching signing key via JwksCache
- decode with `verify_signature=True`

Configuration & Resilience:
- JWKS_CACHE_TTL env var (default 3600) for key cache duration
- JwksCache.get_signing_key(): exponential backoff (2^n * 100ms), max 3 retries
- AUTH_VERIFY_SIGNATURE env var (default true) to disable verification for emergency rollback
- Metrics: auth_success, auth_failure_by_reason, jwks_fetch_duration, jwks_fetch_errors
- docs/howto/workbench-auth.md with configuration examples and rollback procedures

Step 4: Run tests to verify pass
Run: pytest services/workbench-bff/tests/test_auth_rbac.py -v
Expected: PASS
Step 5: Commit
```bash
git add services/workbench-bff/src/api/auth.py services/workbench-bff/src/api/jwks_cache.py services/workbench-bff/tests/test_auth_rbac.py services/workbench-bff/pyproject.toml
git commit -m "feat(auth): verify OIDC JWT signatures via JWKS"
```
Files:
- services/workbench-bff/src/api/auth.py
- apps/workbench/.env.example
- apps/workbench/src/lib/auth-factory.ts
- apps/workbench/e2e/auth.spec.ts

Step 1: Write failing E2E test for production auth mode
```typescript
// Ensure VITE_AUTH_PROVIDER=zitadel requires real login (no mock bypass)
```
Step 2: Run test to verify it fails
Run: pnpm exec playwright test apps/workbench/e2e/auth.spec.ts -g "production auth"
Expected: FAIL (mock can still be enabled).
Step 3: Implement production guardrails
- Enforce AUTH_PROVIDER=oidc in the production env for the BFF
- Disable VITE_AUTH_MOCK in production builds
- Document both in .env.example

Startup Configuration Validation:
In services/workbench-bff/src/api/auth.py, add init-time validation:
```python
import os

def validate_auth_config():
    """Fail-fast validation on app startup"""
    if os.getenv("ENVIRONMENT") == "production":
        if os.getenv("AUTH_PROVIDER") != "oidc":
            raise ValueError("AUTH_PROVIDER must be 'oidc' in production")
        required_vars = ["OIDC_ISSUER_URL", "OIDC_CLIENT_ID", "OIDC_AUDIENCE"]
        missing = [v for v in required_vars if not os.getenv(v)]
        if missing:
            raise ValueError(f"Missing required OIDC variables: {missing}")
```
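As a quick check of the fail-fast contract, the validator can be exercised directly; it is inlined here so the snippet stands alone, but the behavior mirrors the startup hook above.

```python
import os

def validate_auth_config():
    """Same fail-fast validation as above, inlined for a standalone demo."""
    if os.getenv("ENVIRONMENT") == "production":
        if os.getenv("AUTH_PROVIDER") != "oidc":
            raise ValueError("AUTH_PROVIDER must be 'oidc' in production")
        required = ["OIDC_ISSUER_URL", "OIDC_CLIENT_ID", "OIDC_AUDIENCE"]
        missing = [v for v in required if not os.getenv(v)]
        if missing:
            raise ValueError(f"Missing required OIDC variables: {missing}")

# A production environment with the mock provider must refuse to boot
os.environ.update({"ENVIRONMENT": "production", "AUTH_PROVIDER": "mock"})
try:
    validate_auth_config()
    raise AssertionError("expected startup to fail")
except ValueError as exc:
    assert "must be 'oidc'" in str(exc)
```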
In apps/workbench/src/lib/auth-factory.ts, add build-time check:
```typescript
if (import.meta.env.MODE === 'production') {
  const provider = import.meta.env.VITE_AUTH_PROVIDER;
  const mockEnabled = import.meta.env.VITE_AUTH_MOCK === 'true';
  if ((provider === 'zitadel' || provider === 'oidc') && mockEnabled) {
    throw new Error('VITE_AUTH_MOCK cannot be enabled in production with OIDC provider');
  }
}
```
Update apps/workbench/.env.example:
```env
# Authentication Configuration
AUTH_PROVIDER=oidc                      # Required in production: 'oidc' | 'mock'
OIDC_ISSUER_URL=https://your-issuer.com # Required when AUTH_PROVIDER=oidc
OIDC_CLIENT_ID=your-client-id           # Required when AUTH_PROVIDER=oidc
OIDC_AUDIENCE=your-audience             # Required when AUTH_PROVIDER=oidc
VITE_AUTH_MOCK=false                    # Must be false in production
```
Create docs/howto/workbench-auth-setup.md:
Step 4: Run tests to verify pass
Run: pnpm exec playwright test apps/workbench/e2e/auth.spec.ts
Expected: PASS
Step 5: Commit
```bash
git add services/workbench-bff/src/api/auth.py apps/workbench/.env.example apps/workbench/src/lib/auth-factory.ts apps/workbench/e2e/auth.spec.ts
git commit -m "feat(auth): enforce production auth mode in UI + BFF"
```
Files:
- services/workbench-bff/src/adapters/remediation_engine.py
- services/workbench-bff/src/api/drift_routes.py
- services/workbench-bff/tests/test_remediation_pr.py
- docs/workdocs/runtime-behavior-correlation-plan.md

Step 1: Write failing test for PR creation with GitHub adapter
```python
def test_remediation_engine_creates_pr_when_enabled():
    # mock GitHub API client
    # expect PR URL returned
    ...
```
Step 2: Run test to verify it fails
Run: pytest services/workbench-bff/tests/test_remediation_pr.py -v
Expected: FAIL (PR flow not verified).
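A self-contained sketch of the mocking approach for Step 1, using only the standard library. The engine is inlined in miniature so the snippet runs on its own; the real class lives in remediation_engine.py and takes no constructor argument.

```python
import asyncio
from unittest.mock import AsyncMock

class RemediationEngine:
    """Miniature stand-in mirroring the real engine's contract."""
    def __init__(self, github):
        self.github = github

    async def create_remediation_pr(self, drift_data: dict) -> dict:
        pr_url = await self.github.create_pr(
            title=f"Auto-remediation: {drift_data['title']}",
            body="...",
            branch=f"remediation/{drift_data['id']}",
            files=drift_data["files"],
        )
        return {"status": "success", "pr_url": pr_url, "pr_created": pr_url is not None}

def test_remediation_engine_creates_pr_when_enabled():
    # AsyncMock lets the test await create_pr without any network traffic
    github = AsyncMock()
    github.create_pr.return_value = "https://github.com/org/repo/pull/456"
    engine = RemediationEngine(github)
    result = asyncio.run(engine.create_remediation_pr(
        {"id": "drift-123", "title": "Config drift", "files": {"a.yaml": "x: 1"}}
    ))
    assert result["pr_url"] == "https://github.com/org/repo/pull/456"
    assert result["pr_created"] is True

test_remediation_engine_creates_pr_when_enabled()
```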
Step 3: Implement GitHub adapter + config with production-grade security
Update services/workbench-bff/src/adapters/remediation_engine.py:
```python
import os
import time
import hashlib
from typing import Optional

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type


class GitHubAdapter:
    """Production-ready GitHub PR creation adapter"""

    def __init__(self):
        # Fetch token from secrets manager (not plain env)
        self.token = self._get_github_token()
        self.repo = os.getenv('GITHUB_REMEDIATION_REPO')
        self.base_branch = os.getenv('GITHUB_BASE_BRANCH', 'main')
        if not self.repo:
            raise ValueError("GITHUB_REMEDIATION_REPO must be set")
        self.client = httpx.AsyncClient(
            base_url='https://api.github.com',
            headers={
                'Authorization': f'Bearer {self.token}',
                'Accept': 'application/vnd.github.v3+json',
                'X-GitHub-Api-Version': '2022-11-28'
            },
            timeout=30.0
        )

    def _get_github_token(self) -> str:
        """Fetch GitHub token from secrets manager.

        Required permissions: repo (read/write), pull_requests (write)
        """
        # TODO: Integrate with AWS Secrets Manager / Vault
        # For now, check env but document this is temporary
        token = os.getenv('GITHUB_TOKEN')
        if not token:
            raise ValueError("GITHUB_TOKEN must be set (temporary - migrate to secrets manager)")
        return token

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.HTTPStatusError, httpx.NetworkError)),
        reraise=True
    )
    async def create_pr(
        self,
        title: str,
        body: str,
        branch: str,
        files: dict[str, str]
    ) -> Optional[str]:
        """Create PR with exponential backoff and idempotency.

        Returns:
            PR URL if created, None if already exists
        """
        try:
            # Check idempotency: search for existing PR/branch
            existing_pr = await self._find_existing_pr(branch)
            if existing_pr:
                self._log_audit('pr_skip', {
                    'branch': branch,
                    'reason': 'already_exists',
                    'pr_url': existing_pr
                })
                return existing_pr
            # Create branch and commit files
            await self._create_branch_with_files(branch, files)
            # Create pull request
            pr_response = await self.client.post(
                f'/repos/{self.repo}/pulls',
                json={
                    'title': title,
                    'body': body,
                    'head': branch,
                    'base': self.base_branch
                }
            )
            pr_response.raise_for_status()
            pr_url = pr_response.json()['html_url']
            self._log_audit('pr_created', {
                'branch': branch,
                'pr_url': pr_url,
                'timestamp': time.time()
            })
            return pr_url
        except httpx.HTTPStatusError as e:
            # Translate GitHub errors into actionable responses
            if e.response.status_code == 403:
                self._log_audit('pr_error', {
                    'error': 'permission_denied',
                    'message': 'Check GitHub token permissions (repo, pull_requests)',
                    'response': e.response.text
                })
                raise PermissionError("GitHub token lacks required permissions")
            elif e.response.status_code == 404:
                self._log_audit('pr_error', {
                    'error': 'repo_not_found',
                    'repo': self.repo
                })
                raise ValueError(f"Repository not found: {self.repo}")
            elif e.response.status_code == 429:
                self._log_audit('pr_error', {
                    'error': 'rate_limit',
                    'retry_after': e.response.headers.get('Retry-After')
                })
                # Retry will handle this
                raise
            else:
                self._log_audit('pr_error', {
                    'error': 'github_api_error',
                    'status': e.response.status_code,
                    'message': e.response.text
                })
                raise
        except httpx.NetworkError as e:
            self._log_audit('pr_error', {
                'error': 'network_error',
                'message': str(e)
            })
            # Retry will handle this
            raise

    async def _find_existing_pr(self, branch: str) -> Optional[str]:
        """Check if PR already exists for branch"""
        try:
            response = await self.client.get(
                f'/repos/{self.repo}/pulls',
                params={'head': f'{self.repo.split("/")[0]}:{branch}', 'state': 'open'}
            )
            response.raise_for_status()
            prs = response.json()
            if prs:
                return prs[0]['html_url']
            return None
        except httpx.HTTPStatusError:
            # If search fails, proceed with creation attempt
            return None

    def _log_audit(self, event_type: str, details: dict):
        """Structured audit logging for all PR operations"""
        import logging
        import json
        logger = logging.getLogger('github_adapter')
        logger.info(json.dumps({
            'event': event_type,
            'service': 'remediation_engine',
            'timestamp': time.time(),
            **details
        }))


# Update RemediationEngine to return pr_url
class RemediationEngine:
    def __init__(self):
        self.github = GitHubAdapter()

    async def create_remediation_pr(self, drift_data: dict) -> dict:
        """Create PR and return result with URL"""
        try:
            pr_url = await self.github.create_pr(
                title=f"Auto-remediation: {drift_data['title']}",
                body=self._generate_pr_body(drift_data),
                branch=f"remediation/{drift_data['id']}",
                files=drift_data['files']
            )
            return {
                'status': 'success',
                'pr_url': pr_url,
                'pr_created': pr_url is not None
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e),
                'pr_url': None
            }
```
Update services/workbench-bff/src/api/drift_routes.py:
```python
@router.post('/drift/remediate')
async def create_remediation(drift_id: str):
    """Trigger drift remediation with PR creation"""
    # ... existing drift detection logic ...
    result = await remediation_engine.create_remediation_pr(drift_data)
    # Ensure pr_url is included in response
    return {
        'drift_id': drift_id,
        'remediation_status': result['status'],
        'pr_url': result.get('pr_url'),  # May be None if error
        'pr_created': result.get('pr_created', False)
    }
```
Add documentation to docs/howto/drift-remediation-github.md:
## GitHub Integration Configuration

### Required Permissions
GitHub token must have:
- `repo` scope (read/write access)
- `pull_requests` scope (write access)

### Environment Variables
```env
GITHUB_REMEDIATION_REPO=org/repo   # Target repository
GITHUB_BASE_BRANCH=main            # Base branch for PRs
GITHUB_TOKEN=<from-secrets-manager> # Temporary - migrate to secrets manager
```

### Error Handling
| Error Code | Meaning | Action |
|---|---|---|
| 403 | Permission denied | Check token permissions |
| 404 | Repository not found | Verify GITHUB_REMEDIATION_REPO |
| 422 | Validation failed | Check branch name/PR data |
| 429 | Rate limit exceeded | Automatic retry with backoff |

### Idempotency
The adapter checks for existing PRs before creating duplicates.

### Audit Logging
All operations emit structured logs:
```json
{
  "event": "pr_created",
  "service": "remediation_engine",
  "timestamp": 1234567890,
  "branch": "remediation/drift-123",
  "pr_url": "https://github.com/org/repo/pull/456"
}
```
- Add config for repo, token, base branch
- Implement PR creation method in remediation engine
- Return `pr_url` in API responses
- Add exponential backoff for 403/429/network errors
- Implement idempotency check (search existing PRs)
- Add structured audit logging for all attempts
- Fetch token from secrets manager (document migration from env vars)
- Translate GitHub API errors to actionable responses
**Step 4: Run tests to verify pass**
Run: `pytest services/workbench-bff/tests/test_remediation_pr.py -v`
Expected: PASS
**Step 5: Commit**
```bash
git add services/workbench-bff/src/adapters/remediation_engine.py services/workbench-bff/src/api/drift_routes.py services/workbench-bff/tests/test_remediation_pr.py docs/workdocs/runtime-behavior-correlation-plan.md
git commit -m "feat(drift): productionize remediation PR creation"
```
Files:
- scripts/ci/behavior_drift_gate.sh
- scripts/ci/behavior_drift_gate.py
- just/50-ci.just
- tests/ci/test_behavior_drift_gate_prod.py

Step 1: Write failing test for strict mode (no fixture fallback)
```python
def test_gate_strict_mode_fails_without_api():
    ...
```
Step 2: Run test to verify it fails
Run: pytest tests/ci/test_behavior_drift_gate_prod.py -v
Expected: FAIL
Step 3: Implement strict mode
- Add a `--strict` option to require API availability (no fixture fallback)

Step 4: Run tests to verify pass
Run: pytest tests/ci/test_behavior_drift_gate_prod.py -v
Expected: PASS
Step 5: Commit
```bash
git add scripts/ci/behavior_drift_gate.sh scripts/ci/behavior_drift_gate.py just/50-ci.just tests/ci/test_behavior_drift_gate_prod.py
git commit -m "feat(ci): add strict production mode for behavior drift gate"
```
Files:
- services/knowledge-graph/src/reasoner.py
- services/knowledge-graph/src/adapters/oxigraph_adapter.py
- services/knowledge-graph/src/api/routes.py
- services/knowledge-graph/tests/test_reasoning_integration.py

Step 1: Write failing integration test for reasoning profile end-to-end
```python
async def test_sparql_reasoning_profile_roundtrip():
    # insert explicit graph
    # query with reasoning_profile=rdfs
    # expect inferred metadata + named graph
    ...
```
Step 2: Run test to verify it fails
Run: pytest services/knowledge-graph/tests/test_reasoning_integration.py -v
Expected: FAIL (if missing wiring).
Step 3: Implement/confirm wiring and snapshot metadata
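The core of the wiring is: given `reasoning_profile=rdfs`, materialize inferred triples into a named graph and attach snapshot metadata. A stdlib-only sketch of the rdf:type propagation at the heart of RDFS reasoning follows; all names are illustrative, and the real code in reasoner.py / oxigraph_adapter.py operates on the Oxigraph store rather than Python sets.

```python
import itertools

def rdfs_subclass_closure(triples: set) -> set:
    """Propagate rdf:type through rdfs:subClassOf chains to a fixpoint.

    Returns only the newly inferred triples, which the adapter would
    write into a dedicated named graph with snapshot metadata.
    """
    inferred = set(triples)
    changed = True
    while changed:
        changed = False
        snapshot = list(inferred)
        for (s, p, o), (s2, p2, o2) in itertools.product(snapshot, snapshot):
            # x rdf:type C  +  C rdfs:subClassOf D  =>  x rdf:type D
            if p == "rdf:type" and p2 == "rdfs:subClassOf" and o == s2:
                triple = (s, "rdf:type", o2)
                if triple not in inferred:
                    inferred.add(triple)
                    changed = True
    return inferred - triples

explicit = {
    ("ex:Rex", "rdf:type", "ex:Dog"),
    ("ex:Dog", "rdfs:subClassOf", "ex:Animal"),
}
assert ("ex:Rex", "rdf:type", "ex:Animal") in rdfs_subclass_closure(explicit)
```

The integration test above should then assert both the inferred triple and the snapshot metadata on the named graph that holds it.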
Step 4: Run tests to verify pass
Run: pytest services/knowledge-graph/tests/test_reasoning_integration.py -v
Expected: PASS
Step 5: Commit
```bash
git add services/knowledge-graph/src/reasoner.py services/knowledge-graph/src/adapters/oxigraph_adapter.py services/knowledge-graph/src/api/routes.py services/knowledge-graph/tests/test_reasoning_integration.py
git commit -m "feat(kg): finalize reasoning integration + tests"
```
Files:
- just/40-test.just
- docs/workdocs/evidence-pipeline-audit.md
- tests/performance/perf_utils.py
- tests/performance/perf_config.yaml
- pytest.ini

Step 1: Write failing test for just test-performance recipe
```bash
just test-performance
```
Expected: FAIL (recipe missing).
Step 2: Add test-performance recipe with SLOs and thresholds
Add to just/40-test.just:
```just
test-performance:
    pytest tests/performance -v --perf-config=tests/performance/perf_config.yaml
```
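`--perf-config` is not a built-in pytest option, so the recipe above only works once the option is registered, e.g. in tests/performance/conftest.py. A sketch (the hook name is pytest's; everything else is an assumption):

```python
# tests/performance/conftest.py (sketch)

def pytest_addoption(parser):
    """Register the custom --perf-config option used by `just test-performance`."""
    parser.addoption(
        "--perf-config",
        action="store",
        default="tests/performance/perf_config.yaml",
        help="Path to the performance SLO/scenario configuration file",
    )

# Tests then read the value via the built-in `request` fixture:
#   config_path = request.config.getoption("--perf-config")
```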
Create tests/performance/perf_config.yaml:
```yaml
slos:
  knowledge_graph_query:
    p50_latency_ms: 100
    p95_latency_ms: 500
    p99_latency_ms: 1000
    min_throughput_rps: 50
    max_error_rate: 0.01
  drift_detection:
    p50_latency_ms: 200
    p95_latency_ms: 800
    p99_latency_ms: 1500
    min_throughput_rps: 20
    max_error_rate: 0.02
  workbench_api:
    p50_latency_ms: 150
    p95_latency_ms: 600
    p99_latency_ms: 1200
    min_throughput_rps: 30
    max_error_rate: 0.01

scenarios:
  - name: typical_load
    duration_seconds: 60
    concurrent_users: 10
  - name: peak_load
    duration_seconds: 120
    concurrent_users: 50
  - name: sustained_load
    duration_seconds: 300
    concurrent_users: 25

metrics:
  - p50_latency_ms
  - p95_latency_ms
  - p99_latency_ms
  - throughput_rps
  - error_rate
  - cpu_percent
  - memory_mb

regression_thresholds:
  p95_latency_increase_percent: 20  # Fail if p95 increases >20%
  throughput_decrease_percent: 15   # Fail if throughput decreases >15%
  error_rate_absolute: 0.05         # Fail if error rate >5%

baseline_storage:
  enabled: true
  path: tests/performance/baselines
  comparison_mode: last_passed  # Compare against last passing build
```
Update tests/performance/perf_utils.py:
```python
import json
import yaml
from pathlib import Path
from typing import Dict, Any


class PerformanceValidator:
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)

    def validate_slo(self, service: str, metrics: Dict[str, float]) -> bool:
        """Check if metrics meet SLO thresholds"""
        slo = self.config['slos'][service]
        violations = []
        if metrics['p95_latency_ms'] > slo['p95_latency_ms']:
            violations.append(f"P95 latency {metrics['p95_latency_ms']}ms > {slo['p95_latency_ms']}ms")
        if metrics['throughput_rps'] < slo['min_throughput_rps']:
            violations.append(f"Throughput {metrics['throughput_rps']} < {slo['min_throughput_rps']} rps")
        if metrics['error_rate'] > slo['max_error_rate']:
            violations.append(f"Error rate {metrics['error_rate']} > {slo['max_error_rate']}")
        if violations:
            raise AssertionError(f"SLO violations for {service}: " + "; ".join(violations))
        return True

    def compare_baseline(self, service: str, current: Dict[str, float]) -> bool:
        """Compare current metrics against stored baseline"""
        baseline_path = Path(self.config['baseline_storage']['path']) / f"{service}.json"
        if not baseline_path.exists():
            # First run, store as baseline
            self.store_baseline(service, current)
            return True
        with open(baseline_path) as f:
            baseline = json.load(f)
        thresholds = self.config['regression_thresholds']
        # Check p95 regression
        p95_increase = ((current['p95_latency_ms'] - baseline['p95_latency_ms'])
                        / baseline['p95_latency_ms'] * 100)
        if p95_increase > thresholds['p95_latency_increase_percent']:
            raise AssertionError(f"P95 latency regression: {p95_increase:.1f}% increase")
        # Check throughput regression
        throughput_decrease = ((baseline['throughput_rps'] - current['throughput_rps'])
                               / baseline['throughput_rps'] * 100)
        if throughput_decrease > thresholds['throughput_decrease_percent']:
            raise AssertionError(f"Throughput regression: {throughput_decrease:.1f}% decrease")
        return True
```
Update docs/workdocs/evidence-pipeline-audit.md:
## Performance Testing
### SLOs (Service Level Objectives)
| Service | P95 Latency | Throughput | Error Rate |
|---------|-------------|------------|------------|
| Knowledge Graph Query | <500ms | >50 rps | <1% |
| Drift Detection | <800ms | >20 rps | <2% |
| Workbench API | <600ms | >30 rps | <1% |
### Test Scenarios
1. **Typical Load**: 10 concurrent users, 60s duration
2. **Peak Load**: 50 concurrent users, 120s duration
3. **Sustained Load**: 25 concurrent users, 300s duration
### Metrics Tracked
- P50, P95, P99 latency
- Throughput (requests/second)
- Error rate
- CPU utilization
- Memory usage
### Regression Criteria
- **FAIL** if P95 latency increases >20% from baseline
- **FAIL** if throughput decreases >15% from baseline
- **FAIL** if error rate exceeds 5% absolute
### Running Performance Tests
```bash
just test-performance
```
Baseline is stored in `tests/performance/baselines/` and updated on passing builds.
Add to `pytest.ini`:
```ini
[pytest]
markers =
    perf: performance tests with SLO validation
addopts =
    --tb=short
    --perf-config=tests/performance/perf_config.yaml
```
Step 3: Run tests to verify pass
Run: just test-performance
Expected: PASS
Step 4: Commit
```bash
git add just/40-test.just docs/workdocs/evidence-pipeline-audit.md tests/performance/perf_utils.py tests/performance/perf_config.yaml pytest.ini
git commit -m "feat(perf): add test-performance workflow with SLOs and baselines"
```
Files:
- tests/chaos/run_chaos.py
- tests/chaos/scenarios/nats_partition_mesh.py
- tests/chaos/scenarios/opa_restart.py
- tests/chaos/scenarios/openobserve_stall.py
- tests/chaos/scenarios/postgres_restart_outbox.py
- tests/chaos/probes/http_probe.py
- tests/chaos/probes/nats_probe.py
- tests/chaos/probes/db_probe.py
- infra/docker/docker-compose.chaos.yml
- just/80-devservices.just
- docs/workdocs/Chaos_testing_audit.md
- tests/chaos/config.yaml

Step 1: Write failing smoke test for chaos runner
```python
def test_chaos_runner_loads_scenarios():
    ...
```
Step 2: Run test to verify it fails
Run: pytest tests/chaos -v
Expected: FAIL (no suite yet).
Step 3: Implement chaos runner + scenarios with safety guardrails
Create tests/chaos/config.yaml:
```yaml
safety:
  environment_check: true        # Abort if ENVIRONMENT=production
  require_explicit_enable: true  # Require --enable-chaos flag

isolation:
  docker_network: chaos-net
  docker_profile: chaos

scenarios:
  nats_partition_mesh:
    hypothesis: "System recovers from NATS network partition within 30s"
    steady_state:
      - http_200_from: http://localhost:8000/health
      - nats_messages_flowing: true
    actions:
      - type: network_partition
        target: nats
        duration_seconds: 15
    validation:
      - http_recovery_time_max_seconds: 30
      - nats_message_redelivery: true
      - no_data_loss: true
    cleanup:
      - restore_network
      - verify_steady_state

  postgres_restart_outbox:
    hypothesis: "Outbox pattern preserves events during DB restart"
    steady_state:
      - db_connection: true
      - outbox_queue_size_lt: 1000
    actions:
      - type: restart_container
        target: postgres
        graceful: false
    validation:
      - no_event_loss: true
      - outbox_processing_resumes_within_seconds: 10
    cleanup:
      - verify_db_healthy
      - verify_outbox_drained

  opa_restart:
    hypothesis: "Policy decisions fail-closed during OPA restart"
    steady_state:
      - opa_health: true
      - policy_decisions_success_rate_gt: 0.99
    actions:
      - type: restart_container
        target: opa
        duration_seconds: 5
    validation:
      - decisions_during_outage: deny_all
      - recovery_time_max_seconds: 10
    cleanup:
      - verify_opa_policies_loaded
```
Create tests/chaos/run_chaos.py:
```python
import os
import sys
import yaml
from pathlib import Path


class ChaosRunner:
    def __init__(self, config_path: str = "tests/chaos/config.yaml"):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
        self._safety_check()

    def _safety_check(self):
        """Abort if running in unsafe environment"""
        if self.config['safety']['environment_check']:
            env = os.getenv('ENVIRONMENT', 'development')
            if env == 'production':
                raise RuntimeError("CHAOS ABORT: Cannot run chaos tests in production environment")
        if self.config['safety']['require_explicit_enable']:
            if not os.getenv('CHAOS_ENABLED') and '--enable-chaos' not in sys.argv:
                raise RuntimeError("CHAOS ABORT: Must set CHAOS_ENABLED=true or pass --enable-chaos flag")

    def run_scenario(self, scenario_name: str):
        """Execute chaos scenario with automated cleanup"""
        scenario = self.config['scenarios'][scenario_name]
        try:
            # Verify steady state
            self._verify_steady_state(scenario['steady_state'])
            # Execute chaos actions
            for action in scenario['actions']:
                self._execute_action(action)
            # Run validation probes
            results = self._validate(scenario['validation'])
            # Emit structured results
            self._emit_results(scenario_name, results)
        finally:
            # ALWAYS run cleanup
            self._cleanup(scenario['cleanup'])

    def _cleanup(self, cleanup_steps):
        """Guaranteed cleanup regardless of test outcome"""
        for step in cleanup_steps:
            try:
                self._execute_cleanup_step(step)
            except Exception as e:
                print(f"CLEANUP ERROR: {step}: {e}", file=sys.stderr)
```
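The probes the scenarios reference can stay small. A stdlib sketch of tests/chaos/probes/http_probe.py (the result shape and field names are assumptions):

```python
# tests/chaos/probes/http_probe.py (sketch)
import time
import urllib.error
import urllib.request

def check_health(url: str, timeout_seconds: float = 2.0) -> dict:
    """Probe an HTTP health endpoint and return a structured result.

    Any connection failure or timeout is reported as unhealthy rather
    than raised, so probes never abort a chaos scenario mid-run.
    """
    started = time.monotonic()
    try:
        with urllib.request.urlopen(url, timeout=timeout_seconds) as resp:
            status = resp.status
    except (urllib.error.URLError, OSError):
        status = None
    return {
        "url": url,
        "status": status,
        "healthy": status == 200,
        "latency_ms": round((time.monotonic() - started) * 1000, 1),
    }
```

Returning a dict instead of raising keeps pre/during/post probe results uniform in the emitted scenario report.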
Create scenario modules with probe emission:
```python
# tests/chaos/scenarios/nats_partition_mesh.py
from ..probes import nats_probe, http_probe


def execute(config):
    # Emit structured probe results
    probe_results = {
        'pre_chaos': {
            'http': http_probe.check_health('http://localhost:8000/health'),
            'nats': nats_probe.check_stream('events')
        },
        'during_chaos': {},
        'post_chaos': {}
    }
    # Apply chaos...
    # Collect probe results...
    return probe_results
```
Update infra/docker/docker-compose.chaos.yml:
```yaml
# Isolated network for chaos testing
networks:
  chaos-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.0.0/16

services:
  nats:
    profiles: [chaos]
    networks:
      - chaos-net
    # ... other config
  postgres:
    profiles: [chaos]
    networks:
      - chaos-net
```
Update just/80-devservices.just:
```just
chaos-up:
    docker compose -f infra/docker/docker-compose.chaos.yml --profile chaos up -d

chaos-down:
    docker compose -f infra/docker/docker-compose.chaos.yml --profile chaos down -v

run-chaos scenario:
    CHAOS_ENABLED=true python tests/chaos/run_chaos.py {{scenario}}
```
Update docs/workdocs/Chaos_testing_audit.md:
## Safety Guardrails
- **Environment Check**: Aborts if `ENVIRONMENT=production`
- **Explicit Enable**: Requires `CHAOS_ENABLED=true` or `--enable-chaos` flag
- **Isolated Network**: Uses `chaos-net` Docker network (172.28.0.0/16)
- **Automatic Cleanup**: Cleanup hooks execute even if test fails
## Scenarios
### NATS Partition Mesh
**Hypothesis**: System recovers from NATS network partition within 30s
**Steady State**:
- HTTP health endpoint returns 200
- NATS messages flowing
**Chaos Action**: Partition NATS container for 15s
**Pass Criteria**:
- HTTP recovery within 30s
- NATS message redelivery successful
- No data loss
**Cleanup**:
- Restore network connectivity
- Verify steady state restored
### Postgres Restart (Outbox Pattern)
**Hypothesis**: Outbox pattern preserves events during DB restart
**Steady State**:
- Database connection healthy
- Outbox queue size <1000
**Chaos Action**: Hard restart Postgres container
**Pass Criteria**:
- No event loss
- Outbox processing resumes within 10s
**Cleanup**:
- Verify DB healthy
- Verify outbox drained
### OPA Restart
**Hypothesis**: Policy decisions fail-closed during OPA restart
**Steady State**:
- OPA health endpoint returns 200
- Policy decisions success rate >99%
**Chaos Action**: Restart OPA container for 5s
**Pass Criteria**:
- All decisions during outage are DENY
- Recovery within 10s
**Cleanup**:
- Verify OPA policies loaded
## Running Chaos Tests
```bash
# Start isolated chaos environment
just chaos-up

# Run specific scenario
just run-chaos nats_partition_mesh

# Run all scenarios
CHAOS_ENABLED=true pytest tests/chaos -v

# Cleanup
just chaos-down
```
Each scenario emits structured probe results:
```json
{
  "scenario": "nats_partition_mesh",
  "hypothesis": "System recovers from NATS network partition within 30s",
  "outcome": "pass",
  "probe_results": {
    "pre_chaos": {...},
    "during_chaos": {...},
    "post_chaos": {...}
  },
  "validation": {
    "http_recovery_time_seconds": 12,
    "nats_redelivery": true,
    "data_loss": false
  }
}
```
- **Pass**: All validation criteria met
- **Fail**: One or more criteria violated
- **Error**: Scenario execution failed (check cleanup logs)
**Step 4: Run tests to verify pass**
Run: `pytest tests/chaos -v`
Expected: PASS
**Step 5: Commit**
```bash
git add tests/chaos infra/docker/docker-compose.chaos.yml just/80-devservices.just docs/workdocs/Chaos_testing_audit.md
git commit -m "feat(chaos): add chaos testing suite + docker profiles"
```
Files:
- docs/runbooks/incidents/README.md
- docs/runbooks/incidents/knowledge-graph.md
- docs/runbooks/incidents/policy-gateway.md
- docs/runbooks/incidents/workbench.md
- docs/runbooks/incidents/drift-remediation.md

Step 1: Write stub runbooks with verification checklist
```markdown
## Symptoms
## Immediate Mitigation
## Verification Steps
## Escalation
```
Step 2: Add concrete commands
Step 3: Commit
```bash
git add docs/runbooks/incidents
git commit -m "docs(runbooks): add incident response playbooks"
```