Capability Closure Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Complete the remaining items flagged as “not to rely on,” “cautious about,” and “missing/unclear” in docs/workdocs/capability-report.md (excluding Zed/WASM) so the platform is production-ready.

Architecture: We will close gaps across four areas: production auth, drift governance, reasoning integration, and resilience/performance (perf + chaos + runbooks). Each task adds a minimal test first, implements the smallest viable change, and adds documentation where required.

Tech Stack: FastAPI (Python), React (Vite), jose/jwks, pytest, Playwright, just, Docker Compose, Markdown.

Task 1: Production-Grade Auth Verification for Workbench BFF

Files:

Step 1: Write failing tests for JWT signature verification

```python
def test_oidc_provider_rejects_invalid_signature():
    # given a JWT signed with a different key
    # when validate_token is called
    # then it raises 401 / returns False
    ...
```
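To make the given/when/then concrete, here is a self-contained, stdlib-only sketch. It uses an HS256 (HMAC) stand-in so it runs without keys or network; the real suite verifies RS256 tokens against JWKS keys, and `sign_hs256`/`verify_hs256` are hypothetical helpers for illustration only.

```python
import base64
import hashlib
import hmac
import json

def _b64(data: bytes) -> str:
    # base64url without padding, as JWTs use
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_hs256(payload: dict, key: bytes) -> str:
    """Build header.payload.signature (HS256 stand-in for the real RS256 flow)."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64(json.dumps(payload).encode())
    sig = hmac.new(key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64(sig)}"

def verify_hs256(token: str, key: bytes) -> bool:
    """Recompute the signature over header.payload and compare in constant time."""
    header, body, sig = token.rsplit(".", 2)
    expected = hmac.new(key, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return hmac.compare_digest(_b64(expected), sig)

def test_rejects_token_signed_with_different_key():
    token = sign_hs256({"sub": "alice"}, key=b"attacker-key")
    assert verify_hs256(token, key=b"server-key") is False
```

The same shape applies to the real test: mint a token with a key the provider does not trust, then assert `validate_token` rejects it.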

Step 2: Run test to verify it fails

Run: `pytest services/workbench-bff/tests/test_auth_rbac.py -v`
Expected: FAIL (currently accepts unsigned tokens).

Step 3: Implement JWKS fetch + signature verification

```python
# services/workbench-bff/src/api/jwks_cache.py
class JwksCache:
    def get_signing_key(self, jwks_url: str, kid: str) -> dict:
        ...
```
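One way the cache lookup could work — a sketch, not the final implementation. The JWKS fetcher is injected so the kid-matching and refresh-on-miss behavior can be unit-tested without network; the `fetch_jwks` parameter is an assumption (the real class would wrap an HTTP GET of the JWKS endpoint):

```python
from typing import Callable

class JwksCache:
    """Sketch: cache JWKS documents per URL, refresh on unknown kid."""

    def __init__(self, fetch_jwks: Callable[[str], dict]):
        self._fetch = fetch_jwks
        self._cache: dict[str, dict] = {}  # jwks_url -> {kid: key dict}

    def get_signing_key(self, jwks_url: str, kid: str) -> dict:
        keys = self._cache.get(jwks_url)
        if keys is None or kid not in keys:
            # Refresh on miss so key rotation picks up newly published kids
            doc = self._fetch(jwks_url)
            keys = {k["kid"]: k for k in doc.get("keys", [])}
            self._cache[jwks_url] = keys
        if kid not in keys:
            raise KeyError(f"kid {kid!r} not found in JWKS at {jwks_url}")
        return keys[kid]
```

Refreshing only on a cache miss keeps steady-state verification free of network calls while still tolerating key rotation.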

Update OIDCProvider.authenticate() to:

Configuration & Resilience:

Step 4: Run tests to verify pass

Run: `pytest services/workbench-bff/tests/test_auth_rbac.py -v`
Expected: PASS

Step 5: Commit

```bash
git add services/workbench-bff/src/api/auth.py services/workbench-bff/src/api/jwks_cache.py services/workbench-bff/tests/test_auth_rbac.py services/workbench-bff/pyproject.toml
git commit -m "feat(auth): verify OIDC JWT signatures via JWKS"
```

Task 2: Enforce Production Auth Mode + UI Integration

Files:

Step 1: Write failing E2E test for production auth mode

```ts
// Ensure VITE_AUTH_PROVIDER=zitadel requires real login (no mock bypass)
```

Step 2: Run test to verify it fails

Run: `pnpm exec playwright test apps/workbench/e2e/auth.spec.ts -g "production auth"`
Expected: FAIL (mock can still be enabled).

Step 3: Implement production guardrails

Startup Configuration Validation:

In services/workbench-bff/src/api/auth.py, add init-time validation:

```python
import os

def validate_auth_config():
    """Fail-fast validation on app startup"""
    if os.getenv("ENVIRONMENT") == "production":
        if os.getenv("AUTH_PROVIDER") != "oidc":
            raise ValueError("AUTH_PROVIDER must be 'oidc' in production")
        required_vars = ["OIDC_ISSUER_URL", "OIDC_CLIENT_ID", "OIDC_AUDIENCE"]
        missing = [v for v in required_vars if not os.getenv(v)]
        if missing:
            raise ValueError(f"Missing required OIDC variables: {missing}")
```

In apps/workbench/src/lib/auth-factory.ts, add build-time check:

```ts
if (import.meta.env.MODE === 'production') {
  const provider = import.meta.env.VITE_AUTH_PROVIDER;
  const mockEnabled = import.meta.env.VITE_AUTH_MOCK === 'true';

  if ((provider === 'zitadel' || provider === 'oidc') && mockEnabled) {
    throw new Error('VITE_AUTH_MOCK cannot be enabled in production with OIDC provider');
  }
}
```

Update apps/workbench/.env.example:

```env
# Authentication Configuration
AUTH_PROVIDER=oidc  # Required in production: 'oidc' | 'mock'
OIDC_ISSUER_URL=https://your-issuer.com  # Required when AUTH_PROVIDER=oidc
OIDC_CLIENT_ID=your-client-id            # Required when AUTH_PROVIDER=oidc
OIDC_AUDIENCE=your-audience              # Required when AUTH_PROVIDER=oidc
VITE_AUTH_MOCK=false                     # Must be false in production
```

Create docs/howto/workbench-auth-setup.md:

Step 4: Run tests to verify pass

Run: `pnpm exec playwright test apps/workbench/e2e/auth.spec.ts`
Expected: PASS

Step 5: Commit

```bash
git add services/workbench-bff/src/api/auth.py apps/workbench/.env.example apps/workbench/src/lib/auth-factory.ts apps/workbench/e2e/auth.spec.ts
git commit -m "feat(auth): enforce production auth mode in UI + BFF"
```

Task 3: Drift Remediation PR Creation (Production-Ready)

Files:

Step 1: Write failing test for PR creation with GitHub adapter

```python
def test_remediation_engine_creates_pr_when_enabled():
    # mock GitHub API client
    # expect PR URL returned
    ...
```
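A self-contained version of this test with a fake adapter — a sketch only. `FakeGitHubAdapter` and constructor injection of the adapter are assumptions for testability; the real test would patch `GitHubAdapter`'s httpx client instead.

```python
import asyncio

class FakeGitHubAdapter:
    """Stands in for the real GitHub client; records calls, returns a PR URL."""
    def __init__(self):
        self.created = []

    async def create_pr(self, title, body, branch, files):
        self.created.append(branch)
        return "https://github.com/org/repo/pull/1"

class RemediationEngine:
    def __init__(self, github):
        self.github = github  # injected for testability (assumption)

    async def create_remediation_pr(self, drift_data: dict) -> dict:
        pr_url = await self.github.create_pr(
            title=f"Auto-remediation: {drift_data['title']}",
            body="...",
            branch=f"remediation/{drift_data['id']}",
            files=drift_data["files"],
        )
        return {"status": "success", "pr_url": pr_url, "pr_created": pr_url is not None}

def test_remediation_engine_creates_pr_when_enabled():
    github = FakeGitHubAdapter()
    engine = RemediationEngine(github)
    result = asyncio.run(engine.create_remediation_pr(
        {"id": "drift-123", "title": "config drift", "files": {"a.yaml": "x: 1"}}
    ))
    assert result["pr_url"] == "https://github.com/org/repo/pull/1"
    assert github.created == ["remediation/drift-123"]
```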

Step 2: Run test to verify it fails

Run: `pytest services/workbench-bff/tests/test_remediation_pr.py -v`
Expected: FAIL (PR flow not verified).

Step 3: Implement GitHub adapter + config with production-grade security

Update services/workbench-bff/src/adapters/remediation_engine.py:

```python
import os
import time
import json
import logging
from typing import Optional

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class GitHubAdapter:
    """Production-ready GitHub PR creation adapter"""

    def __init__(self):
        # Fetch token from secrets manager (not plain env)
        self.token = self._get_github_token()
        self.repo = os.getenv('GITHUB_REMEDIATION_REPO')
        self.base_branch = os.getenv('GITHUB_BASE_BRANCH', 'main')

        if not self.repo:
            raise ValueError("GITHUB_REMEDIATION_REPO must be set")

        self.client = httpx.AsyncClient(
            base_url='https://api.github.com',
            headers={
                'Authorization': f'Bearer {self.token}',
                'Accept': 'application/vnd.github.v3+json',
                'X-GitHub-Api-Version': '2022-11-28'
            },
            timeout=30.0
        )

    def _get_github_token(self) -> str:
        """Fetch GitHub token from secrets manager

        Required permissions: repo (read/write), pull_requests (write)
        """
        # TODO: Integrate with AWS Secrets Manager / Vault
        # For now, check env but document this is temporary
        token = os.getenv('GITHUB_TOKEN')
        if not token:
            raise ValueError("GITHUB_TOKEN must be set (temporary - migrate to secrets manager)")
        return token

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((httpx.HTTPStatusError, httpx.NetworkError)),
        reraise=True
    )
    async def create_pr(
        self,
        title: str,
        body: str,
        branch: str,
        files: dict[str, str]
    ) -> Optional[str]:
        """Create PR with exponential backoff and idempotency

        Returns:
            PR URL if created, None if already exists
        """
        try:
            # Check idempotency: search for existing PR/branch
            existing_pr = await self._find_existing_pr(branch)
            if existing_pr:
                self._log_audit('pr_skip', {
                    'branch': branch,
                    'reason': 'already_exists',
                    'pr_url': existing_pr
                })
                return existing_pr

            # Create branch and commit files
            await self._create_branch_with_files(branch, files)

            # Create pull request
            pr_response = await self.client.post(
                f'/repos/{self.repo}/pulls',
                json={
                    'title': title,
                    'body': body,
                    'head': branch,
                    'base': self.base_branch
                }
            )
            pr_response.raise_for_status()

            pr_url = pr_response.json()['html_url']

            self._log_audit('pr_created', {
                'branch': branch,
                'pr_url': pr_url,
                'timestamp': time.time()
            })

            return pr_url

        except httpx.HTTPStatusError as e:
            # Translate GitHub errors into actionable responses
            if e.response.status_code == 403:
                self._log_audit('pr_error', {
                    'error': 'permission_denied',
                    'message': 'Check GitHub token permissions (repo, pull_requests)',
                    'response': e.response.text
                })
                raise PermissionError("GitHub token lacks required permissions")

            elif e.response.status_code == 404:
                self._log_audit('pr_error', {
                    'error': 'repo_not_found',
                    'repo': self.repo
                })
                raise ValueError(f"Repository not found: {self.repo}")

            elif e.response.status_code == 429:
                self._log_audit('pr_error', {
                    'error': 'rate_limit',
                    'retry_after': e.response.headers.get('Retry-After')
                })
                # Retry will handle this
                raise

            else:
                self._log_audit('pr_error', {
                    'error': 'github_api_error',
                    'status': e.response.status_code,
                    'message': e.response.text
                })
                raise

        except httpx.NetworkError as e:
            self._log_audit('pr_error', {
                'error': 'network_error',
                'message': str(e)
            })
            # Retry will handle this
            raise

    async def _create_branch_with_files(self, branch: str, files: dict[str, str]) -> None:
        """Create branch from base and commit files (implementation elided in this plan)"""
        ...

    async def _find_existing_pr(self, branch: str) -> Optional[str]:
        """Check if PR already exists for branch"""
        try:
            response = await self.client.get(
                f'/repos/{self.repo}/pulls',
                params={'head': f'{self.repo.split("/")[0]}:{branch}', 'state': 'open'}
            )
            response.raise_for_status()

            prs = response.json()
            if prs:
                return prs[0]['html_url']

            return None

        except httpx.HTTPStatusError:
            # If search fails, proceed with creation attempt
            return None

    def _log_audit(self, event_type: str, details: dict):
        """Structured audit logging for all PR operations"""
        logger = logging.getLogger('github_adapter')
        logger.info(json.dumps({
            'event': event_type,
            'service': 'remediation_engine',
            'timestamp': time.time(),
            **details
        }))

# Update RemediationEngine to return pr_url
class RemediationEngine:
    def __init__(self):
        self.github = GitHubAdapter()

    async def create_remediation_pr(self, drift_data: dict) -> dict:
        """Create PR and return result with URL"""
        try:
            pr_url = await self.github.create_pr(
                title=f"Auto-remediation: {drift_data['title']}",
                body=self._generate_pr_body(drift_data),
                branch=f"remediation/{drift_data['id']}",
                files=drift_data['files']
            )

            return {
                'status': 'success',
                'pr_url': pr_url,
                'pr_created': pr_url is not None
            }

        except Exception as e:
            return {
                'status': 'error',
                'error': str(e),
                'pr_url': None
            }

    def _generate_pr_body(self, drift_data: dict) -> str:
        """Render PR description from drift details (implementation elided in this plan)"""
        ...
```
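The retry policy above bounds waits between 1s and 10s. The clamping behavior can be sketched independently of tenacity; treat the base formula here as an approximation of `wait_exponential(multiplier=1, min=1, max=10)` rather than tenacity's exact schedule:

```python
def backoff_schedule(attempts: int, multiplier: float = 1, lo: float = 1, hi: float = 10):
    """Approximate exponential-backoff waits, clamped to [lo, hi].
    One wait is produced between each pair of consecutive attempts."""
    return [max(lo, min(multiplier * 2 ** n, hi)) for n in range(1, attempts)]
```

With `stop_after_attempt(3)` only two waits ever occur, so even a worst-case run adds well under a minute before `reraise=True` surfaces the original error.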

Update services/workbench-bff/src/api/drift_routes.py:

```python
@router.post('/drift/remediate')
async def create_remediation(drift_id: str):
    """Trigger drift remediation with PR creation"""
    # ... existing drift detection logic ...

    result = await remediation_engine.create_remediation_pr(drift_data)

    # Ensure pr_url is included in response
    return {
        'drift_id': drift_id,
        'remediation_status': result['status'],
        'pr_url': result.get('pr_url'),  # May be None if error
        'pr_created': result.get('pr_created', False)
    }
```

Add documentation to docs/howto/drift-remediation-github.md:

## GitHub Integration Configuration

### Required Permissions

GitHub token must have:
- `repo` scope (read/write access)
- `pull_requests` scope (write access)

### Environment Variables

```env
GITHUB_REMEDIATION_REPO=org/repo  # Target repository
GITHUB_BASE_BRANCH=main           # Base branch for PRs
GITHUB_TOKEN=<from-secrets-manager>  # Temporary - migrate to secrets manager
```

### Security Best Practices

1. **Token Storage**: Migrate to AWS Secrets Manager or HashiCorp Vault
2. **Minimal Permissions**: Token should have ONLY repo and PR scopes
3. **Rotation**: Rotate token every 90 days
4. **Audit Logging**: All PR attempts logged with structured JSON

### Error Handling

| Error Code | Meaning | Action |
|------------|---------|--------|
| 403 | Permission denied | Check token permissions |
| 404 | Repository not found | Verify `GITHUB_REMEDIATION_REPO` |
| 422 | Validation failed | Check branch name/PR data |
| 429 | Rate limit exceeded | Automatic retry with backoff |

### Idempotency

The adapter checks for existing PRs before creating duplicates.

### Audit Trail

All operations emit structured logs:

```json
{
  "event": "pr_created",
  "service": "remediation_engine",
  "timestamp": 1234567890,
  "branch": "remediation/drift-123",
  "pr_url": "https://github.com/org/repo/pull/456"
}
```
- Add config for repo, token, base branch
- Implement PR creation method in remediation engine
- Return `pr_url` in API responses
- Add exponential backoff for 403/429/network errors
- Implement idempotency check (search existing PRs)
- Add structured audit logging for all attempts
- Fetch token from secrets manager (document migration from env vars)
- Translate GitHub API errors to actionable responses

**Step 4: Run tests to verify pass**

Run: `pytest services/workbench-bff/tests/test_remediation_pr.py -v`
Expected: PASS

**Step 5: Commit**

```bash
git add services/workbench-bff/src/adapters/remediation_engine.py services/workbench-bff/src/api/drift_routes.py services/workbench-bff/tests/test_remediation_pr.py docs/workdocs/runtime-behavior-correlation-plan.md
git commit -m "feat(drift): productionize remediation PR creation"
```

Task 4: Behavior Drift Gate – Production Mode

Files:

Step 1: Write failing test for strict mode (no fixture fallback)

```python
def test_gate_strict_mode_fails_without_api():
    ...
```

Step 2: Run test to verify it fails

Run: `pytest tests/ci/test_behavior_drift_gate_prod.py -v`
Expected: FAIL

Step 3: Implement strict mode
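The core of strict mode is a single decision: is the gate allowed to fall back to recorded fixtures? A minimal sketch of that logic — function name and the `DRIFT_GATE_STRICT` env var are assumptions; the real change lands in `scripts/ci/behavior_drift_gate.py`:

```python
import os
from typing import Optional

def fixture_fallback_allowed(strict: Optional[bool] = None) -> bool:
    """In strict (production) mode the gate must hit the live API;
    falling back to recorded fixtures is only permitted otherwise."""
    if strict is None:
        strict = os.getenv("DRIFT_GATE_STRICT", "false").lower() == "true"
    return not strict
```

The gate then fails hard (non-zero exit) when the API is unreachable and `fixture_fallback_allowed()` is false.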

Step 4: Run tests to verify pass

Run: `pytest tests/ci/test_behavior_drift_gate_prod.py -v`
Expected: PASS

Step 5: Commit

```bash
git add scripts/ci/behavior_drift_gate.sh scripts/ci/behavior_drift_gate.py just/50-ci.just tests/ci/test_behavior_drift_gate_prod.py
git commit -m "feat(ci): add strict production mode for behavior drift gate"
```

Task 5: Knowledge Graph Reasoning Integration (Runtime + Tests)

Files:

Step 1: Write failing integration test for reasoning profile end-to-end

```python
async def test_sparql_reasoning_profile_roundtrip():
    # insert explicit graph
    # query with reasoning_profile=rdfs
    # expect inferred metadata + named graph
    ...
```

Step 2: Run test to verify it fails

Run: `pytest services/knowledge-graph/tests/test_reasoning_integration.py -v`
Expected: FAIL (if missing wiring).

Step 3: Implement/confirm wiring and snapshot metadata
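The metadata the test expects could be attached along these lines — a sketch only. The field names `reasoning_profile` and `inferred_graph` come from the test's expectations above; the helper name and the URN format for the inferred named graph are assumptions:

```python
def attach_reasoning_metadata(result: dict, profile: str, snapshot_id: str) -> dict:
    """Annotate a SPARQL result set with the reasoning profile used and the
    named graph holding inferred triples, so clients can distinguish
    explicit from inferred data."""
    return {
        **result,
        "metadata": {
            "reasoning_profile": profile,
            "inferred_graph": f"urn:inferred:{profile}:{snapshot_id}",  # hypothetical URN scheme
            "snapshot_id": snapshot_id,
        },
    }
```

Keeping inferred triples in a separate named graph lets the snapshot be rebuilt or dropped without touching explicit data.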

Step 4: Run tests to verify pass

Run: `pytest services/knowledge-graph/tests/test_reasoning_integration.py -v`
Expected: PASS

Step 5: Commit

```bash
git add services/knowledge-graph/src/reasoner.py services/knowledge-graph/src/adapters/oxigraph_adapter.py services/knowledge-graph/src/api/routes.py services/knowledge-graph/tests/test_reasoning_integration.py
git commit -m "feat(kg): finalize reasoning integration + tests"
```

Task 6: Performance Test Suite (Operationalize)

Files:

Step 1: Write failing test for just test-performance recipe

```bash
just test-performance
```

Expected: FAIL (recipe missing).

Step 2: Add test-performance recipe with SLOs and thresholds

Add to just/40-test.just:

```just
test-performance:
    pytest tests/performance -v --perf-config=tests/performance/perf_config.yaml
```

Create tests/performance/perf_config.yaml:

```yaml
slos:
  knowledge_graph_query:
    p50_latency_ms: 100
    p95_latency_ms: 500
    p99_latency_ms: 1000
    min_throughput_rps: 50
    max_error_rate: 0.01

  drift_detection:
    p50_latency_ms: 200
    p95_latency_ms: 800
    p99_latency_ms: 1500
    min_throughput_rps: 20
    max_error_rate: 0.02

  workbench_api:
    p50_latency_ms: 150
    p95_latency_ms: 600
    p99_latency_ms: 1200
    min_throughput_rps: 30
    max_error_rate: 0.01

scenarios:
  - name: typical_load
    duration_seconds: 60
    concurrent_users: 10

  - name: peak_load
    duration_seconds: 120
    concurrent_users: 50

  - name: sustained_load
    duration_seconds: 300
    concurrent_users: 25

metrics:
  - p50_latency_ms
  - p95_latency_ms
  - p99_latency_ms
  - throughput_rps
  - error_rate
  - cpu_percent
  - memory_mb

regression_thresholds:
  p95_latency_increase_percent: 20  # Fail if p95 increases >20%
  throughput_decrease_percent: 15   # Fail if throughput decreases >15%
  error_rate_absolute: 0.05         # Fail if error rate >5%

baseline_storage:
  enabled: true
  path: tests/performance/baselines
  comparison_mode: last_passed  # Compare against last passing build
```

Update tests/performance/perf_utils.py:

```python
import json
import yaml
from pathlib import Path
from typing import Dict

class PerformanceValidator:
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)

    def validate_slo(self, service: str, metrics: Dict[str, float]) -> bool:
        """Check if metrics meet SLO thresholds"""
        slo = self.config['slos'][service]
        violations = []

        if metrics['p95_latency_ms'] > slo['p95_latency_ms']:
            violations.append(f"P95 latency {metrics['p95_latency_ms']}ms > {slo['p95_latency_ms']}ms")

        if metrics['throughput_rps'] < slo['min_throughput_rps']:
            violations.append(f"Throughput {metrics['throughput_rps']} < {slo['min_throughput_rps']} rps")

        if metrics['error_rate'] > slo['max_error_rate']:
            violations.append(f"Error rate {metrics['error_rate']} > {slo['max_error_rate']}")

        if violations:
            raise AssertionError(f"SLO violations for {service}: " + "; ".join(violations))

        return True

    def compare_baseline(self, service: str, current: Dict[str, float]) -> bool:
        """Compare current metrics against stored baseline"""
        baseline_path = Path(self.config['baseline_storage']['path']) / f"{service}.json"

        if not baseline_path.exists():
            # First run, store as baseline
            self.store_baseline(service, current)
            return True

        with open(baseline_path) as f:
            baseline = json.load(f)

        thresholds = self.config['regression_thresholds']

        # Check p95 regression
        p95_increase = ((current['p95_latency_ms'] - baseline['p95_latency_ms'])
                        / baseline['p95_latency_ms'] * 100)
        if p95_increase > thresholds['p95_latency_increase_percent']:
            raise AssertionError(f"P95 latency regression: {p95_increase:.1f}% increase")

        # Check throughput regression
        throughput_decrease = ((baseline['throughput_rps'] - current['throughput_rps'])
                              / baseline['throughput_rps'] * 100)
        if throughput_decrease > thresholds['throughput_decrease_percent']:
            raise AssertionError(f"Throughput regression: {throughput_decrease:.1f}% decrease")

        return True

    def store_baseline(self, service: str, metrics: Dict[str, float]) -> None:
        """Persist metrics as the new baseline (first run / passing builds)"""
        baseline_dir = Path(self.config['baseline_storage']['path'])
        baseline_dir.mkdir(parents=True, exist_ok=True)
        with open(baseline_dir / f"{service}.json", 'w') as f:
            json.dump(metrics, f)
```
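The regression arithmetic in `compare_baseline`, pulled out as standalone functions for clarity (function names are illustrative, not part of the planned API):

```python
def pct_increase(baseline: float, current: float) -> float:
    """Percentage increase of current over baseline (positive = worse for latency)."""
    return (current - baseline) / baseline * 100

def pct_decrease(baseline: float, current: float) -> float:
    """Percentage decrease of current below baseline (positive = worse for throughput)."""
    return (baseline - current) / baseline * 100
```

For example, p95 going from 400ms to 500ms is a 25% increase and trips the 20% threshold; throughput going from 50 to 40 rps is a 20% decrease and trips the 15% threshold.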

Update docs/workdocs/evidence-pipeline-audit.md:

## Performance Testing

### SLOs (Service Level Objectives)

| Service | P95 Latency | Throughput | Error Rate |
|---------|-------------|------------|------------|
| Knowledge Graph Query | <500ms | >50 rps | <1% |
| Drift Detection | <800ms | >20 rps | <2% |
| Workbench API | <600ms | >30 rps | <1% |

### Test Scenarios

1. **Typical Load**: 10 concurrent users, 60s duration
2. **Peak Load**: 50 concurrent users, 120s duration
3. **Sustained Load**: 25 concurrent users, 300s duration

### Metrics Tracked

- P50, P95, P99 latency
- Throughput (requests/second)
- Error rate
- CPU utilization
- Memory usage

### Regression Criteria

- **FAIL** if P95 latency increases >20% from baseline
- **FAIL** if throughput decreases >15% from baseline
- **FAIL** if error rate exceeds 5% absolute

### Running Performance Tests

```bash
just test-performance
```

### Interpreting Results

1. **Green**: All metrics within SLO thresholds
2. **Yellow**: Metrics degraded but within regression thresholds
3. **Red**: SLO violation or regression threshold exceeded

Baseline is stored in `tests/performance/baselines/` and updated on passing builds.

Add to `pytest.ini`:

```ini
[pytest]
markers =
    perf: performance tests with SLO validation

addopts =
    --tb=short
    --perf-config=tests/performance/perf_config.yaml
```

Step 3: Run tests to verify pass

Run: `just test-performance`
Expected: PASS

Step 4: Commit

```bash
git add just/40-test.just docs/workdocs/evidence-pipeline-audit.md tests/performance/perf_utils.py tests/performance/perf_config.yaml pytest.ini
git commit -m "feat(perf): add test-performance workflow with SLOs and baselines"
```

Task 7: Chaos Testing Suite (Rebuild)

Files:

Step 1: Write failing smoke test for chaos runner

```python
def test_chaos_runner_loads_scenarios():
    ...
```

Step 2: Run test to verify it fails

Run: `pytest tests/chaos -v`
Expected: FAIL (no suite yet).

Step 3: Implement chaos runner + scenarios with safety guardrails

Create tests/chaos/config.yaml:

```yaml
safety:
  environment_check: true  # Abort if ENVIRONMENT=production
  require_explicit_enable: true  # Require --enable-chaos flag
  isolation:
    docker_network: chaos-net
    docker_profile: chaos

scenarios:
  nats_partition_mesh:
    hypothesis: "System recovers from NATS network partition within 30s"
    steady_state:
      - http_200_from: http://localhost:8000/health
      - nats_messages_flowing: true
    actions:
      - type: network_partition
        target: nats
        duration_seconds: 15
    validation:
      - http_recovery_time_max_seconds: 30
      - nats_message_redelivery: true
      - no_data_loss: true
    cleanup:
      - restore_network
      - verify_steady_state

  postgres_restart_outbox:
    hypothesis: "Outbox pattern preserves events during DB restart"
    steady_state:
      - db_connection: true
      - outbox_queue_size_lt: 1000
    actions:
      - type: restart_container
        target: postgres
        graceful: false
    validation:
      - no_event_loss: true
      - outbox_processing_resumes_within_seconds: 10
    cleanup:
      - verify_db_healthy
      - verify_outbox_drained

  opa_restart:
    hypothesis: "Policy decisions fail-closed during OPA restart"
    steady_state:
      - opa_health: true
      - policy_decisions_success_rate_gt: 0.99
    actions:
      - type: restart_container
        target: opa
        duration_seconds: 5
    validation:
      - decisions_during_outage: deny_all
      - recovery_time_max_seconds: 10
    cleanup:
      - verify_opa_policies_loaded
```

Create tests/chaos/run_chaos.py:

```python
import os
import sys
import yaml

class ChaosRunner:
    def __init__(self, config_path: str = "tests/chaos/config.yaml"):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)

        self._safety_check()

    def _safety_check(self):
        """Abort if running in unsafe environment"""
        if self.config['safety']['environment_check']:
            env = os.getenv('ENVIRONMENT', 'development')
            if env == 'production':
                raise RuntimeError("CHAOS ABORT: Cannot run chaos tests in production environment")

        if self.config['safety']['require_explicit_enable']:
            if not os.getenv('CHAOS_ENABLED') and '--enable-chaos' not in sys.argv:
                raise RuntimeError("CHAOS ABORT: Must set CHAOS_ENABLED=true or pass --enable-chaos flag")

    def run_scenario(self, scenario_name: str):
        """Execute chaos scenario with automated cleanup"""
        scenario = self.config['scenarios'][scenario_name]

        try:
            # Verify steady state
            self._verify_steady_state(scenario['steady_state'])

            # Execute chaos actions
            for action in scenario['actions']:
                self._execute_action(action)

            # Run validation probes
            results = self._validate(scenario['validation'])

            # Emit structured results
            self._emit_results(scenario_name, results)

        finally:
            # ALWAYS run cleanup
            self._cleanup(scenario['cleanup'])

    def _cleanup(self, cleanup_steps):
        """Guaranteed cleanup regardless of test outcome"""
        for step in cleanup_steps:
            try:
                self._execute_cleanup_step(step)
            except Exception as e:
                print(f"CLEANUP ERROR: {step}: {e}", file=sys.stderr)

    # Probe/action helpers (_verify_steady_state, _execute_action, _validate,
    # _emit_results, _execute_cleanup_step) are implemented per scenario and
    # elided in this plan.
```
Create scenario modules with probe emission:

```python
# tests/chaos/scenarios/nats_partition_mesh.py
from ..probes import nats_probe, http_probe

def execute(config):
    # Emit structured probe results
    probe_results = {
        'pre_chaos': {
            'http': http_probe.check_health('http://localhost:8000/health'),
            'nats': nats_probe.check_stream('events')
        },
        'during_chaos': {},
        'post_chaos': {}
    }

    # Apply chaos...
    # Collect probe results...

    return probe_results
```

Update infra/docker/docker-compose.chaos.yml:

```yaml
# Isolated network for chaos testing
networks:
  chaos-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.0.0/16

services:
  nats:
    profiles: [chaos]
    networks:
      - chaos-net
    # ... other config

  postgres:
    profiles: [chaos]
    networks:
      - chaos-net
```

Update just/80-devservices.just:

```just
chaos-up:
    docker compose -f infra/docker/docker-compose.chaos.yml --profile chaos up -d

chaos-down:
    docker compose -f infra/docker/docker-compose.chaos.yml --profile chaos down -v

run-chaos scenario:
    CHAOS_ENABLED=true python tests/chaos/run_chaos.py {{scenario}}
```

Update docs/workdocs/Chaos_testing_audit.md:

## Safety Guardrails

- **Environment Check**: Aborts if `ENVIRONMENT=production`
- **Explicit Enable**: Requires `CHAOS_ENABLED=true` or `--enable-chaos` flag
- **Isolated Network**: Uses `chaos-net` Docker network (172.28.0.0/16)
- **Automatic Cleanup**: Cleanup hooks execute even if test fails

## Scenarios

### NATS Partition Mesh

**Hypothesis**: System recovers from NATS network partition within 30s

**Steady State**:
- HTTP health endpoint returns 200
- NATS messages flowing

**Chaos Action**: Partition NATS container for 15s

**Pass Criteria**:
- HTTP recovery within 30s
- NATS message redelivery successful
- No data loss

**Cleanup**:
- Restore network connectivity
- Verify steady state restored

### Postgres Restart (Outbox Pattern)

**Hypothesis**: Outbox pattern preserves events during DB restart

**Steady State**:
- Database connection healthy
- Outbox queue size <1000

**Chaos Action**: Hard restart Postgres container

**Pass Criteria**:
- No event loss
- Outbox processing resumes within 10s

**Cleanup**:
- Verify DB healthy
- Verify outbox drained

### OPA Restart

**Hypothesis**: Policy decisions fail-closed during OPA restart

**Steady State**:
- OPA health endpoint returns 200
- Policy decisions success rate >99%

**Chaos Action**: Restart OPA container for 5s

**Pass Criteria**:
- All decisions during outage are DENY
- Recovery within 10s

**Cleanup**:
- Verify OPA policies loaded

## Running Chaos Tests

```bash
# Start isolated chaos environment
just chaos-up

# Run specific scenario
just run-chaos nats_partition_mesh

# Run all scenarios
CHAOS_ENABLED=true pytest tests/chaos -v

# Cleanup
just chaos-down
```

## Interpreting Results

Each scenario emits structured probe results:

```json
{
  "scenario": "nats_partition_mesh",
  "hypothesis": "System recovers from NATS network partition within 30s",
  "outcome": "pass",
  "probe_results": {
    "pre_chaos": {...},
    "during_chaos": {...},
    "post_chaos": {...}
  },
  "validation": {
    "http_recovery_time_seconds": 12,
    "nats_redelivery": true,
    "data_loss": false
  }
}
```

- **Pass**: All validation criteria met
- **Fail**: One or more criteria violated
- **Error**: Scenario execution failed (check cleanup logs)

**Step 4: Run tests to verify pass**

Run: `pytest tests/chaos -v`
Expected: PASS

**Step 5: Commit**

```bash
git add tests/chaos infra/docker/docker-compose.chaos.yml just/80-devservices.just docs/workdocs/Chaos_testing_audit.md
git commit -m "feat(chaos): add chaos testing suite + docker profiles"
```

Task 8: Incident Runbooks (Production Readiness)

Files:

Step 1: Write stub runbooks with verification checklist

```markdown
## Symptoms
## Immediate Mitigation
## Verification Steps
## Escalation
```

Step 2: Add concrete commands
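For example, a NATS-outage runbook entry might look like the following. This is a sketch: service names, ports, and recovery commands are assumptions to be replaced with the platform's actual compose services and health endpoints.

````markdown
## Symptoms
- `/health` returns 503 or times out
- Consumers report "no responders available"

## Immediate Mitigation
```bash
docker compose ps nats                 # is the container up?
docker compose logs --tail=100 nats    # recent errors
docker compose restart nats            # restart if wedged
```

## Verification Steps
```bash
curl -fsS http://localhost:8000/health   # BFF healthy again
```

## Escalation
Page the on-call platform engineer if not recovered within 15 minutes.
````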

Step 3: Commit

```bash
git add docs/runbooks/incidents
git commit -m "docs(runbooks): add incident response playbooks"
```