Shared state management for cognitive workflows.
The ContextSnapshot is the shared state object that all agents read from and contribute to. It mirrors the βresidual streamβ concept from transformer architectures.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
interface ContextSnapshot {
// Identity
id: string; // Unique snapshot ID
workflowId: string; // Parent workflow
round: number; // Current round number
// Content
query: string; // Original user query
bindings: Map<string, ConceptId>; // Term β Concept mappings
entities: Entity[]; // Discovered entities
rules: RuleEvaluation[]; // SBVR rule evaluations
// Provenance
deltas: Delta[]; // Contribution history
timestamp: DateTime;
// Constraints
stateSize: number; // Current size in bytes
maxStateSize: number; // Size limit
}
Agents contribute deltas (changes), not full state replacements:
1
2
3
4
5
6
7
8
9
interface Delta {
agentId: string;
round: number;
weight: number; // Router-assigned weight
operation: "add" | "modify" | "remove";
path: string; // JSONPath to target
value: any; // New value
provenance: string; // Why this change
}
1
2
3
4
5
6
7
8
9
{
"agentId": "semantic-mapper",
"round": 1,
"weight": 0.35,
"operation": "add",
"path": "$.bindings.customer",
"value": "sea:customer:001",
"provenance": "Matched 'customer' to concept via KGS lookup"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Round 0: Initial ContextSnapshot β
β β’ Query extracted β
β β’ Initial bindings (empty) β
β β’ State size: minimal β
ββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Round 1: First specialist contributions β
β β’ Deltas from each specialist β
β β’ Weighted aggregation β
β β’ Validation check β
ββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Round N: Refined ContextSnapshot β
β β’ Accumulated bindings β
β β’ Complete provenance trail β
β β’ Convergence check β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
| Invariant | Description | Enforcement |
|---|---|---|
| Size Bounded | State cannot exceed maxStateSize |
Aggregator clips |
| Identity Stable | ConceptIds cannot change | Validator rejects |
| Monotonic Rounds | Round numbers only increase | Automatic |
| Complete Provenance | Every change has origin | Validator checks |
Default strategy combining deltas by router weights:
1
2
3
4
def weighted_merge(deltas: list[Delta]) -> ContextSnapshot:
for delta in sorted(deltas, key=lambda d: d.weight, reverse=True):
apply_delta(snapshot, delta, weight=delta.weight)
return snapshot
When specialists disagree:
| Strategy | Description |
|---|---|
majority-vote |
Most common value wins |
highest-weight |
Highest weighted agent wins |
union |
Keep all values (for arrays) |
expert-override |
Specific agent types take priority |
1
2
snapshot = get_current_snapshot(workflow_id)
customer_concept = snapshot.bindings.get("customer")
1
2
3
deltas = get_deltas_for_path(workflow_id, "$.bindings.customer")
for delta in deltas:
print(f"Round {delta.round}: {delta.agentId} β {delta.value}")
Prevent state explosion:
1
2
3
4
5
validator:
checks:
- type: "state-size"
maxBytes: 50000
onViolation: "clip"
Clipping strategies: