# Event Bus
This document describes the user-focused pub/sub system that pushes real-time events from the backend to every connected client via Server-Sent Events (SSE).
## Architecture Overview
- **Pub/Sub broker** – Each process hosts a `PubSubProtocol` instance (`backend/pubsub`) that accepts topic publications and fans out to subscribers. The default implementation is an in-memory bus; the API is designed so that Redis or another broker can be swapped in later.
- **Per-user topics** – Every authenticated user receives a namespaced topic (`user:{user_id}:events`). Additional channel topics live under the same namespace: `user:{user_id}:events:{channel}`. Events may be published to the base topic, one channel, or many channels simultaneously.
- **SSE endpoint** – `GET /api/v1/user/me/events` establishes a long-lived HTTP stream (content type `text/event-stream`). Clients may pass `?channels=foo&channels=bar` to subscribe only to specific channel topics; omitting the query parameter falls back to the all-events topic.
```
┌─────────┐ publish  ┌─────────┐  subscribe  ┌─────────────────────┐
│ Backend │────────▶│ Pub/Sub │────────────▶│ /user/me/events SSE │
└─────────┘          └─────────┘             └─────────────────────┘
```
## Event Envelope
Every SSE block contains the standard fields (`event`, `id`, `data`). The JSON payload under `data:` is an envelope produced by `_format_sse_event`:
```json
{
  "headers": {
    "event": "chat-message:created"
  },
  "body": {
    "resource": "chat-message",
    "action": "created",
    "data": {
      "chat_id": "…",
      "message_ids": ["…"],
      "messages": [ {"kind": "message", … } ]
    },
    "metadata": {
      "channels": ["chats", "messages"]
    }
  }
}
```
- `headers` are passed through verbatim from the `PubSubMessageFrame`. The SSE `event:` line always mirrors `headers.event`.
- `body` holds a structured payload. Most resource events use `ResourceEventPayload`:
  - `resource`: logical entity name (`agent`, `memory`, `mcp-server`, …)
  - `action`: verb describing the change (`created`, `updated`, `deleted`, …)
  - `data`: domain-specific data (often the full record or a diff)
  - `metadata`: optional additional context (e.g., requested channels)
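For client code, the envelope can be described with a TypeScript interface. This is a sketch derived from the JSON example above; which `body` fields are optional is an assumption, not something the backend guarantees.

```typescript
// Sketch of the envelope shape, derived from the JSON example above.
// Field optionality here is an assumption.

interface ResourceEventPayload {
  resource: string;                    // logical entity name, e.g. "chat-message"
  action: string;                      // verb describing the change, e.g. "created"
  data?: unknown;                      // domain-specific data (record or diff)
  metadata?: { channels?: string[] };  // optional context, e.g. requested channels
}

interface SseEnvelope {
  headers: { event: string };          // mirrored on the SSE `event:` line
  body: ResourceEventPayload;
}

// Example envelope matching the JSON above:
const sample: SseEnvelope = {
  headers: { event: "chat-message:created" },
  body: { resource: "chat-message", action: "created" },
};
```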
## Channels and Event Catalog
| Channel | Event Name(s) | Payload Highlights |
|---|---|---|
| `user` | `user:profile.updated` | `{ user_id, fields[], profile }` with the latest profile |
| `agents` | `agent:created/updated/deleted` | Full `Agent` record or `{ id }` on delete |
| `agent-tools` | `agent-tool:created/updated/deleted` | `AgentTool` record or `{ id, agent_id }` |
| `agent-internal-tools` | `agent-internal-tool:*` | Mirrors the external tool payloads |
| `agent-memories` | `agent-memory-namespace:*` | Links between agents and memory namespaces |
| `memory-namespaces` | `memory-namespace:*` | Workspace-level namespaces |
| `memories` | `memory:created/updated/deleted` | Full memory record; delete includes `hard_delete` boolean |
| `providers` | `provider:*` | Provider records or `{ id }` on delete |
| `embedding-indexes` | `embedding-index:created/updated/deleted/refreshed` | Full index representation |
| `mcp-servers` | `mcp-server:*`, `mcp-server-tools:refreshed`, `mcp-server-resources:refreshed` | Server record plus refresh summaries |
| `chats` | `chat:created`, `chat-message:created` | Chat metadata, plus chat message batches |
| `messages` | `chat-message:created` | Same payload as above; dedicated channel for high-volume UI |
> **Tip:** Subscribe to narrowly-scoped channels (e.g., `messages`) when building high-frequency UIs, and fall back to the base topic if you need every event.
## Consuming Events
1. **Establish an SSE connection**

   ```js
   const params = new URLSearchParams();
   params.append("channels", "user");
   params.append("channels", "chats");

   const response = await fetch(`/api/v1/user/me/events?${params}`, {
     headers: { Accept: "text/event-stream", Authorization: `Bearer ${token}` },
   });
   ```

2. **Parse blocks** – Each block is separated by `\n\n`. Parse the `event:`, `id:`, and `data:` lines, then `JSON.parse` the payload to inspect `headers` and `body`.

3. **Route by event name** – The SSE `event` line (and `headers.event`) follows the `resource:action` convention. Frontend code typically switches on this string and dispatches cache invalidations or optimistic updates accordingly.

4. **Handle unknown events gracefully** – Log them during development so new event types can be adopted incrementally.
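Steps 2–4 can be sketched in TypeScript as follows. The field parsing follows the SSE conventions above; the routing table is a hypothetical example, not the actual frontend code.

```typescript
// Parse one SSE block (everything between two "\n\n" separators) into the
// event name, optional id, and the JSON envelope carried on data: lines.
function parseSseBlock(block: string): { event: string; id?: string; envelope: any } {
  let event = "message"; // SSE default when no event: line is present
  let id: string | undefined;
  const dataLines: string[] = [];
  for (const line of block.split("\n")) {
    if (line.startsWith("event:")) event = line.slice(6).trim();
    else if (line.startsWith("id:")) id = line.slice(3).trim();
    else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
  }
  return { event, id, envelope: JSON.parse(dataLines.join("\n")) };
}

// Route on the resource:action convention. The cases here are illustrative.
function route(event: string): string {
  const [resource, action] = event.split(":");
  switch (resource) {
    case "chat-message":
      return `invalidate messages cache (${action})`;
    default:
      return `unhandled event: ${event}`; // log unknown events in development
  }
}
```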
## Publishing New Events
When adding a new backend mutation:
- Perform all writes inside a `transaction()` block.
- After obtaining the final record, create a coroutine that calls `publish_resource_event(...)` (or a bespoke helper) with the correct `resource`, `action`, `data`, and channel list.
- Register the coroutine via `add_transaction_hook` so it runs only after the transaction commits successfully.
For specialized payloads, build a dedicated Pydantic model and pass it via the `body` parameter of `publish_user_event`.
## Testing Tips
- Use `curl` or `npx eventsource` to inspect the raw SSE stream:

  ```bash
  curl -H "Accept: text/event-stream" \
       -H "Authorization: Bearer <token>" \
       "https://localhost:8000/api/v1/user/me/events?channels=chats"
  ```
- Ensure long-running clients implement reconnect logic with exponential backoff in case nginx or a load balancer closes idle connections.
- While the default broker is in-memory, keep the channel naming convention immutable; doing so makes it trivial to migrate to Redis- or Kafka-backed transports without touching client code.
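The reconnect advice above can be sketched in TypeScript. `computeBackoff` is a hypothetical helper, and the base delay and cap are illustrative values; the streaming loop assumes a `fetch` implementation that exposes `response.body` as a `ReadableStream` (browsers, Node 18+).

```typescript
// Exponential backoff: the delay doubles each attempt and is capped so a
// long outage does not push retries out indefinitely.
function computeBackoff(attempt: number, baseMs = 500, capMs = 30_000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

// Reconnecting SSE reader sketch: stream blocks, split on "\n\n", and retry
// with backoff whenever the proxy or network drops the connection.
async function streamWithReconnect(url: string, onBlock: (block: string) => void): Promise<void> {
  for (let attempt = 0; ; attempt++) {
    try {
      const response = await fetch(url, { headers: { Accept: "text/event-stream" } });
      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      attempt = 0; // reset backoff once a connection succeeds
      for (;;) {
        const { value, done } = await reader.read();
        if (done) break; // server or load balancer closed the stream
        buffer += decoder.decode(value, { stream: true });
        let sep;
        while ((sep = buffer.indexOf("\n\n")) !== -1) {
          onBlock(buffer.slice(0, sep));
          buffer = buffer.slice(sep + 2);
        }
      }
    } catch {
      // network error: fall through to the backoff sleep below
    }
    await new Promise((resolve) => setTimeout(resolve, computeBackoff(attempt)));
  }
}
```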
## Roadmap

- **Edge devices** – Because events are semantic and channel-scoped, future WebSocket, WebRTC, or MQTT shims can stream the same messages to edge agents.
- **Server-to-server hooks** – The pub/sub layer already supports swapping in a distributed backend. Once we adopt Redis Streams or NATS, multi-instance API servers will automatically share the same event catalog.