Command Bus
The ControlBus is the central
command queue. It accepts operator commands, routes them through the policy engine, dispatches to
targets, and records every action in a bounded audit log.
CommandTarget
Rust CommandTarget
auto-generated
/// Where a command is directed.
pub enum CommandTarget {
/// A single named entity.
Entity(EntityId),
/// All entities belonging to a named group.
Group(GroupId),
/// Every entity in the fleet.
All,
} IssuedCommand
yaml IssuedCommand auto-generated
| Field | Type | Default | Description |
|---|---|---|---|
| id* | CommandId | — | |
| issued_at* | f64 | — | Sim-time seconds when the command was issued. |
| target* | CommandTarget | — | |
| fields* | Vec<FieldAssignment> | — | |
| status* | CommandStatus | — | |
| resolved_at | f64 | — | Sim-time when the command was resolved (Acked / Failed / Timeout). |
| label* | String | — | Human-readable description shown in the audit log. |
| issued_by* | String | — | User or system that issued the command (backward-compat display string). |
| principal_id | PrincipalId | — | Typed principal identity (None during migration from legacy call sites). |
CommandStatus Lifecycle
graph LR
P["Pending"] --> A["Acked"]
P --> F["Failed"]
P -->|"30s timeout"| T["Timeout"]
COMMAND_TIMEOUT_SECS = 30 —
commands that receive no acknowledgement within this window are automatically marked as Timeout.
Audit Log
Every command produces an AuditEntry.
The log is a bounded ring buffer capped at
AUDIT_LOG_MAX = 500 entries.
Rust AuditEntry
auto-generated
/// One entry in the human-readable audit trail.
pub struct AuditEntry {
pub sim_time: f64,
pub command_id: CommandId,
pub target: String,
pub fields_summary: String,
pub issued_by: String,
/// Typed principal identity (None during migration from legacy call sites).
pub principal_id: Option<PrincipalId>,
pub status: CommandStatus,
/// Before/after field diff summary from audit capture.
pub audit_diff: Option<String>,
} Command Issuance Flow
Rust
// 1. Widget emits a BusCommandSpec
let spec = BusCommandSpec { target, fields, label };
// 2. ControlBus creates IssuedCommand (status = Pending)
let cmd = bus.issue_command(spec, "operator-jane");
// 3. PolicyEngine evaluates rules
if let Some(reason) = policies.check_command_veto(&cmd) {
bus.resolve(cmd.id, CommandStatus::Failed, Some(reason));
return;
}
// 4. Dispatch to simulator or SDK source
dispatcher.send(cmd.target, &cmd.fields).await;
// 5. Await ack or timeout (30s)
bus.await_resolution(cmd.id).await; Questions?
Reach out for help with integration, deployment, or custom domain codecs.