Skip to main content

Overview

Standard Agents provides utility functions to manipulate the execution context from within tools and hooks. All utilities are imported from @standardagents/builder:
import {
  queueTool,
  injectMessage,
  getMessages,
  reloadHistory,
  emitThreadEvent,
  forceTurn,
} from "@standardagents/builder";

queueTool

Queue a new tool call to be executed in the current flow.
queueTool(
  flow: FlowState,
  toolName: string,
  args?: Record<string, unknown>
): void
Parameters:
  • flow: The current FlowState context
  • toolName: The name of the tool to call
  • args: Optional arguments to pass to the tool
Example:
import { defineTool, queueTool } from "@standardagents/builder";
import { z } from "zod";

export default defineTool(
  "Process user order and send confirmation",
  z.object({
    orderId: z.string(),
    userId: z.string(),
  }),
  async (flow, args) => {
    // Validate the order
    const order = await validateOrder(args.orderId);

    if (!order.valid) {
      return {
        status: "error",
        error: "Order validation failed",
      };
    }

    // Queue additional tools to execute after this one
    queueTool(flow, "charge_payment", {
      orderId: args.orderId,
      amount: order.total,
    });

    queueTool(flow, "send_confirmation_email", {
      userId: args.userId,
      orderId: args.orderId,
    });

    queueTool(flow, "update_inventory", {
      items: order.items,
    });

    return {
      status: "success",
      result: `Order ${args.orderId} queued for processing`,
    };
  }
);
Sub-Prompts: When calling queueTool from within a sub-prompt, use flow.rootState instead of flow to queue tools in the root execution context.
if (flow.isChildPrompt && flow.rootState) {
  queueTool(flow.rootState, "some_tool", args);
}

injectMessage

Inject a message into the thread without triggering agent execution.
async injectMessage(
  flow: FlowState,
  options: InjectMessageOptions
): Promise<Message>
Options:
interface InjectMessageOptions {
  content: string;                    // Message content
  role: "system" | "user" | "assistant" | "tool";
  id?: string;                        // Optional custom message ID
  beforeMessageId?: string;           // Insert before this message
  toolCallId?: string;                // For role="tool" only
  name?: string;                      // Optional name field
  silent?: boolean;                   // Hidden from LLM
  forceTopLevel?: boolean;            // Force depth 0
}
Examples:
  • System Context
  • Silent Message
import { defineTool, injectMessage } from "@standardagents/builder";
import { z } from "zod";

export default defineTool(
  "Load user context",
  z.object({ userId: z.string() }),
  async (flow, args) => {
    const user = await fetchUser(args.userId);

    await injectMessage(flow, {
      role: "system",
      content: `User context loaded: ${user.name} (${user.email})
        - Account tier: ${user.tier}
        - Preferences: ${JSON.stringify(user.preferences)}`,
    });

    return {
      status: "success",
      result: "User context injected",
    };
  }
);

getMessages

Retrieve message history from the thread’s storage.
async getMessages(
  flow: FlowState,
  limit?: number,
  offset?: number,
  order?: "asc" | "desc"
): Promise<Message[]>
Parameters:
  • flow: The current FlowState context
  • limit: Maximum number of messages (default: 100)
  • offset: Number of messages to skip (default: 0)
  • order: Sort order - "asc" or "desc" (default: "asc")
Example:
import { defineTool, getMessages } from "@standardagents/builder";

export default defineTool(
  "Analyze recent conversation tone",
  async (flow) => {
    // Get last 10 messages
    const messages = await getMessages(flow, 10);

    // Filter user and assistant messages only
    const conversationMessages = messages.filter(
      (m) => m.role === "user" || m.role === "assistant"
    );

    const tone = analyzeTone(conversationMessages);

    return {
      status: "success",
      result: `Conversation tone: ${tone}`,
    };
  }
);

reloadHistory

Reload message history from storage, applying all filtering hooks.
async reloadHistory(flow: FlowState): Promise<Message[]>
Example:
import { defineTool, reloadHistory } from "@standardagents/builder";

export default defineTool(
  "Refresh conversation history",
  async (flow) => {
    const messages = await reloadHistory(flow);

    return {
      status: "success",
      result: `Reloaded ${messages.length} messages`,
    };
  }
);

emitThreadEvent

Send custom events to WebSocket clients.
emitThreadEvent(
  flow: FlowState,
  type: string,
  data: unknown
): void
Parameters:
  • flow: The current FlowState context
  • type: The event type name
  • data: The event payload data
Event Structure:
{
  "type": "event",
  "eventType": "<your-type>",
  "data": <your-data>,
  "timestamp": 1234567890
}
Examples:
  • Progress Updates
  • Status Updates
import { defineTool, emitThreadEvent } from "@standardagents/builder";
import { z } from "zod";

export default defineTool(
  "Process large dataset",
  z.object({ datasetId: z.string() }),
  async (flow, args) => {
    const dataset = await loadDataset(args.datasetId);
    const total = dataset.length;

    for (let i = 0; i < total; i++) {
      await processItem(dataset[i]);

      // Emit progress event
      emitThreadEvent(flow, "progress", {
        current: i + 1,
        total,
        percentage: ((i + 1) / total) * 100,
        message: `Processing item ${i + 1} of ${total}`,
      });
    }

    return {
      status: "success",
      result: `Processed ${total} items`,
    };
  }
);

Frontend Integration

Listen for events on the frontend using onThreadEvent from @standardagents/react:
import {
  AgentBuilderProvider,
  ThreadProvider,
  onThreadEvent,
} from "@standardagents/react";

function App() {
  return (
    <AgentBuilderProvider config={{ endpoint: "https://api.example.com" }}>
      <ThreadProvider threadId="123e4567-e89b-12d3-a456-426614174000" live={true}>
        <OrderTracker />
      </ThreadProvider>
    </AgentBuilderProvider>
  );
}

function OrderTracker() {
  // Listen for order status events
  const orderStatus = onThreadEvent<{
    orderId: string;
    status: string;
    steps?: number;
  }>("order-status");

  // Listen for progress events
  const progress = onThreadEvent<{
    current: number;
    total: number;
    percentage: number;
  }>("progress");

  return (
    <div>
      {orderStatus && (
        <div>
          Order {orderStatus.orderId}: {orderStatus.status}
        </div>
      )}

      {progress && (
        <progress value={progress.percentage} max={100} />
      )}
    </div>
  );
}

forceTurn

Force the next execution to a specific side.
forceTurn(flow: FlowState, side: "a" | "b"): void
Parameters:
  • flow: The current FlowState context
  • side: The side to force next turn to ("a" or "b")
This is primarily used for dual_ai agents to control turn order.

Complete Example

Here’s a comprehensive example showing multiple utilities working together:
import {
  defineTool,
  queueTool,
  injectMessage,
  emitThreadEvent,
  getMessages,
} from "@standardagents/builder";
import { z } from "zod";

export default defineTool(
  "Complete order processing workflow",
  z.object({
    orderId: z.string(),
    userId: z.string(),
    notifyUser: z.boolean().default(true),
  }),
  async (flow, args) => {
    // Emit initial status event
    emitThreadEvent(flow, "order-status", {
      orderId: args.orderId,
      status: "started",
    });

    // Inject system message for context
    await injectMessage(flow, {
      role: "system",
      content: `Processing order ${args.orderId} for user ${args.userId}`,
    });

    // Queue validation tool
    queueTool(flow, "validate_order", {
      orderId: args.orderId,
    });

    // Queue payment processing
    queueTool(flow, "process_payment", {
      orderId: args.orderId,
      userId: args.userId,
    });

    // Queue inventory update
    queueTool(flow, "update_inventory", {
      orderId: args.orderId,
    });

    // Conditionally queue notification
    if (args.notifyUser) {
      queueTool(flow, "send_notification", {
        userId: args.userId,
        type: "order_confirmation",
        orderId: args.orderId,
      });
    }

    // Get conversation history for audit
    const messages = await getMessages(flow, 50);
    const orderMessages = messages.filter(
      (m) => m.content?.includes(args.orderId)
    );

    // Emit completion event
    emitThreadEvent(flow, "order-status", {
      orderId: args.orderId,
      status: "queued",
      steps: 4,
      relatedMessages: orderMessages.length,
    });

    return {
      status: "success",
      result: `Order ${args.orderId} workflow initiated`,
    };
  }
);

Best Practices

Always wrap async utility calls in try-catch blocks:
try {
  await injectMessage(flow, {
    role: "system",
    content: "Context updated",
  });
} catch (error) {
  console.error("Failed to inject message:", error);
  return {
    status: "error",
    error: `Message injection failed: ${error.message}`,
  };
}
Avoid excessive event emissions that could overwhelm WebSocket clients:
// Good: Throttle emissions
for (let i = 0; i < 10000; i++) {
  if (i % 100 === 0) {
    emitThreadEvent(flow, "progress", {
      current: i,
      total: 10000,
    });
  }
}

// Avoid: Emitting inside tight loop
for (let i = 0; i < 10000; i++) {
  emitThreadEvent(flow, "progress", { value: i });
}
When retrieving large message histories, use pagination:
const pageSize = 50;
let offset = 0;
let hasMore = true;

while (hasMore) {
  const messages = await getMessages(flow, pageSize, offset);
  await processMessages(messages);
  hasMore = messages.length === pageSize;
  offset += pageSize;
}
Be mindful of tool queue depth to avoid excessive chaining:
const currentQueueDepth = flow.sequence.queue.length;

if (currentQueueDepth < 10) {
  queueTool(flow, "additional_tool", {});
} else {
  await injectMessage(flow, {
    role: "system",
    content: "Tool queue depth exceeded, deferring processing",
  });
}

Troubleshooting

Solution: Ensure you’re using ThreadProvider with live={true} and that onThreadEvent is called with the correct event type name.
Solution: Tools are queued for sequential execution. Ensure the current tool completes successfully and doesn’t stop execution.
Solution: Check that messages exist in the thread storage and that limit/offset parameters are correct.

Next Steps