Command Bus

The ControlBus is the central command queue. It accepts operator commands, routes them through the policy engine, dispatches to targets, and records every action in a bounded audit log.

CommandTarget

Rust CommandTarget
auto-generated
/// Where a command is directed.
pub enum CommandTarget {
    /// A single named entity.
    Entity(EntityId),
    /// All entities belonging to a named group.
    Group(GroupId),
    /// Every entity in the fleet.
    All,
}

IssuedCommand

yaml IssuedCommand auto-generated

Field Type Default Description
id* CommandId
issued_at* f64 Sim-time seconds when the command was issued.
target* CommandTarget
fields* Vec<FieldAssignment>
status* CommandStatus
resolved_at f64 Sim-time when the command was resolved (Acked / Failed / Timeout).
label* String Human-readable description shown in the audit log.
issued_by* String User or system that issued the command (backward-compat display string).
principal_id PrincipalId Typed principal identity (None during migration from legacy call sites).

CommandStatus Lifecycle

graph LR P["Pending"] --> A["Acked"] P --> F["Failed"] P -->|"30s timeout"| T["Timeout"]

COMMAND_TIMEOUT_SECS = 30 — commands that receive no acknowledgement within this window are automatically marked as Timeout.

Audit Log

Every command produces an AuditEntry. The log is a bounded ring buffer capped at AUDIT_LOG_MAX = 500 entries.

Rust AuditEntry
auto-generated
/// One entry in the human-readable audit trail.
pub struct AuditEntry {
    pub sim_time: f64,
    pub command_id: CommandId,
    pub target: String,
    pub fields_summary: String,
    pub issued_by: String,
    /// Typed principal identity (None during migration from legacy call sites).
    pub principal_id: Option<PrincipalId>,
    pub status: CommandStatus,
    /// Before/after field diff summary from audit capture.
    pub audit_diff: Option<String>,
}

Command Issuance Flow

Rust
// 1. Widget emits a BusCommandSpec
let spec = BusCommandSpec { target, fields, label };

// 2. ControlBus creates IssuedCommand (status = Pending)
let cmd = bus.issue_command(spec, "operator-jane");

// 3. PolicyEngine evaluates rules
if let Some(reason) = policies.check_command_veto(&cmd) {
    bus.resolve(cmd.id, CommandStatus::Failed, Some(reason));
    return;
}

// 4. Dispatch to simulator or SDK source
dispatcher.send(cmd.target, &cmd.fields).await;

// 5. Await ack or timeout (30s)
bus.await_resolution(cmd.id).await;

Questions?

Reach out for help with integration, deployment, or custom domain codecs.