Message Bus

The message bus provides typed pub/sub messaging between agents. Agents publish messages to topics; other agents subscribe to topic patterns and poll for messages.

Topics

Topics are dot-separated strings, e.g.:

  • tasks.new
  • results.agent-abc
  • scarab.escalation.<agent-id> (reserved for escalations)

Publishing

Via ash:

ash bus publish <agent-id> tasks.new '{"task": "summarize /workspace/report.md"}'

Via SDK (using the underlying IPC client):

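use serde_json::json;

// Publish a JSON payload to the tasks.new topic on behalf of this agent.
// Call this from an async context that returns a Result.
agent.client.bus_publish(agent.id, "tasks.new", json!({"task": "..."})).await?;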

Subscribing

ash bus subscribe <agent-id> "tasks.*"

Subscriptions use glob patterns. tasks.* matches tasks.new, tasks.urgent, etc.
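
The pattern matching described above can be sketched as follows. This is a minimal illustration, not the bus's actual matcher: the function name topic_matches is hypothetical, and it assumes that * matches exactly one dot-separated segment. The real bus may treat * differently (for example, matching across dots).

```rust
/// Returns true if `topic` matches `pattern`, comparing dot-separated segments.
/// Assumption: `*` matches exactly one segment; the real bus may differ.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    let pattern_parts: Vec<&str> = pattern.split('.').collect();
    let topic_parts: Vec<&str> = topic.split('.').collect();
    // Segment counts must agree, and each pattern segment must be `*` or equal.
    pattern_parts.len() == topic_parts.len()
        && pattern_parts
            .iter()
            .zip(topic_parts.iter())
            .all(|(p, t)| *p == "*" || p == t)
}

fn main() {
    assert!(topic_matches("tasks.*", "tasks.new"));
    assert!(topic_matches("tasks.*", "tasks.urgent"));
    assert!(!topic_matches("tasks.*", "results.agent-abc"));
    // Under the one-segment assumption, a deeper topic does not match.
    assert!(!topic_matches("tasks.*", "tasks.new.high"));
}
```

If the bus instead lets * span several segments, the last assertion would not hold, so it is worth checking the behavior against a live subscription before relying on it.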

Polling

ash bus poll <agent-id>
# Drains and prints all pending messages

Via SDK:

// Poll for escalations specifically
let escalations = agent.pending_escalations().await?;

// Poll the bus directly through the underlying IPC client
match agent.client.bus_poll(agent.id).await? {
    Response::Messages { messages } => { /* process */ }
    _ => {} // ignore any other response variant
}
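
Because polling drains the pending messages, a second poll with no new deliveries should come back empty. The sketch below models that behavior with a hypothetical in-memory Mailbox type. It is not the bus implementation, and the first-in, first-out ordering is an assumption made for the example.

```rust
use std::collections::VecDeque;

/// Hypothetical per-agent mailbox, used only to illustrate drain-on-poll.
struct Mailbox {
    pending: VecDeque<String>,
}

impl Mailbox {
    fn new() -> Self {
        Mailbox { pending: VecDeque::new() }
    }

    /// Queue a message for a later poll.
    fn deliver(&mut self, msg: &str) {
        self.pending.push_back(msg.to_string());
    }

    /// Remove and return every pending message in arrival order (assumed FIFO).
    fn poll(&mut self) -> Vec<String> {
        self.pending.drain(..).collect()
    }
}

fn main() {
    let mut mailbox = Mailbox::new();
    mailbox.deliver("first");
    mailbox.deliver("second");

    // The first poll returns everything that was pending.
    assert_eq!(mailbox.poll(), vec!["first", "second"]);
    // The second poll finds nothing, because the first one drained the queue.
    assert!(mailbox.poll().is_empty());
}
```

One consequence of drain semantics is that a caller must process or store every message a poll returns, since the same messages are not handed out again.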

Unsubscribing

ash bus unsubscribe <agent-id> "tasks.*"

Escalation Topic

The escalation system uses the bus with reserved topics:

scarab.escalation.<target-agent-id>

When the anomaly detector or hierarchy escalation fires, it publishes a message to the parent agent's escalation topic. The Agent::pending_escalations() SDK method filters bus messages for these topics.
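
The filtering step can be sketched as below. The BusMessage struct and the helper names escalation_topic and escalations_for are hypothetical stand-ins, since the real message type and the internals of Agent::pending_escalations() are not shown here. The sketch assumes filtering is an exact match on the reserved topic for one agent.

```rust
/// Hypothetical message shape; the real bus message type may carry more fields.
#[derive(Debug, Clone, PartialEq)]
struct BusMessage {
    topic: String,
    payload: String,
}

/// Build the reserved escalation topic for a target agent.
fn escalation_topic(agent_id: &str) -> String {
    format!("scarab.escalation.{}", agent_id)
}

/// Keep only the messages addressed to `agent_id`'s escalation topic.
fn escalations_for(agent_id: &str, messages: &[BusMessage]) -> Vec<BusMessage> {
    let wanted = escalation_topic(agent_id);
    messages
        .iter()
        .filter(|m| m.topic == wanted)
        .cloned()
        .collect()
}

fn main() {
    let messages = vec![
        BusMessage { topic: "tasks.new".into(), payload: "work".into() },
        BusMessage { topic: "scarab.escalation.parent-1".into(), payload: "anomaly".into() },
        BusMessage { topic: "scarab.escalation.other".into(), payload: "ignored".into() },
    ];

    let found = escalations_for("parent-1", &messages);
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].payload, "anomaly");
}
```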

Capabilities Required

spec:
  capabilities:
    - tool.invoke:bus.publish   # (future: explicit bus capability)
    - tool.invoke:bus.subscribe
    - tool.invoke:bus.poll

Currently, bus operations are gated by trust level and the general IPC dispatch rather than by a dedicated bus capability.