Skip to content

Event Bus

This document describes the user-focused pub/sub system that pushes real-time events from the backend to every connected client via Server-Sent Events (SSE).

Architecture Overview

  • Pub/Sub broker – Each process hosts a PubSubProtocol instance (backend/pubsub) that accepts topic publications and fan-outs to subscribers. The default implementation is an in-memory bus; the API is built so that Redis or another broker can be swapped in later.
  • Per-user topics – Every authenticated user receives a namespaced topic (user:{user_id}:events). Additional channel topics live under the same namespace: user:{user_id}:events:{channel}. Events may be published to the base topic, one channel, or many channels simultaneously.
  • SSE endpointGET /api/v1/user/me/events establishes a long-lived HTTP stream (content type text/event-stream). Clients may pass ?channels=foo&channels=bar to subscribe only to specific channel topics; omitting the query parameter falls back to the all-events topic.
┌─────────┐ publish  ┌─────────┐  subscribe   ┌─────────────────────┐
│Backend  │────────▶│ Pub/Sub │────────────▶│ /user/me/events SSE │
└─────────┘          └─────────┘             └─────────────────────┘

Event Envelope

Every SSE block contains the standard fields (event, id, data). The JSON payload under data: is an envelope produced by _format_sse_event:

{
  "headers": {
    "event": "chat-message:created"
  },
  "body": {
    "resource": "chat-message",
    "action": "created",
    "data": {
      "chat_id": "…",
      "message_ids": ["…"],
      "messages": [ {"kind": "message",  } ]
    },
    "metadata": {
      "channels": ["chats", "messages"]
    }
  }
}
  • headers are passed through verbatim from the PubSubMessageFrame. The SSE event: line always mirrors headers.event.
  • body holds a structured payload. Most resource events use ResourceEventPayload:
  • resource: logical entity name (agent, memory, mcp-server, …)
  • action: verb describing the change (created, updated, deleted, …)
  • data: domain-specific data (often the full record or a diff)
  • metadata: optional additional context (e.g., requested channels)

Channels and Event Catalog

Channel Event Name(s) Payload Highlights
user user:profile.updated { user_id, fields[], profile } with the latest profile
agents agent:created/updated/deleted Full Agent record or { id } on delete
agent-tools agent-tool:created/updated/deleted AgentTool record or { id, agent_id }
agent-internal-tools agent-internal-tool:* Mirrors the external tool payloads
agent-memories agent-memory-namespace:* Links between agents and memory namespaces
memory-namespaces memory-namespace:* Workspace-level namespaces
memories memory:created/updated/deleted Full memory record; delete includes hard_delete boolean
providers provider:* Provider records or { id } on delete
embedding-indexes embedding-index:created/updated/deleted/refreshed Full index representation
mcp-servers mcp-server:*, mcp-server-tools:refreshed, mcp-server-resources:refreshed Server record plus refresh summaries
chats chat:created, chat-message:created Chat metadata, plus chat message batches
messages chat-message:created Same payload as above; dedicated channel for high-volume UI

Tip: Subscribe to narrowly-scoped channels (e.g., messages) when building high-frequency UIs, and fall back to the base topic if you need every event.

Consuming Events

  1. Establish SSE connection
const params = new URLSearchParams();
params.append("channels", "user");
params.append("channels", "chats");

const response = await fetch(`/api/v1/user/me/events?${params}`, {
  headers: { Accept: "text/event-stream", Authorization: `Bearer ${token}` },
});
  1. Parse blocks – Each block is separated by \n\n. Parse event:, id: and data: lines, then JSON.parse the payload to inspect headers and body.

  2. Route by event name – The SSE event line (and headers.event) follows the resource:action convention. Frontend code typically switches on this string and dispatches cache invalidations or optimistic updates accordingly.

  3. Handle unknown events gracefully – Log them during development so new event types can be adopted incrementally.

Publishing New Events

When adding a new backend mutation:

  1. Perform all writes inside a transaction() block.
  2. After obtaining the final record, create a coroutine that calls publish_resource_event(...) (or a bespoke helper) with the correct resource, action, data, and channel list.
  3. Register the coroutine via add_transaction_hook so it runs only after the transaction commits successfully.

For specialized payloads, build a dedicated Pydantic model and pass it via the body parameter of publish_user_event.

Testing Tips

  • Use curl or npx eventsource to inspect the raw SSE stream:
curl -H "Accept: text/event-stream" \
     -H "Authorization: Bearer <token>" \
     "https://localhost:8000/api/v1/user/me/events?channels=chats"
  • Ensure long-running clients implement reconnect logic with exponential backoff in case nginx or a load balancer closes idle connections.

  • While the default broker is in-memory, keep the channel naming convention immutable; doing so makes it trivial to migrate to Redis- or Kafka-backed transports without touching client code.

Roadmap

  • Edge devices – Because events are semantic and channel-scoped, future WebSocket, WebRTC, or MQTT shims can stream the same messages to edge agents.
  • Server-to-server hooks – The pub/sub layer already supports swapping in a distributed backend. Once we adopt Redis Streams or NATS, multi-instance API servers will automatically share the same event catalog.