Overview
The WebSocket stream provides real-time bidirectional communication between clients and threads. It enables:
- Receiving message chunks as they’re generated
- Getting complete message updates
- Tracking token usage and costs
- Receiving custom events from tools
- Keeping connections alive with ping/pong
Connecting
const ws = new WebSocket(
  `wss://your-worker.workers.dev/api/threads/${threadId}/stream`
);

ws.onopen = () => {
  console.log('Connected to stream');
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  handleMessage(data);
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = (event) => {
  console.log('Disconnected:', event.code, event.reason);
};
Connection Parameters
threadId: The unique thread identifier to stream
Optional user identifier for authorization and tracking
Server → Client Messages
message_data
Sent when a complete message is created or updated.
interface MessageDataEvent {
  type: 'message_data';
  message: Message;
}
{
  "type": "message_data",
  "message": {
    "id": "msg_001",
    "role": "assistant",
    "content": "Hello! How can I help you today?",
    "created_at": 1699900000000,
    "status": "completed",
    "name": "Support Agent"
  }
}
message: The complete message object with all fields
message_chunk
Sent during streaming as tokens are generated. Use to display text in real-time.
interface MessageChunkEvent {
  type: 'message_chunk';
  messageId: string;
  chunk: string;
  index: number;
}
{
  "type": "message_chunk",
  "messageId": "msg_001",
  "chunk": "Hello",
  "index": 0
}
messageId: ID of the message being streamed
chunk: Text chunk to append to the message
index: Sequence number for ordering chunks
Buffer chunks by messageId and append them in order. The final message_data event contains the complete content.
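The buffering described above can be sketched as follows. `ChunkBuffer` is an illustrative helper (not part of the API) that stores chunks by index per message, so text renders correctly even if chunks arrive out of order:

```typescript
// Illustrative chunk buffer: chunks are keyed by messageId and index,
// then joined in index order for display.
class ChunkBuffer {
  private chunks = new Map<string, Map<number, string>>();

  append(messageId: string, index: number, chunk: string): void {
    if (!this.chunks.has(messageId)) {
      this.chunks.set(messageId, new Map());
    }
    this.chunks.get(messageId)!.set(index, chunk);
  }

  // Join received chunks in sequence order.
  text(messageId: string): string {
    const parts = this.chunks.get(messageId);
    if (!parts) return '';
    return [...parts.entries()]
      .sort(([a], [b]) => a - b)
      .map(([, chunk]) => chunk)
      .join('');
  }

  // Drop the buffer once the final message_data event arrives.
  clear(messageId: string): void {
    this.chunks.delete(messageId);
  }
}
```

Call `clear()` when the `message_data` event arrives, since that event carries the authoritative full content.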
telemetry
Sent after each LLM request with usage statistics.
interface TelemetryEvent {
  type: 'telemetry';
  model: string;
  inputTokens: number;
  outputTokens: number;
  cost: number;
  cached?: number;
}
{
  "type": "telemetry",
  "model": "gpt-4o",
  "inputTokens": 150,
  "outputTokens": 50,
  "cost": 0.00125,
  "cached": 0
}
model: Model used for the request
inputTokens: Number of input tokens consumed
outputTokens: Number of output tokens generated
cost: Cost of the request
cached: Number of cached tokens (if applicable)
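A client can accumulate these events to show running usage. A minimal sketch (`UsageTracker` is an illustrative helper; the event shape mirrors the `TelemetryEvent` interface above):

```typescript
// Mirror of the TelemetryEvent interface documented above.
interface TelemetryEvent {
  type: 'telemetry';
  model: string;
  inputTokens: number;
  outputTokens: number;
  cost: number;
  cached?: number;
}

// Illustrative accumulator for per-thread usage totals.
class UsageTracker {
  inputTokens = 0;
  outputTokens = 0;
  cachedTokens = 0;
  cost = 0;

  track(event: TelemetryEvent): void {
    this.inputTokens += event.inputTokens;
    this.outputTokens += event.outputTokens;
    this.cachedTokens += event.cached ?? 0; // cached is optional
    this.cost += event.cost;
  }
}
```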
stop
Sent when a turn or conversation stops.
interface StopEvent {
  type: 'stop';
  reason: 'response' | 'tool' | 'max_turns' | 'end_conversation';
  side: 'a' | 'b';
}
{
  "type": "stop",
  "reason": "response",
  "side": "a"
}
reason: Why execution stopped:
- response: The AI returned a text response
- tool: A stop tool was called
- max_turns: The turn limit was reached
- end_conversation: The conversation ended
side: Which side stopped: "a" or "b"
error
Sent when an error occurs during processing.
interface ErrorEvent {
  type: 'error';
  message: string;
  code?: string;
}
{
  "type": "error",
  "message": "Rate limit exceeded. Please try again later.",
  "code": "RATE_LIMIT"
}
message: Human-readable error description
code: Error code for programmatic handling:
- RATE_LIMIT: Provider rate limit hit
- AUTH_ERROR: Authentication failed
- MODEL_ERROR: The model returned an error
- TOOL_ERROR: Tool execution failed
- TIMEOUT: The request timed out
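These codes lend themselves to a small dispatch mapping codes to client behavior. A sketch, assuming a simple three-way policy (the action names are illustrative, not part of the API):

```typescript
// Illustrative policy: transient errors retry, auth errors trigger
// re-authentication, everything else is surfaced to the user.
type ErrorAction = 'retry' | 'reauthenticate' | 'surface';

function actionForError(code?: string): ErrorAction {
  switch (code) {
    case 'RATE_LIMIT':
    case 'TIMEOUT':
      return 'retry';          // transient: back off and retry
    case 'AUTH_ERROR':
      return 'reauthenticate'; // credentials need refreshing
    default:
      return 'surface';        // MODEL_ERROR, TOOL_ERROR, unknown codes
  }
}
```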
custom
Custom events emitted via emitThreadEvent() in tools.
interface CustomEvent {
  type: 'custom';
  event: string;
  data: unknown;
}
{
  "type": "custom",
  "event": "order_status",
  "data": {
    "orderId": "ORD-12345",
    "status": "shipped",
    "trackingNumber": "1Z999AA10123456784"
  }
}
event: Custom event name defined by the tool
data: Event payload (any JSON-serializable data)
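Since event names are defined by tools, clients typically register handlers per name. A minimal sketch (`CustomEventRouter` is an illustrative helper, not part of the API):

```typescript
// Illustrative registry mapping custom event names to handlers.
type CustomHandler = (data: unknown) => void;

class CustomEventRouter {
  private handlers = new Map<string, CustomHandler[]>();

  // Register a handler for a tool-defined event name.
  on(event: string, handler: CustomHandler): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }

  // Invoke every handler registered for this event name.
  dispatch(event: string, data: unknown): void {
    for (const handler of this.handlers.get(event) ?? []) {
      handler(data);
    }
  }
}
```

Unrecognized event names are silently ignored, which keeps the client forward-compatible with new tool events.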
pong
Response to client ping for connection health.
{
  "type": "pong",
  "timestamp": 1699900000000
}
timestamp: Server timestamp in milliseconds
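Pongs can also drive connection-health checks: record when each pong arrives and treat the connection as stale if none arrived recently. A sketch, using the client clock to avoid skew against the server timestamp; the 45-second threshold is an assumption (1.5x the 30-second ping interval suggested below):

```typescript
// Illustrative health monitor: tracks the last pong arrival time.
class HealthMonitor {
  private lastPongAt: number;

  constructor(now: number = Date.now()) {
    this.lastPongAt = now;
  }

  // Record a pong arrival (client clock, not the server timestamp).
  onPong(at: number = Date.now()): void {
    this.lastPongAt = at;
  }

  // Stale if no pong arrived within the threshold; a stale connection
  // should be closed and reconnected.
  isStale(now: number = Date.now(), thresholdMs = 45_000): boolean {
    return now - this.lastPongAt > thresholdMs;
  }
}
```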
Client → Server Messages
ping
Send periodically to keep the connection alive.
ws.send(JSON.stringify({ type: 'ping' }));
Server responds with pong.
Send a ping every 30 seconds to prevent connection timeout on Cloudflare Durable Objects.
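A heartbeat loop can be factored so the tick logic is testable without a live connection. `PingSocket` below is an assumed minimal interface (any real `WebSocket` instance satisfies it):

```typescript
// Minimal socket shape so the tick can run without a live WebSocket.
interface PingSocket {
  readyState: number;
  send(data: string): void;
}

const OPEN = 1; // numeric value of WebSocket.OPEN

// Send one ping if the socket is open; returns whether a ping was sent.
function heartbeatTick(ws: PingSocket): boolean {
  if (ws.readyState !== OPEN) return false;
  ws.send(JSON.stringify({ type: 'ping' }));
  return true;
}

// Tick every 30 seconds; call the returned function when the socket
// closes to stop the timer and avoid leaking intervals.
function startHeartbeat(ws: PingSocket, intervalMs = 30_000): () => void {
  const timer = setInterval(() => heartbeatTick(ws), intervalMs);
  return () => clearInterval(timer);
}
```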
sync
Request missed messages after reconnection.
ws.send(JSON.stringify({
  type: 'sync',
  lastMessageId: 'msg_001'
}));
lastMessageId: ID of the last message received. The server sends all messages created after this.
Handling Messages
function handleMessage(data: StreamEvent) {
  switch (data.type) {
    case 'message_chunk':
      // Append chunk to message buffer
      appendChunk(data.messageId, data.chunk);
      break;
    case 'message_data':
      // Update complete message
      updateMessage(data.message);
      break;
    case 'telemetry':
      // Track usage
      trackUsage(data);
      break;
    case 'stop':
      // Handle turn/conversation end
      if (data.reason === 'end_conversation') {
        showConversationEnded();
      }
      break;
    case 'error':
      // Display error to user
      showError(data.message);
      break;
    case 'custom':
      // Handle custom events
      handleCustomEvent(data.event, data.data);
      break;
    case 'pong':
      // Connection is healthy
      updateLastPong(data.timestamp);
      break;
  }
}
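The `StreamEvent` type consumed above can be assembled as a discriminated union of the server-to-client events. A sketch built from the shapes documented on this page (the pong shape is inferred from its example, and the `Message` object is abbreviated):

```typescript
// Union of server-to-client events, discriminated on `type`.
type StreamEvent =
  | { type: 'message_data'; message: { id: string; role: string; content: string } }
  | { type: 'message_chunk'; messageId: string; chunk: string; index: number }
  | { type: 'telemetry'; model: string; inputTokens: number; outputTokens: number; cost: number; cached?: number }
  | { type: 'stop'; reason: 'response' | 'tool' | 'max_turns' | 'end_conversation'; side: 'a' | 'b' }
  | { type: 'error'; message: string; code?: string }
  | { type: 'custom'; event: string; data: unknown }
  | { type: 'pong'; timestamp: number };

// TypeScript narrows each case off the `type` discriminant, so
// event-specific fields are available without casting.
function describe(event: StreamEvent): string {
  switch (event.type) {
    case 'message_chunk':
      return `chunk ${event.index} of ${event.messageId}`;
    case 'stop':
      return `stopped (${event.reason}) on side ${event.side}`;
    default:
      return event.type;
  }
}
```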
Reconnection Strategy
Implement automatic reconnection for reliability:
class ThreadConnection {
  private ws: WebSocket | null = null;
  private lastMessageId: string | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private pingTimer: ReturnType<typeof setInterval> | null = null;

  connect(threadId: string) {
    this.ws = new WebSocket(
      `wss://your-worker.workers.dev/api/threads/${threadId}/stream`
    );

    this.ws.onopen = () => {
      this.reconnectAttempts = 0;
      // Sync missed messages
      if (this.lastMessageId) {
        this.ws?.send(JSON.stringify({
          type: 'sync',
          lastMessageId: this.lastMessageId,
        }));
      }
      // Start ping interval
      this.startPing();
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      // Track last message for reconnection
      if (data.type === 'message_data') {
        this.lastMessageId = data.message.id;
      }
      this.handleMessage(data);
    };

    this.ws.onclose = () => {
      this.stopPing();
      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        // Exponential backoff: 1s, 2s, 4s, ... capped at 30s
        const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
        this.reconnectAttempts++;
        setTimeout(() => this.connect(threadId), delay);
      }
    };
  }

  private startPing() {
    this.stopPing(); // avoid stacking intervals across reconnects
    this.pingTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000);
  }

  private stopPing() {
    if (this.pingTimer !== null) {
      clearInterval(this.pingTimer);
      this.pingTimer = null;
    }
  }
}
React Integration
Use the @standardagents/react package for built-in WebSocket handling:
import { useThread } from '@standardagents/react';

function Chat() {
  const { messages, status, sendMessage } = useThread(threadId);
  // WebSocket connection managed automatically
  // messages update in real-time
  // status reflects connection state
}
See React Integration for full documentation.