Overview

The WebSocket stream provides real-time bidirectional communication between clients and threads. It enables:
  • Receiving message chunks as they’re generated
  • Getting complete message updates
  • Tracking token usage and costs
  • Receiving custom events from tools
  • Keeping connections alive with ping/pong

Connecting

const ws = new WebSocket(
  `wss://your-worker.workers.dev/api/threads/${threadId}/stream`
);

ws.onopen = () => {
  console.log('Connected to stream');
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  handleMessage(data);
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};

ws.onclose = (event) => {
  console.log('Disconnected:', event.code, event.reason);
};

Connection Parameters

threadId
string
required
Unique identifier of the thread to stream
userId
string
Optional user identifier for authorization and tracking

Server → Client Messages

message_data

Sent when a complete message is created or updated.
interface MessageDataEvent {
  type: 'message_data';
  message: Message;
}
{
  "type": "message_data",
  "message": {
    "id": "msg_001",
    "role": "assistant",
    "content": "Hello! How can I help you today?",
    "created_at": 1699900000000,
    "status": "completed",
    "name": "Support Agent"
  }
}
message
Message
Complete message object with all fields

message_chunk

Sent during streaming as tokens are generated. Use it to display text in real time.
interface MessageChunkEvent {
  type: 'message_chunk';
  messageId: string;
  chunk: string;
  index: number;
}
{
  "type": "message_chunk",
  "messageId": "msg_001",
  "chunk": "Hello",
  "index": 0
}
messageId
string
ID of the message being streamed
chunk
string
Text chunk to append to the message
index
number
Sequence number for ordering chunks
Buffer chunks by messageId and append them in index order. The final message_data event contains the complete content.
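The buffering advice above could be sketched as follows. This is a minimal sketch, not the library's implementation: ChunkBuffer and its methods are hypothetical names, and it assumes index starts at 0 and increases by 1 per chunk, which this page does not state explicitly.

```typescript
// Hypothetical helper: buffers message_chunk events per messageId.
// Assumption: `index` starts at 0 and increments by 1 for each chunk.
interface MessageChunkEvent {
  type: 'message_chunk';
  messageId: string;
  chunk: string;
  index: number;
}

class ChunkBuffer {
  // messageId -> (index -> chunk text)
  private chunks = new Map<string, Map<number, string>>();

  // Store a chunk and return the text that can be displayed so far.
  add(event: MessageChunkEvent): string {
    let parts = this.chunks.get(event.messageId);
    if (!parts) {
      parts = new Map<number, string>();
      this.chunks.set(event.messageId, parts);
    }
    parts.set(event.index, event.chunk);
    return this.text(event.messageId);
  }

  // Join the contiguous run of chunks starting at index 0.
  // A gap (a chunk that has not arrived yet) ends the run.
  text(messageId: string): string {
    const parts = this.chunks.get(messageId);
    if (!parts) return '';
    let out = '';
    for (let i = 0; parts.has(i); i++) {
      out += parts.get(i) ?? '';
    }
    return out;
  }

  // Drop buffered chunks, for example once the final message_data arrives.
  clear(messageId: string): void {
    this.chunks.delete(messageId);
  }
}
```

Keying on index rather than arrival order means a chunk that arrives early is held back until the chunks before it are present. Once the message_data event for the same message arrives, the buffer can be cleared and the complete content shown instead.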

telemetry

Sent after each LLM request with usage statistics.
interface TelemetryEvent {
  type: 'telemetry';
  model: string;
  inputTokens: number;
  outputTokens: number;
  cost: number;
  cached?: number;
}
{
  "type": "telemetry",
  "model": "gpt-4o",
  "inputTokens": 150,
  "outputTokens": 50,
  "cost": 0.00125,
  "cached": 0
}
model
string
Model used for the request
inputTokens
number
Number of input tokens consumed
outputTokens
number
Number of output tokens generated
cost
number
Estimated cost in USD
cached
number
Number of cached tokens (if applicable)
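Because a telemetry event is sent after each LLM request, a client that wants per-thread totals has to accumulate them itself. A minimal sketch, assuming the client keeps a running total; UsageTotals, emptyUsage, and addUsage are hypothetical names:

```typescript
interface TelemetryEvent {
  type: 'telemetry';
  model: string;
  inputTokens: number;
  outputTokens: number;
  cost: number;
  cached?: number;
}

// Hypothetical running total kept by the client.
interface UsageTotals {
  requests: number;
  inputTokens: number;
  outputTokens: number;
  cachedTokens: number;
  cost: number; // estimated USD, summed from the events
}

function emptyUsage(): UsageTotals {
  return { requests: 0, inputTokens: 0, outputTokens: 0, cachedTokens: 0, cost: 0 };
}

// Return new totals that include one more telemetry event.
function addUsage(totals: UsageTotals, event: TelemetryEvent): UsageTotals {
  return {
    requests: totals.requests + 1,
    inputTokens: totals.inputTokens + event.inputTokens,
    outputTokens: totals.outputTokens + event.outputTokens,
    // `cached` is optional, so treat a missing value as 0
    cachedTokens: totals.cachedTokens + (event.cached ?? 0),
    cost: totals.cost + event.cost,
  };
}
```

The summed cost is a floating-point total of estimates, so it is better suited to display than to billing.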

stop

Sent when a turn or conversation stops.
interface StopEvent {
  type: 'stop';
  reason: 'response' | 'tool' | 'max_turns' | 'end_conversation';
  side: 'a' | 'b';
}
{
  "type": "stop",
  "reason": "response",
  "side": "a"
}
reason
string
Why execution stopped:
  • response - AI returned text response
  • tool - Stop tool was called
  • max_turns - Turn limit reached
  • end_conversation - Conversation ended
side
string
Which side stopped: "a" or "b"

error

Sent when an error occurs during processing.
interface ErrorEvent {
  type: 'error';
  message: string;
  code?: string;
}
{
  "type": "error",
  "message": "Rate limit exceeded. Please try again later.",
  "code": "RATE_LIMIT"
}
message
string
Human-readable error description
code
string
Error code for programmatic handling:
  • RATE_LIMIT - Provider rate limit
  • AUTH_ERROR - Authentication failed
  • MODEL_ERROR - Model returned error
  • TOOL_ERROR - Tool execution failed
  • TIMEOUT - Request timed out
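The code field makes it possible to branch on the kind of failure instead of parsing the message text. The sketch below shows one possible client policy. The mapping from codes to actions is an assumption for illustration, not behaviour defined by the server, and actionForError and ErrorAction are hypothetical names.

```typescript
// Named StreamErrorEvent here to avoid clashing with the DOM's global ErrorEvent type.
interface StreamErrorEvent {
  type: 'error';
  message: string;
  code?: string;
}

// Hypothetical client-side actions.
type ErrorAction = 'retry_later' | 'reauthenticate' | 'show_message';

// Assumption: which codes are worth retrying is a client decision;
// this page does not say which errors are transient.
function actionForError(event: StreamErrorEvent): ErrorAction {
  switch (event.code) {
    case 'RATE_LIMIT':
    case 'TIMEOUT':
      return 'retry_later';
    case 'AUTH_ERROR':
      return 'reauthenticate';
    default:
      // MODEL_ERROR, TOOL_ERROR, and missing or unrecognised codes
      return 'show_message';
  }
}
```

Since code is optional, the default branch also covers events that carry only a message.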

custom

Sent when a tool emits a custom event via emitThreadEvent().
interface CustomEvent {
  type: 'custom';
  event: string;
  data: unknown;
}
{
  "type": "custom",
  "event": "order_status",
  "data": {
    "orderId": "ORD-12345",
    "status": "shipped",
    "trackingNumber": "1Z999AA10123456784"
  }
}
event
string
Custom event name defined by tool
data
unknown
Event payload (any JSON-serializable data)
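Since data is typed as unknown, the client has to route each event by name and validate the payload before using it. A minimal sketch of that pattern follows. CustomEventRegistry and isOrderStatus are hypothetical names, and the payload shape is taken from the order_status example above.

```typescript
type CustomHandler = (data: unknown) => void;

// Hypothetical registry that routes custom events by their `event` name.
class CustomEventRegistry {
  private handlers = new Map<string, CustomHandler[]>();

  on(event: string, handler: CustomHandler): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }

  // Run every handler registered for `event`; returns how many ran.
  dispatch(event: string, data: unknown): number {
    const list = this.handlers.get(event) ?? [];
    for (const handler of list) {
      handler(data);
    }
    return list.length;
  }
}

// Type guard for the order_status payload shown in the example above.
function isOrderStatus(data: unknown): data is { orderId: string; status: string } {
  if (typeof data !== 'object' || data === null) return false;
  const record = data as Record<string, unknown>;
  return typeof record.orderId === 'string' && typeof record.status === 'string';
}
```

A handler registered with on('order_status', ...) would call isOrderStatus(data) first and ignore payloads that fail the check.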

pong

Sent in response to a client ping. Use it to confirm that the connection is healthy.
{
  "type": "pong",
  "timestamp": 1699900000000
}
timestamp
number
Server timestamp in milliseconds

Client → Server Messages

ping

Send periodically to keep the connection alive.
ws.send(JSON.stringify({ type: 'ping' }));
The server responds with pong.
Send a ping every 30 seconds to prevent the connection from timing out on Cloudflare Durable Objects.
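Pings keep the connection open, but a client can also use the matching pong events to notice a connection that has silently died. The sketch below is one way to do that. The two-missed-pongs threshold is an assumption, and isConnectionStale is a hypothetical helper.

```typescript
const PING_INTERVAL_MS = 30000; // ping cadence recommended above
const MAX_MISSED_PONGS = 2;     // assumption: tolerate two missed pongs

// `lastPongAt` and `now` are client-side timestamps in milliseconds,
// for example Date.now() recorded when each pong arrives.
function isConnectionStale(
  lastPongAt: number,
  now: number,
  intervalMs: number = PING_INTERVAL_MS,
  maxMissed: number = MAX_MISSED_PONGS
): boolean {
  return now - lastPongAt > intervalMs * maxMissed;
}
```

Recording the local receive time, rather than the server timestamp carried by the pong, avoids false positives from clock differences between client and server. When the check returns true, the client could close the socket and let its reconnection logic take over.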

sync

Request missed messages after reconnection.
ws.send(JSON.stringify({
  type: 'sync',
  lastMessageId: 'msg_001'
}));
lastMessageId
string
required
ID of the last message the client received. The server sends all messages created after it.

Handling Messages

function handleMessage(data: StreamEvent) {
  switch (data.type) {
    case 'message_chunk':
      // Append chunk to message buffer
      appendChunk(data.messageId, data.chunk);
      break;

    case 'message_data':
      // Update complete message
      updateMessage(data.message);
      break;

    case 'telemetry':
      // Track usage
      trackUsage(data);
      break;

    case 'stop':
      // Handle turn/conversation end
      if (data.reason === 'end_conversation') {
        showConversationEnded();
      }
      break;

    case 'error':
      // Display error to user
      showError(data.message);
      break;

    case 'custom':
      // Handle custom events
      handleCustomEvent(data.event, data.data);
      break;

    case 'pong':
      // Connection is healthy
      updateLastPong(data.timestamp);
      break;
  }
}
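The handler above takes a StreamEvent, a type this page does not define. A plausible definition is the union of the server events documented above, sketched here together with a parser that rejects malformed input. The Message fields and PongEvent are inferred from the JSON examples, and parseStreamEvent is a hypothetical helper, so treat all of this as an assumption rather than the package's published types.

```typescript
// Inferred from the message_data example; the real Message type may have more fields.
interface Message {
  id: string;
  role: string;
  content: string;
  created_at: number;
  status: string;
  name?: string;
}

interface MessageDataEvent { type: 'message_data'; message: Message; }
interface MessageChunkEvent { type: 'message_chunk'; messageId: string; chunk: string; index: number; }
interface TelemetryEvent {
  type: 'telemetry'; model: string; inputTokens: number;
  outputTokens: number; cost: number; cached?: number;
}
interface StopEvent {
  type: 'stop';
  reason: 'response' | 'tool' | 'max_turns' | 'end_conversation';
  side: 'a' | 'b';
}
// Renamed from ErrorEvent / CustomEvent to avoid clashing with DOM global types.
interface StreamErrorEvent { type: 'error'; message: string; code?: string; }
interface StreamCustomEvent { type: 'custom'; event: string; data: unknown; }
// Inferred from the pong JSON example.
interface PongEvent { type: 'pong'; timestamp: number; }

type StreamEvent =
  | MessageDataEvent | MessageChunkEvent | TelemetryEvent | StopEvent
  | StreamErrorEvent | StreamCustomEvent | PongEvent;

const KNOWN_TYPES: readonly string[] = [
  'message_data', 'message_chunk', 'telemetry', 'stop', 'error', 'custom', 'pong',
];

// Parse a raw WebSocket frame. Returns null for invalid JSON or an unknown `type`.
// Only the `type` tag is checked; individual fields are not validated.
function parseStreamEvent(raw: string): StreamEvent | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof value !== 'object' || value === null) return null;
  const type = (value as { type?: unknown }).type;
  if (typeof type !== 'string' || KNOWN_TYPES.indexOf(type) === -1) return null;
  return value as StreamEvent;
}
```

With a discriminated union like this, each case in the switch narrows data to the matching interface, so data.chunk is only accessible in the message_chunk branch.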

Reconnection Strategy

Implement automatic reconnection for reliability:
class ThreadConnection {
  private ws: WebSocket | null = null;
  private lastMessageId: string | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private pingTimer: ReturnType<typeof setInterval> | null = null;

  connect(threadId: string) {
    this.ws = new WebSocket(
      `wss://your-worker.workers.dev/api/threads/${threadId}/stream`
    );

    this.ws.onopen = () => {
      this.reconnectAttempts = 0;

      // Sync missed messages
      if (this.lastMessageId) {
        this.ws?.send(JSON.stringify({
          type: 'sync',
          lastMessageId: this.lastMessageId,
        }));
      }

      // Start ping interval
      this.startPing();
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      // Track last message for reconnection
      if (data.type === 'message_data') {
        this.lastMessageId = data.message.id;
      }

      // Dispatch to the handleMessage function from "Handling Messages"
      handleMessage(data);
    };

    this.ws.onclose = () => {
      // Stop pinging the closed socket so timers don't accumulate across reconnects
      this.stopPing();

      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        // Exponential backoff: 1s, 2s, 4s, ... capped at 30s
        const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
        this.reconnectAttempts++;
        setTimeout(() => this.connect(threadId), delay);
      }
    };
  }

  private startPing() {
    // Clear any previous timer before starting a new one
    this.stopPing();
    this.pingTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000);
  }

  private stopPing() {
    if (this.pingTimer !== null) {
      clearInterval(this.pingTimer);
      this.pingTimer = null;
    }
  }
}
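The onclose handler above uses exponential backoff with a cap. Pulling the formula into a small function makes the resulting schedule easy to inspect; reconnectDelay is a hypothetical helper that mirrors the expression in the class.

```typescript
// Mirrors Math.min(1000 * 2 ** attempts, 30000) from the class above.
// `attempt` is the number of reconnects already tried (0 for the first retry).
function reconnectDelay(attempt: number, baseMs: number = 1000, capMs: number = 30000): number {
  return Math.min(baseMs * 2 ** attempt, capMs);
}

// Delays for the five attempts allowed by maxReconnectAttempts = 5
const schedule = [0, 1, 2, 3, 4].map((attempt) => reconnectDelay(attempt));
// schedule is [1000, 2000, 4000, 8000, 16000]
```

With five attempts the client waits 31 seconds in total before giving up, and the 30-second cap is never reached. The cap only matters if maxReconnectAttempts is raised to six or more, since the sixth delay would otherwise be 32 seconds.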

React Integration

Use the @standardagents/react package for built-in WebSocket handling:
import { useThread } from '@standardagents/react';

function Chat({ threadId }: { threadId: string }) {
  const { messages, status, sendMessage } = useThread(threadId);

  // WebSocket connection managed automatically
  // messages update in real-time
  // status reflects connection state
}
See React Integration for full documentation.