From 1073b692a1a4cc049b722d75ec8834a28ef837ad Mon Sep 17 00:00:00 2001 From: Vj Date: Thu, 29 Jan 2026 14:55:43 +0000 Subject: [PATCH] feat: implement orchestrator layer with OpenCode integration --- docs/orchestrator-spec.md | 371 +++++++++++++ src/agents/orchestrator/config.ts | 34 ++ .../delegation-tools/embedded-agent.ts | 59 ++ .../orchestrator/delegation-tools/index.ts | 3 + .../delegation-tools/opencode-agent.ts | 41 ++ .../delegation-tools/research-agent.ts | 56 ++ .../orchestrator/delegation-tools/tools.ts | 99 ++++ src/agents/orchestrator/index.ts | 6 + src/agents/orchestrator/orchestrator.test.ts | 68 +++ src/agents/orchestrator/orchestrator.ts | 108 ++++ src/agents/orchestrator/prompt.ts | 64 +++ src/agents/orchestrator/result-aggregator.ts | 31 ++ src/agents/orchestrator/types.ts | 46 ++ src/agents/pi-embedded-runner/run.ts | 1 + src/agents/pi-embedded-runner/run/attempt.ts | 2 +- src/agents/pi-embedded-runner/run/params.ts | 2 + src/agents/pi-embedded-runner/run/types.ts | 2 + .../reply/agent-runner-execution.ts | 519 +++++++++--------- src/auto-reply/types.ts | 1 + src/cli/program/register.agent.ts | 2 + src/commands/agent-via-gateway.ts | 4 + src/commands/agent.ts | 166 +++--- src/commands/agent/types.ts | 2 + src/commands/configure.orchestrator.ts | 52 ++ src/commands/configure.shared.ts | 2 + src/commands/configure.wizard.ts | 9 +- src/config/types.moltbot.ts | 2 + src/config/types.orchestrator.ts | 31 ++ src/config/types.ts | 1 + src/config/zod-schema.orchestrator.ts | 48 ++ src/config/zod-schema.ts | 2 + src/cron/isolated-agent/run.ts | 103 ++-- src/gateway/protocol/schema/agent.ts | 2 + src/gateway/server-methods/agent.ts | 4 + 34 files changed, 1588 insertions(+), 355 deletions(-) create mode 100644 docs/orchestrator-spec.md create mode 100644 src/agents/orchestrator/config.ts create mode 100644 src/agents/orchestrator/delegation-tools/embedded-agent.ts create mode 100644 src/agents/orchestrator/delegation-tools/index.ts create mode 100644 src/agents/orchestrator/delegation-tools/opencode-agent.ts create mode 100644 src/agents/orchestrator/delegation-tools/research-agent.ts create mode 100644 src/agents/orchestrator/delegation-tools/tools.ts create mode 100644 src/agents/orchestrator/index.ts create mode 100644 src/agents/orchestrator/orchestrator.test.ts create mode 100644 src/agents/orchestrator/orchestrator.ts create mode 100644 src/agents/orchestrator/prompt.ts create mode 100644 src/agents/orchestrator/result-aggregator.ts create mode 100644 src/agents/orchestrator/types.ts create mode 100644 src/commands/configure.orchestrator.ts create mode 100644 src/config/types.orchestrator.ts create mode 100644 src/config/zod-schema.orchestrator.ts diff --git a/docs/orchestrator-spec.md b/docs/orchestrator-spec.md new file mode 100644 index 000000000..4e7cdcdd3 --- /dev/null +++ b/docs/orchestrator-spec.md @@ -0,0 +1,371 @@ +# Moltbot Orchestrator Feature Specification + +## Overview + +This document specifies the implementation of an **Orchestrator Layer** for Moltbot. The orchestrator acts as an intelligent router that intercepts user messages and decides whether to handle them directly or delegate to specialized agents. + +## Goals + +1. **Smart Routing**: Use a fast LLM (configurable) to analyze incoming messages and route to appropriate agents +2. **OpenCode Integration**: Enable Moltbot to delegate coding tasks to OpenCode CLI +3. **Parallel Execution**: Support running multiple agents concurrently +4. **Configurable Models**: All agent models should be configurable via `moltbot.json` +5. **Extensible**: Easy to add new specialized agents + +## Architecture + +``` + User Message + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ ORCHESTRATOR (configurable model) │ +│ - Default: google-gemini-cli/gemini-3-flash-preview │ +│ - System prompt: knows available agents │ +│ - Delegation tools: spawn specialized agents │ +│ - Decision: handle directly OR delegate │ +└────────┬─────────────┬──────────────┬──────────────────────┘ + │ │ │ + ▼ ▼ ▼ +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ OpenCode │ │ Embedded Pi │ │ Research │ +│ Agent │ │ Agent │ │ Agent │ +│ (CLI/HTTP) │ │ (existing) │ │ (web tools) │ +└─────────────┘ └─────────────┘ └─────────────┘ + │ │ │ + └─────────────┼──────────────┘ + ▼ + Result Aggregation +``` + +## Configuration Schema + +Add to `moltbot.json`: + +```json5 +{ + "orchestrator": { + "enabled": true, + "model": "google-gemini-cli/gemini-3-flash-preview", + "fallbacks": ["openai-codex/gpt-5.2-codex"], + "agents": { + "opencode": { + "enabled": true, + "model": "anthropic/claude-sonnet-4", + "mode": "cli", // "cli" | "serve" + "binary": "opencode", + "timeoutMs": 300000 + }, + "research": { + "enabled": true, + "model": "google-gemini-cli/gemini-3-flash-preview" + }, + "embedded": { + "enabled": true, + "model": null // null = use agents.defaults.model + } + }, + "parallelExecution": true, + "maxParallelAgents": 3 + } +} +``` + +## File Structure + +``` +src/agents/orchestrator/ +├── index.ts # Main exports +├── types.ts # Type definitions +├── config.ts # Config schema and defaults +├── orchestrator.ts # Main orchestrateUserMessage() +├── prompt.ts # System prompt builder +├── result-aggregator.ts # Combine agent results +└── delegation-tools/ + ├── index.ts # Tool exports + ├── opencode-agent.ts # OpenCode CLI/HTTP integration + ├── research-agent.ts # Web search delegation + └── embedded-agent.ts # Existing Pi agent delegation +``` + +## Implementation Checklist + +### Phase 1: Core Infrastructure +- [ ] Create feature branch `feature/orchestrator` +- [ ] Create `src/agents/orchestrator/` directory structure +- [ ] Implement `types.ts` - AgentResult, OrchestratorParams, OrchestratorResponse +- [ ] Implement `config.ts` - Config schema with Zod/TypeBox validation +- [ ] Add orchestrator config types to `src/config/types.ts` + +### Phase 2: Orchestrator Core +- [ ] Implement `prompt.ts` - Build orchestrator system prompt +- [ ] Implement `orchestrator.ts` - Main orchestration logic +- [ ] Implement `result-aggregator.ts` - Combine multiple agent outputs + +### Phase 3: Delegation Tools +- [ ] Implement `delegation-tools/opencode-agent.ts` - OpenCode CLI integration +- [ ] Implement `delegation-tools/research-agent.ts` - Web search delegation +- [ ] Implement `delegation-tools/embedded-agent.ts` - Existing agent delegation +- [ ] Implement `delegation-tools/index.ts` - Export all tools + +### Phase 4: Integration +- [ ] Add orchestrator to tool registry in `moltbot-tools.ts` +- [ ] Integrate into message handling pipeline +- [ ] Add CLI flag for enabling/disabling orchestrator +- [ ] Update config migration if needed + +### Phase 5: Testing & Documentation +- [ ] Add unit tests for orchestrator logic +- [ ] Add integration tests for delegation tools +- [ ] Add E2E test for full orchestration flow +- [ ] Update documentation + +### Phase 6: Polish +- [ ] Run linter and fix issues +- [ ] Run full test suite +- [ ] Create PR with changelog entry + +## Type Definitions + +```typescript +// types.ts + +export interface AgentResult { + agentName: string; + agentType: "opencode" | "research" | "embedded" | "orchestrator"; + status: "success" | "error" | "partial" | "timeout"; + output: string; + artifacts?: unknown[]; + errorMessage?: string; + durationMs: number; + model?: string; + tokenUsage?: { + input: number; + output: number; + }; +} + +export interface OrchestratorParams { + userMessage: string; + sessionId: string; + sessionKey?: string; + config?: MoltbotConfig; + images?: ImageContent[]; + timeoutMs?: number; + thinking?: ThinkLevel; +} + +export interface OrchestratorResponse { + payloads: Array<{ text: string; isError?: boolean }>; + agentResults: AgentResult[]; + meta: { + durationMs: number; + orchestratorModel: string; + delegatedAgents: string[]; + }; +} + +export interface OrchestratorConfig { + enabled: boolean; + model: string; + fallbacks?: string[]; + agents: { + opencode?: OpenCodeAgentConfig; + research?: ResearchAgentConfig; + embedded?: EmbeddedAgentConfig; + }; + parallelExecution?: boolean; + maxParallelAgents?: number; +} + +export interface OpenCodeAgentConfig { + enabled: boolean; + model?: string; + mode: "cli" | "serve"; + binary?: string; + timeoutMs?: number; + servePort?: number; +} + +export interface ResearchAgentConfig { + enabled: boolean; + model?: string; +} + +export interface EmbeddedAgentConfig { + enabled: boolean; + model?: string | null; +} +``` + +## OpenCode Integration Details + +### CLI Mode (`opencode run`) + +```typescript +// Simple one-shot execution +async function runOpenCodeCli(message: string, options: OpenCodeAgentConfig): Promise { + const args = ["run", message]; + if (options.model) { + args.push("--model", options.model); + } + + const result = await spawnCommand(options.binary ?? "opencode", args, { + timeoutMs: options.timeoutMs ?? 300000, + cwd: workspaceDir, + }); + + return { + agentName: "OpenCode", + agentType: "opencode", + status: result.exitCode === 0 ? "success" : "error", + output: result.stdout, + errorMessage: result.stderr || undefined, + durationMs: result.durationMs, + model: options.model, + }; +} +``` + +### Serve Mode (Future) + +```typescript +// Persistent HTTP server for sessions +interface OpenCodeServer { + port: number; + sessionId: string; + prompt(message: string): Promise; + close(): Promise; +} + +async function getOrCreateOpenCodeServer(sessionId: string): Promise { + // Check if server exists for this session + // If not, spawn `opencode serve --port ` + // Return client connected to the server +} +``` + +## Orchestrator System Prompt + +```typescript +// prompt.ts + +export function buildOrchestratorSystemPrompt(config: OrchestratorConfig): string { + const agents = []; + + if (config.agents.opencode?.enabled) { + agents.push(` +**OpenCode Agent** (tool: delegate_to_opencode) +- Use for: coding tasks, file modifications, debugging, shell commands, git operations +- Capabilities: Full IDE-like agent with bash, file editing, LSP, code search +- When to use: User asks to write code, modify files, run commands, debug issues`); + } + + if (config.agents.research?.enabled) { + agents.push(` +**Research Agent** (tool: delegate_to_research) +- Use for: web searches, information retrieval, fact-checking +- Capabilities: Web search, URL fetching, documentation lookup +- When to use: User needs current information, documentation, or research`); + } + + if (config.agents.embedded?.enabled) { + agents.push(` +**Moltbot Agent** (tool: delegate_to_moltbot) +- Use for: general queries, Moltbot-specific features, messaging, scheduling +- Capabilities: All Moltbot tools (message, cron, browser, canvas, etc.) +- When to use: User asks about Moltbot, messaging, or general assistance`); + } + + return `You are Moltbot's Orchestrator. Your role is to understand user requests and route them to the most appropriate specialized agent(s). + +## Available Agents +${agents.join("\n")} + +## Decision Rules + +1. **Analyze the request**: Understand what the user is asking for +2. **Select agent(s)**: Choose the best agent(s) for the task +3. **Parallel execution**: You can call multiple agents simultaneously if the task benefits from it +4. **Direct response**: For simple greetings or trivial questions, respond directly without delegation + +## Response Format + +- If delegating: Call the appropriate delegation tool(s) +- If handling directly: Provide a brief, helpful response +- If multiple agents needed: Call multiple tools - they will run in parallel + +## Examples + +User: "Write a Python script to parse JSON" +Action: delegate_to_opencode + +User: "What's the weather in Tokyo?" +Action: delegate_to_research + +User: "Send a message to John saying hello" +Action: delegate_to_moltbot + +User: "Research React best practices and create a component" +Action: delegate_to_research AND delegate_to_opencode (parallel) + +User: "Hello!" +Action: Respond directly with a greeting`; +} +``` + +## Result Aggregation + +```typescript +// result-aggregator.ts + +export function aggregateResults(results: AgentResult[]): OrchestratorResponse { + const payloads: Array<{ text: string; isError?: boolean }> = []; + + for (const result of results) { + if (result.status === "error") { + payloads.push({ + text: `**${result.agentName} Error:**\n${result.errorMessage || result.output}`, + isError: true, + }); + } else { + payloads.push({ + text: results.length > 1 + ? `**${result.agentName}:**\n${result.output}` + : result.output, + }); + } + } + + return { + payloads, + agentResults: results, + meta: { + durationMs: Math.max(...results.map(r => r.durationMs)), + orchestratorModel: "configured-model", + delegatedAgents: results.map(r => r.agentType), + }, + }; +} +``` + +## Migration Notes + +- Orchestrator is **opt-in** via `orchestrator.enabled: true` +- Default behavior (orchestrator disabled) remains unchanged +- Existing `agents.defaults.model` config is preserved +- Orchestrator model is separate from agent models + +## Future Enhancements + +1. **OpenCode serve mode**: Persistent coding sessions with state +2. **Agent chaining**: Sequential agent execution with context passing +3. **Custom agents**: Plugin system for adding new agent types +4. **Learning**: Track which delegations work best and improve routing +5. **Streaming**: Real-time output from delegated agents + +## References + +- OpenCode CLI: https://github.com/anomalyco/opencode +- Moltbot Agent Architecture: `src/agents/pi-embedded-runner/` +- Existing Subagent Pattern: `src/agents/tools/sessions-spawn-tool.ts` diff --git a/src/agents/orchestrator/config.ts b/src/agents/orchestrator/config.ts new file mode 100644 index 000000000..c619b958b --- /dev/null +++ b/src/agents/orchestrator/config.ts @@ -0,0 +1,34 @@ +import type { MoltbotConfig } from "../../config/config.js"; +import type { OrchestratorConfig } from "../../config/types.orchestrator.js"; + +export const ORCHESTRATOR_DEFAULT_MODEL = "google-gemini-cli/gemini-3-flash-preview"; + +export function resolveOrchestratorConfig(config?: MoltbotConfig): OrchestratorConfig { + const orchestrator = config?.orchestrator ?? {}; + + return { + enabled: orchestrator.enabled ?? false, + model: orchestrator.model ?? ORCHESTRATOR_DEFAULT_MODEL, + fallbacks: orchestrator.fallbacks ?? [], + agents: { + opencode: { + enabled: orchestrator.agents?.opencode?.enabled ?? true, + model: orchestrator.agents?.opencode?.model, + mode: orchestrator.agents?.opencode?.mode ?? "cli", + binary: orchestrator.agents?.opencode?.binary ?? "opencode", + timeoutMs: orchestrator.agents?.opencode?.timeoutMs ?? 300000, + servePort: orchestrator.agents?.opencode?.servePort ?? 4096, + }, + research: { + enabled: orchestrator.agents?.research?.enabled ?? true, + model: orchestrator.agents?.research?.model, + }, + embedded: { + enabled: orchestrator.agents?.embedded?.enabled ?? true, + model: orchestrator.agents?.embedded?.model, + }, + }, + parallelExecution: orchestrator.parallelExecution ?? true, + maxParallelAgents: orchestrator.maxParallelAgents ?? 3, + }; +} diff --git a/src/agents/orchestrator/delegation-tools/embedded-agent.ts b/src/agents/orchestrator/delegation-tools/embedded-agent.ts new file mode 100644 index 000000000..336cc2b68 --- /dev/null +++ b/src/agents/orchestrator/delegation-tools/embedded-agent.ts @@ -0,0 +1,59 @@ +import { runEmbeddedPiAgent } from "../../pi-embedded-runner/run.js"; +import type { AgentResult, EmbeddedAgentConfig } from "../types.js"; +import type { MoltbotConfig } from "../../../config/config.js"; +import { resolveSessionTranscriptPath } from "../../../config/sessions/paths.js"; + +export async function delegateToEmbeddedAgent( + message: string, + options: EmbeddedAgentConfig, + config?: MoltbotConfig, + sessionId?: string, + sessionKey?: string, +): Promise { + const started = Date.now(); + const effectiveSessionId = sessionId ?? `orchestrator-sub-${Date.now()}`; + + try { + const result = await runEmbeddedPiAgent({ + sessionId: effectiveSessionId, + sessionKey, + sessionFile: resolveSessionTranscriptPath(effectiveSessionId), + workspaceDir: config?.agents?.defaults?.workspace ?? process.cwd(), + config, + prompt: message, + timeoutMs: 300000, + runId: `orchestrator-${Date.now()}`, + provider: options.model ? options.model.split("/")[0] : undefined, + model: options.model ? options.model.split("/").slice(1).join("/") : undefined, + disableTools: false, // Allow embedded agent to use its tools + }); + + const output = (result.payloads ?? []) + .filter((p): p is { text: string } => !p.isError && typeof p.text === "string") + .map((p) => p.text) + .join("\n") + .trim(); + + const errorPayload = (result.payloads ?? []).find((p) => p.isError); + + return { + agentName: "Moltbot", + agentType: "embedded", + status: errorPayload ? "error" : "success", + output, + errorMessage: errorPayload?.text, + durationMs: Date.now() - started, + model: options.model ?? undefined, + }; + } catch (error: any) { + return { + agentName: "Moltbot", + agentType: "embedded", + status: "error", + output: "", + errorMessage: error.message || "Failed to execute embedded agent", + durationMs: Date.now() - started, + model: options.model ?? undefined, + }; + } +} diff --git a/src/agents/orchestrator/delegation-tools/index.ts b/src/agents/orchestrator/delegation-tools/index.ts new file mode 100644 index 000000000..10288ed1a --- /dev/null +++ b/src/agents/orchestrator/delegation-tools/index.ts @@ -0,0 +1,3 @@ +export { delegateToOpenCodeAgent } from "./opencode-agent.js"; +export { delegateToResearchAgent } from "./research-agent.js"; +export { delegateToEmbeddedAgent } from "./embedded-agent.js"; diff --git a/src/agents/orchestrator/delegation-tools/opencode-agent.ts b/src/agents/orchestrator/delegation-tools/opencode-agent.ts new file mode 100644 index 000000000..97bcfe983 --- /dev/null +++ b/src/agents/orchestrator/delegation-tools/opencode-agent.ts @@ -0,0 +1,41 @@ +import { runCommandWithTimeout } from "../../../process/exec.js"; +import type { AgentResult, OpenCodeAgentConfig } from "../types.js"; + +export async function delegateToOpenCodeAgent( + message: string, + options: OpenCodeAgentConfig, + workspaceDir: string, +): Promise { + const started = Date.now(); + const args = ["run", message]; + if (options.model) { + args.push("--model", options.model); + } + + try { + const result = await runCommandWithTimeout([options.binary ?? "opencode", ...args], { + timeoutMs: options.timeoutMs ?? 300000, + cwd: workspaceDir, + }); + + return { + agentName: "OpenCode", + agentType: "opencode", + status: result.code === 0 ? "success" : "error", + output: result.stdout.trim(), + errorMessage: result.code !== 0 ? result.stderr.trim() || "OpenCode failed." : undefined, + durationMs: Date.now() - started, + model: options.model, + }; + } catch (error: any) { + return { + agentName: "OpenCode", + agentType: "opencode", + status: "error", + output: "", + errorMessage: error.message || "Failed to execute OpenCode CLI", + durationMs: Date.now() - started, + model: options.model, + }; + } +} diff --git a/src/agents/orchestrator/delegation-tools/research-agent.ts b/src/agents/orchestrator/delegation-tools/research-agent.ts new file mode 100644 index 000000000..7acd9650a --- /dev/null +++ b/src/agents/orchestrator/delegation-tools/research-agent.ts @@ -0,0 +1,56 @@ +import { createWebSearchTool } from "../../tools/web-search.js"; +import type { AgentResult, ResearchAgentConfig } from "../types.js"; +import type { MoltbotConfig } from "../../../config/config.js"; + +export async function delegateToResearchAgent( + query: string, + options: ResearchAgentConfig, + config?: MoltbotConfig, +): Promise { + const started = Date.now(); + + try { + const searchTool = createWebSearchTool({ config }); + if (!searchTool) { + throw new Error("Web search tool not available or not configured."); + } + + const result = await searchTool.execute("orchestrator-research", { query }); + const firstContent = result.content[0]; + if (!firstContent || firstContent.type !== "text") { + throw new Error("Research tool returned unexpected content type."); + } + const data = JSON.parse(firstContent.text); + + let output = ""; + if (data.results) { + output = data.results + .map((r: any) => `**${r.title}**\n${r.url}\n${r.description}`) + .join("\n\n"); + } else if (data.content) { + output = data.content; + } else { + output = JSON.stringify(data, null, 2); + } + + return { + agentName: "Research", + agentType: "research", + status: data.error ? "error" : "success", + output, + errorMessage: data.error ? data.message || data.error : undefined, + durationMs: Date.now() - started, + model: options.model, + }; + } catch (error: any) { + return { + agentName: "Research", + agentType: "research", + status: "error", + output: "", + errorMessage: error.message || "Failed to execute research", + durationMs: Date.now() - started, + model: options.model, + }; + } +} diff --git a/src/agents/orchestrator/delegation-tools/tools.ts b/src/agents/orchestrator/delegation-tools/tools.ts new file mode 100644 index 000000000..903767923 --- /dev/null +++ b/src/agents/orchestrator/delegation-tools/tools.ts @@ -0,0 +1,99 @@ +import { Type } from "@sinclair/typebox"; +import type { AnyAgentTool } from "../../tools/common.js"; +import { + delegateToOpenCodeAgent, + delegateToResearchAgent, + delegateToEmbeddedAgent, +} from "./index.js"; +import type { OrchestratorConfig } from "../types.js"; +import type { MoltbotConfig } from "../../../config/config.js"; + +export function createDelegationTools(params: { + config: MoltbotConfig; + orchestratorConfig: OrchestratorConfig; + sessionId: string; + sessionKey?: string; + onAgentResult: (result: any) => void; +}): AnyAgentTool[] { + const { config, orchestratorConfig, sessionId, sessionKey, onAgentResult } = params; + const tools: AnyAgentTool[] = []; + + if (orchestratorConfig.agents?.opencode?.enabled !== false) { + tools.push({ + name: "delegate_to_opencode", + label: "OpenCode", + description: "Delegate a coding, debugging, or file modification task to the OpenCode agent.", + parameters: Type.Object({ + task: Type.String({ description: "The specific coding task to perform." }), + }), + execute: async (_id, args) => { + const result = await delegateToOpenCodeAgent( + String(args.task), + orchestratorConfig.agents!.opencode!, + config.agents?.defaults?.workspace ?? process.cwd(), + ); + onAgentResult(result); + return { + content: [ + { type: "text", text: result.output || result.errorMessage || "Task completed." }, + ], + details: result, + }; + }, + }); + } + + if (orchestratorConfig.agents?.research?.enabled !== false) { + tools.push({ + name: "delegate_to_research", + label: "Research", + description: "Delegate an information retrieval or web search task to the Research agent.", + parameters: Type.Object({ + query: Type.String({ description: "The search query or research topic." }), + }), + execute: async (_id, args) => { + const result = await delegateToResearchAgent( + String(args.query), + orchestratorConfig.agents!.research!, + config, + ); + onAgentResult(result); + return { + content: [ + { type: "text", text: result.output || result.errorMessage || "Research completed." }, + ], + details: result, + }; + }, + }); + } + + if (orchestratorConfig.agents?.embedded?.enabled !== false) { + tools.push({ + name: "delegate_to_moltbot", + label: "Moltbot", + description: "Delegate a general query or Moltbot-specific task to the main Moltbot agent.", + parameters: Type.Object({ + message: Type.String({ description: "The message or task for the Moltbot agent." }), + }), + execute: async (_id, args) => { + const result = await delegateToEmbeddedAgent( + String(args.message), + orchestratorConfig.agents!.embedded!, + config, + sessionId, + sessionKey, + ); + onAgentResult(result); + return { + content: [ + { type: "text", text: result.output || result.errorMessage || "Task completed." }, + ], + details: result, + }; + }, + }); + } + + return tools; +} diff --git a/src/agents/orchestrator/index.ts b/src/agents/orchestrator/index.ts new file mode 100644 index 000000000..6667a4e9f --- /dev/null +++ b/src/agents/orchestrator/index.ts @@ -0,0 +1,6 @@ +export * from "./types.js"; +export * from "./config.js"; +export * from "./orchestrator.js"; +export * from "./prompt.js"; +export * from "./result-aggregator.js"; +export * from "./delegation-tools/index.js"; diff --git a/src/agents/orchestrator/orchestrator.test.ts b/src/agents/orchestrator/orchestrator.test.ts new file mode 100644 index 000000000..2ca6f3a25 --- /dev/null +++ b/src/agents/orchestrator/orchestrator.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it, vi } from "vitest"; +import { orchestrateUserMessage } from "./orchestrator.js"; +import { ORCHESTRATOR_DEFAULT_MODEL } from "./config.js"; + +vi.mock("../pi-embedded-runner/run.js", () => ({ + runEmbeddedPiAgent: vi.fn(async (params) => { + if (params.sessionId.startsWith("orch-")) { + return { + payloads: [{ text: "I'll help you with that." }], + meta: { + durationMs: 100, + agentMeta: { + sessionId: params.sessionId, + provider: params.provider, + model: params.model, + }, + }, + }; + } + return { + payloads: [{ text: "Sub-agent result" }], + meta: { + durationMs: 200, + agentMeta: { + sessionId: params.sessionId, + provider: params.provider, + model: params.model, + }, + }, + }; + }), +})); + +describe("Orchestrator", () => { + it("should handle direct response when no tools called", async () => { + const result = await orchestrateUserMessage({ + userMessage: "hello", + sessionId: "test-session", + config: { + orchestrator: { + enabled: true, + model: ORCHESTRATOR_DEFAULT_MODEL, + }, + } as any, + }); + + expect(result.agentResults).toHaveLength(1); + expect(result.agentResults[0].agentName).toBe("Orchestrator"); + expect(result.agentResults[0].output).toBe("I'll help you with that."); + }); + + it("should fall back to direct embedded run if disabled", async () => { + const result = await orchestrateUserMessage({ + userMessage: "hello", + sessionId: "test-session", + config: { + orchestrator: { + enabled: false, + }, + } as any, + }); + + expect(result.agentResults).toHaveLength(1); + expect(result.agentResults[0].agentName).toBe("Moltbot"); + expect(result.agentResults[0].agentType).toBe("embedded"); + expect(result.agentResults[0].output).toBe("Sub-agent result"); + }); +}); diff --git a/src/agents/orchestrator/orchestrator.ts b/src/agents/orchestrator/orchestrator.ts new file mode 100644 index 000000000..fc09c708d --- /dev/null +++ b/src/agents/orchestrator/orchestrator.ts @@ -0,0 +1,108 @@ +import { runEmbeddedPiAgent } from "../pi-embedded-runner/run.js"; +import { resolveOrchestratorConfig } from "./config.js"; +import { buildOrchestratorSystemPrompt } from "./prompt.js"; +import { createDelegationTools } from "./delegation-tools/tools.js"; +import { aggregateResults } from "./result-aggregator.js"; +import type { OrchestratorParams, OrchestratorResponse, AgentResult } from "./types.js"; +import { resolveSessionTranscriptPath } from "../../config/sessions/paths.js"; + +export async function orchestrateUserMessage( + params: OrchestratorParams, +): Promise { + const started = Date.now(); + const orchestratorConfig = resolveOrchestratorConfig(params.config); + + if (!orchestratorConfig.enabled) { + const result = await runEmbeddedPiAgent({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: resolveSessionTranscriptPath(params.sessionId), + workspaceDir: params.config?.agents?.defaults?.workspace ?? process.cwd(), + config: params.config, + prompt: params.userMessage, + timeoutMs: params.timeoutMs ?? 300000, + runId: `direct-${Date.now()}`, + images: params.images, + thinkLevel: params.thinking, + }); + + const output = (result.payloads ?? []) + .filter((p): p is { text: string } => !p.isError && typeof p.text === "string") + .map((p) => p.text) + .join("\n") + .trim(); + + const errorPayload = (result.payloads ?? []).find((p) => p.isError); + + return { + payloads: (result.payloads ?? []).map((p) => ({ text: p.text ?? "", isError: p.isError })), + agentResults: [ + { + agentName: "Moltbot", + agentType: "embedded", + status: errorPayload ? "error" : "success", + output, + errorMessage: errorPayload?.text, + durationMs: Date.now() - started, + }, + ], + meta: { + durationMs: Date.now() - started, + orchestratorModel: "disabled", + delegatedAgents: ["embedded"], + }, + }; + } + + const agentResults: AgentResult[] = []; + const onAgentResult = (result: AgentResult) => { + agentResults.push(result); + }; + + const tools = createDelegationTools({ + config: params.config!, + orchestratorConfig, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + onAgentResult, + }); + + const systemPrompt = buildOrchestratorSystemPrompt(orchestratorConfig); + const modelRef = orchestratorConfig.model || "google/gemini-3-flash-preview"; + + const orchestratorRunResult = await runEmbeddedPiAgent({ + sessionId: `orch-${params.sessionId}`, + sessionKey: params.sessionKey ? `orch-${params.sessionKey}` : undefined, + sessionFile: resolveSessionTranscriptPath(`orch-${params.sessionId}`), + workspaceDir: params.config?.agents?.defaults?.workspace ?? process.cwd(), + config: params.config, + prompt: params.userMessage, + timeoutMs: params.timeoutMs ?? 300000, + runId: `orch-run-${Date.now()}`, + provider: modelRef.split("/")[0], + model: modelRef.split("/").slice(1).join("/"), + extraSystemPrompt: systemPrompt, + extraTools: tools, + disableTools: false, + images: params.images, + }); + + const orchestratorText = (orchestratorRunResult.payloads ?? []) + .filter((p): p is { text: string } => !p.isError && typeof p.text === "string") + .map((p) => p.text) + .join("\n") + .trim(); + + if (orchestratorText && agentResults.length === 0) { + agentResults.push({ + agentName: "Orchestrator", + agentType: "orchestrator", + status: "success", + output: orchestratorText, + durationMs: Date.now() - started, + model: modelRef, + }); + } + + return aggregateResults(agentResults, modelRef); +} diff --git a/src/agents/orchestrator/prompt.ts b/src/agents/orchestrator/prompt.ts new file mode 100644 index 000000000..43019f110 --- /dev/null +++ b/src/agents/orchestrator/prompt.ts @@ -0,0 +1,64 @@ +import type { OrchestratorConfig } from "../../config/types.orchestrator.js"; + +export function buildOrchestratorSystemPrompt(config: OrchestratorConfig): string { + const agents = []; + + if (config.agents?.opencode?.enabled !== false) { + agents.push(` +**OpenCode Agent** (tool: delegate_to_opencode) +- Use for: coding tasks, file modifications, debugging, shell commands, git operations +- Capabilities: Full IDE-like agent with bash, file editing, LSP, code search +- When to use: User asks to write code, modify files, run commands, debug issues`); + } + + if (config.agents?.research?.enabled !== false) { + agents.push(` +**Research Agent** (tool: delegate_to_research) +- Use for: web searches, information retrieval, fact-checking +- Capabilities: Web search, URL fetching, documentation lookup +- When to use: User needs current information, documentation, or research`); + } + + if (config.agents?.embedded?.enabled !== false) { + agents.push(` +**Moltbot Agent** (tool: delegate_to_moltbot) +- Use for: general queries, Moltbot-specific features, messaging, scheduling +- Capabilities: All Moltbot tools (message, cron, browser, canvas, etc.) +- When to use: User asks about Moltbot, messaging, or general assistance`); + } + + return `You are Moltbot's Orchestrator. Your role is to understand user requests and route them to the most appropriate specialized agent(s). + +## Available Agents +${agents.join("\n")} + +## Decision Rules + +1. **Analyze the request**: Understand what the user is asking for +2. **Select agent(s)**: Choose the best agent(s) for the task +3. **Parallel execution**: You can call multiple agents simultaneously if the task benefits from it. +4. **Direct response**: For simple greetings or trivial questions, respond directly without delegation. + +## Response Format + +- If delegating: Call the appropriate delegation tool(s). +- If handling directly: Provide a brief, helpful response. +- If multiple agents needed: Call multiple tools - they will run in parallel. + +## Examples + +User: "Write a Python script to parse JSON" +Action: delegate_to_opencode + +User: "What's the weather in Tokyo?" +Action: delegate_to_research + +User: "Send a message to John saying hello" +Action: delegate_to_moltbot + +User: "Research React best practices and create a component" +Action: delegate_to_research AND delegate_to_opencode (parallel) + +User: "Hello!" +Action: Respond directly with a greeting`; +} diff --git a/src/agents/orchestrator/result-aggregator.ts b/src/agents/orchestrator/result-aggregator.ts new file mode 100644 index 000000000..761a301c0 --- /dev/null +++ b/src/agents/orchestrator/result-aggregator.ts @@ -0,0 +1,31 @@ +import type { AgentResult, OrchestratorResponse } from "./types.js"; + +export function aggregateResults( + results: AgentResult[], + orchestratorModel: string, +): OrchestratorResponse { + const payloads: Array<{ text: string; isError?: boolean }> = []; + + for (const result of results) { + if (result.status === "error") { + payloads.push({ + text: `**${result.agentName} Error:**\n${result.errorMessage || result.output}`, + isError: true, + }); + } else { + payloads.push({ + text: results.length > 1 ? `**${result.agentName}:**\n${result.output}` : result.output, + }); + } + } + + return { + payloads, + agentResults: results, + meta: { + durationMs: results.length > 0 ? Math.max(...results.map((r) => r.durationMs)) : 0, + orchestratorModel, + delegatedAgents: results.map((r) => r.agentType), + }, + }; +} diff --git a/src/agents/orchestrator/types.ts b/src/agents/orchestrator/types.ts new file mode 100644 index 000000000..d4c037728 --- /dev/null +++ b/src/agents/orchestrator/types.ts @@ -0,0 +1,46 @@ +import type { ImageContent } from "@mariozechner/pi-ai"; +import type { ThinkLevel } from "../../auto-reply/thinking.js"; +import type { MoltbotConfig } from "../../config/config.js"; +import type { OrchestratorConfig } from "../../config/types.orchestrator.js"; + +export type { + OrchestratorConfig, + OpenCodeAgentConfig, + ResearchAgentConfig, + EmbeddedAgentConfig, +} from "../../config/types.orchestrator.js"; + +export interface AgentResult { + agentName: string; + agentType: "opencode" | "research" | "embedded" | "orchestrator"; + status: "success" | "error" | "partial" | "timeout"; + output: string; + artifacts?: unknown[]; + errorMessage?: string; + durationMs: number; + model?: string; + tokenUsage?: { + input: number; + output: number; + }; +} + +export interface OrchestratorParams { + userMessage: string; + sessionId: string; + sessionKey?: string; + config?: MoltbotConfig; + images?: ImageContent[]; + timeoutMs?: number; + thinking?: ThinkLevel; +} + +export interface OrchestratorResponse { + payloads: Array<{ text: string; isError?: boolean }>; + agentResults: AgentResult[]; + meta: { + durationMs: number; + orchestratorModel: string; + delegatedAgents: string[]; + }; +} diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 870453f38..53ba9a2ed 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -351,6 +351,7 @@ export async function runEmbeddedPiAgent( onToolResult: params.onToolResult, onAgentEvent: params.onAgentEvent, extraSystemPrompt: params.extraSystemPrompt, + extraTools: params.extraTools, streamParams: params.streamParams, ownerNumbers: params.ownerNumbers, enforceFinalTag: params.enforceFinalTag, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 46a53bd8f..8e200baa3 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -433,7 +433,7 @@ export async function runEmbeddedAttempt( }); const { builtInTools, customTools } = splitSdkTools({ - tools, + tools: [...tools, ...(params.extraTools ?? [])], sandboxEnabled: !!sandbox?.enabled, }); diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index b21a5e3fc..8b8d01e2f 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -6,6 +6,7 @@ import type { enqueueCommand } from "../../../process/command-queue.js"; import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js"; import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js"; import type { SkillSnapshot } from "../../skills.js"; +import type { AnyAgentTool } from "../../pi-tools.types.js"; // Simplified tool definition for client-provided tools (OpenResponses hosted tools) export type ClientToolDefinition = { @@ -56,6 +57,7 @@ export type RunEmbeddedPiAgentParams = { images?: ImageContent[]; /** Optional client-provided tools (OpenResponses hosted tools). */ clientTools?: ClientToolDefinition[]; + extraTools?: AnyAgentTool[]; /** Disable built-in tools for this run (LLM-only mode). */ disableTools?: boolean; provider?: string; diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 92bb3ff46..63079ddcc 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -9,6 +9,7 @@ import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js import type { MessagingToolSend } from "../../pi-embedded-messaging.js"; import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js"; import type { SkillSnapshot } from "../../skills.js"; +import type { AnyAgentTool } from "../../pi-tools.types.js"; import type { SessionSystemPromptReport } from "../../../config/sessions/types.js"; import type { ClientToolDefinition } from "./params.js"; @@ -48,6 +49,7 @@ export type EmbeddedRunAttemptParams = { images?: ImageContent[]; /** Optional client-provided tools (OpenResponses hosted tools). */ clientTools?: ClientToolDefinition[]; + extraTools?: AnyAgentTool[]; /** Disable built-in tools for this run (LLM-only mode). */ disableTools?: boolean; provider: string; diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index f86ecb8a9..464c74419 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -6,6 +6,10 @@ import { getCliSessionId } from "../../agents/cli-session.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; import { isCliProvider } from "../../agents/model-selection.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; +import { + orchestrateUserMessage, + resolveOrchestratorConfig, +} from "../../agents/orchestrator/index.js"; import { isCompactionFailureError, isContextOverflowError, @@ -97,6 +101,11 @@ export async function runAgentTurnWithFallback(params: { let fallbackModel = params.followupRun.run.model; let didResetAfterCompactionFailure = false; + const orchestratorConfig = resolveOrchestratorConfig(params.followupRun.run.config); + const useOrchestrator = orchestratorConfig.enabled && !params.opts?.noOrchestrator; + + const sessionAgentId = resolveAgentIdFromSessionKey(params.sessionKey); + while (true) { try { const allowPartialStream = !( @@ -134,53 +143,75 @@ export async function runAgentTurnWithFallback(params: { }; const blockReplyPipeline = params.blockReplyPipeline; const onToolResult = params.opts?.onToolResult; - const fallbackResult = await runWithModelFallback({ - cfg: params.followupRun.run.config, - provider: params.followupRun.run.provider, - model: params.followupRun.run.model, - agentDir: params.followupRun.run.agentDir, - fallbacksOverride: resolveAgentModelFallbacksOverride( - params.followupRun.run.config, - resolveAgentIdFromSessionKey(params.followupRun.run.sessionKey), - ), - run: (provider, model) => { - // Notify that model selection is complete (including after fallback). - // This allows responsePrefix template interpolation with the actual model. - params.opts?.onModelSelected?.({ - provider, - model, - thinkLevel: params.followupRun.run.thinkLevel, - }); - if (isCliProvider(provider, params.followupRun.run.config)) { - const startedAt = Date.now(); - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "start", - startedAt, - }, - }); - const cliSessionId = getCliSessionId(params.getActiveSessionEntry(), provider); - return runCliAgent({ + if (useOrchestrator && !didResetAfterCompactionFailure) { + const orchResult = await orchestrateUserMessage({ + userMessage: params.commandBody, + sessionId: params.followupRun.run.sessionId, + sessionKey: params.sessionKey, + config: params.followupRun.run.config, + images: params.opts?.images, + timeoutMs: params.followupRun.run.timeoutMs, + thinking: params.followupRun.run.thinkLevel, + }); + runResult = { + payloads: orchResult.payloads, + meta: { + durationMs: orchResult.meta.durationMs, + agentMeta: { sessionId: params.followupRun.run.sessionId, - sessionKey: params.sessionKey, - sessionFile: params.followupRun.run.sessionFile, - workspaceDir: params.followupRun.run.workspaceDir, - config: params.followupRun.run.config, - prompt: params.commandBody, + provider: "orchestrator", + model: orchResult.meta.orchestratorModel, + }, + }, + }; + } else { + const fallbackResult = await runWithModelFallback({ + cfg: params.followupRun.run.config, + provider: params.followupRun.run.provider, + model: params.followupRun.run.model, + agentDir: params.followupRun.run.agentDir, + fallbacksOverride: resolveAgentModelFallbacksOverride( + params.followupRun.run.config, + sessionAgentId, + ), + run: async (provider, model) => { + // Notify that model selection is complete (including after fallback). + // This allows responsePrefix template interpolation with the actual model. + params.opts?.onModelSelected?.({ provider, model, thinkLevel: params.followupRun.run.thinkLevel, - timeoutMs: params.followupRun.run.timeoutMs, - runId, - extraSystemPrompt: params.followupRun.run.extraSystemPrompt, - ownerNumbers: params.followupRun.run.ownerNumbers, - cliSessionId, - images: params.opts?.images, - }) - .then((result) => { + }); + + if (isCliProvider(provider, params.followupRun.run.config)) { + const startedAt = Date.now(); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "start", + startedAt, + }, + }); + const cliSessionId = getCliSessionId(params.getActiveSessionEntry(), provider); + return runCliAgent({ + sessionId: params.followupRun.run.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.followupRun.run.sessionFile, + workspaceDir: params.followupRun.run.workspaceDir, + config: params.followupRun.run.config, + prompt: params.commandBody, + provider, + model, + thinkLevel: params.followupRun.run.thinkLevel, + timeoutMs: params.followupRun.run.timeoutMs, + runId, + extraSystemPrompt: params.followupRun.run.extraSystemPrompt, + ownerNumbers: params.followupRun.run.ownerNumbers, + cliSessionId, + images: params.opts?.images, + }).then((result) => { // CLI backends don't emit streaming assistant events, so we need to // emit one with the final text so server-chat can populate its buffer // and send the response to TUI/WebSocket clients. @@ -198,229 +229,219 @@ export async function runAgentTurnWithFallback(params: { data: { phase: "end", startedAt, - endedAt: Date.now(), + durationMs: Date.now() - startedAt, }, }); return result; - }) - .catch((err) => { - emitAgentEvent({ - runId, - stream: "lifecycle", - data: { - phase: "error", - startedAt, - endedAt: Date.now(), - error: err instanceof Error ? err.message : String(err), - }, - }); - throw err; }); - } - const authProfileId = - provider === params.followupRun.run.provider - ? params.followupRun.run.authProfileId - : undefined; - return runEmbeddedPiAgent({ - sessionId: params.followupRun.run.sessionId, - sessionKey: params.sessionKey, - messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined, - agentAccountId: params.sessionCtx.AccountId, - messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To, - messageThreadId: params.sessionCtx.MessageThreadId ?? undefined, - groupId: resolveGroupSessionKey(params.sessionCtx)?.id, - groupChannel: - params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), - groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, - senderId: params.sessionCtx.SenderId?.trim() || undefined, - senderName: params.sessionCtx.SenderName?.trim() || undefined, - senderUsername: params.sessionCtx.SenderUsername?.trim() || undefined, - senderE164: params.sessionCtx.SenderE164?.trim() || undefined, - // Provider threading context for tool auto-injection - ...buildThreadingToolContext({ - sessionCtx: params.sessionCtx, + } + + const authProfileId = + provider === params.followupRun.run.provider + ? params.getActiveSessionEntry()?.authProfileOverride + : undefined; + + return runEmbeddedPiAgent({ + sessionId: params.followupRun.run.sessionId, + sessionKey: params.sessionKey, + messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined, + agentAccountId: params.sessionCtx.AccountId, + messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To, + messageThreadId: params.sessionCtx.MessageThreadId ?? undefined, + groupId: resolveGroupSessionKey(params.sessionCtx)?.id, + groupChannel: + params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), + groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, + senderId: params.sessionCtx.SenderId?.trim() || undefined, + senderName: params.sessionCtx.SenderName?.trim() || undefined, + senderUsername: params.sessionCtx.SenderUsername?.trim() || undefined, + senderE164: params.sessionCtx.SenderE164?.trim() || undefined, + // Provider threading context for tool auto-injection + ...buildThreadingToolContext({ + sessionCtx: params.sessionCtx, + config: params.followupRun.run.config, + hasRepliedRef: params.opts?.hasRepliedRef, + }), + sessionFile: params.followupRun.run.sessionFile, + workspaceDir: params.followupRun.run.workspaceDir, + agentDir: params.followupRun.run.agentDir, config: params.followupRun.run.config, - hasRepliedRef: params.opts?.hasRepliedRef, - }), - sessionFile: params.followupRun.run.sessionFile, - workspaceDir: params.followupRun.run.workspaceDir, - agentDir: params.followupRun.run.agentDir, - config: params.followupRun.run.config, - skillsSnapshot: params.followupRun.run.skillsSnapshot, - prompt: params.commandBody, - extraSystemPrompt: params.followupRun.run.extraSystemPrompt, - ownerNumbers: params.followupRun.run.ownerNumbers, - enforceFinalTag: resolveEnforceFinalTag(params.followupRun.run, provider), - provider, - model, - authProfileId, - authProfileIdSource: authProfileId - ? params.followupRun.run.authProfileIdSource - : undefined, - thinkLevel: params.followupRun.run.thinkLevel, - verboseLevel: params.followupRun.run.verboseLevel, - reasoningLevel: params.followupRun.run.reasoningLevel, - execOverrides: params.followupRun.run.execOverrides, - toolResultFormat: (() => { - const channel = resolveMessageChannel( - params.sessionCtx.Surface, - params.sessionCtx.Provider, - ); - if (!channel) return "markdown"; - return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; - })(), - bashElevated: params.followupRun.run.bashElevated, - timeoutMs: params.followupRun.run.timeoutMs, - runId, - images: params.opts?.images, - abortSignal: params.opts?.abortSignal, - blockReplyBreak: params.resolvedBlockStreamingBreak, - blockReplyChunking: params.blockReplyChunking, - onPartialReply: allowPartialStream - ? async (payload) => { - const textForTyping = await handlePartialForTyping(payload); - if (!params.opts?.onPartialReply || textForTyping === undefined) return; - await params.opts.onPartialReply({ - text: textForTyping, - mediaUrls: payload.mediaUrls, - }); - } - : undefined, - onAssistantMessageStart: async () => { - await params.typingSignals.signalMessageStart(); - }, - onReasoningStream: - params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream + skillsSnapshot: params.followupRun.run.skillsSnapshot, + prompt: params.commandBody, + extraSystemPrompt: params.followupRun.run.extraSystemPrompt, + ownerNumbers: params.followupRun.run.ownerNumbers, + enforceFinalTag: resolveEnforceFinalTag(params.followupRun.run, provider), + provider, + model, + authProfileId, + authProfileIdSource: authProfileId + ? params.followupRun.run.authProfileIdSource + : undefined, + thinkLevel: params.followupRun.run.thinkLevel, + verboseLevel: params.followupRun.run.verboseLevel, + reasoningLevel: params.followupRun.run.reasoningLevel, + execOverrides: params.followupRun.run.execOverrides, + toolResultFormat: (() => { + const channel = resolveMessageChannel( + params.sessionCtx.Surface, + params.sessionCtx.Provider, + ); + if (!channel) return "markdown"; + return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; + })(), + bashElevated: params.followupRun.run.bashElevated, + timeoutMs: params.followupRun.run.timeoutMs, + runId, + images: params.opts?.images, + abortSignal: params.opts?.abortSignal, + blockReplyBreak: params.resolvedBlockStreamingBreak, + blockReplyChunking: params.blockReplyChunking, + onPartialReply: allowPartialStream ? async (payload) => { - await params.typingSignals.signalReasoningDelta(); - await params.opts?.onReasoningStream?.({ - text: payload.text, + const textForTyping = await handlePartialForTyping(payload); + if (!params.opts?.onPartialReply || textForTyping === undefined) return; + await params.opts.onPartialReply({ + text: textForTyping, mediaUrls: payload.mediaUrls, }); } : undefined, - onAgentEvent: async (evt) => { - // Trigger typing when tools start executing. - // Must await to ensure typing indicator starts before tool summaries are emitted. - if (evt.stream === "tool") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - if (phase === "start" || phase === "update") { - await params.typingSignals.signalToolStart(); - } - } - // Track auto-compaction completion - if (evt.stream === "compaction") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - const willRetry = Boolean(evt.data.willRetry); - if (phase === "end" && !willRetry) { - autoCompactionCompleted = true; - } - } - }, - // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, - // even when regular block streaming is disabled. The handler sends directly - // via opts.onBlockReply when the pipeline isn't available. - onBlockReply: params.opts?.onBlockReply - ? async (payload) => { - const { text, skip } = normalizeStreamingText(payload); - const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0; - if (skip && !hasPayloadMedia) return; - const currentMessageId = - params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid; - const taggedPayload = applyReplyTagsToPayload( - { - text, - mediaUrls: payload.mediaUrls, - mediaUrl: payload.mediaUrls?.[0], - replyToId: payload.replyToId, - replyToTag: payload.replyToTag, - replyToCurrent: payload.replyToCurrent, - }, - currentMessageId, - ); - // Let through payloads with audioAsVoice flag even if empty (need to track it) - if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return; - const parsed = parseReplyDirectives(taggedPayload.text ?? "", { - currentMessageId, - silentToken: SILENT_REPLY_TOKEN, - }); - const cleaned = parsed.text || undefined; - const hasRenderableMedia = - Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0; - // Skip empty payloads unless they have audioAsVoice flag (need to track it) - if ( - !cleaned && - !hasRenderableMedia && - !payload.audioAsVoice && - !parsed.audioAsVoice - ) - return; - if (parsed.isSilent && !hasRenderableMedia) return; - - const blockPayload: ReplyPayload = params.applyReplyToMode({ - ...taggedPayload, - text: cleaned, - audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice), - replyToId: taggedPayload.replyToId ?? parsed.replyToId, - replyToTag: taggedPayload.replyToTag || parsed.replyToTag, - replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent, - }); - - void params.typingSignals - .signalTextDelta(cleaned ?? taggedPayload.text) - .catch((err) => { - logVerbose(`block reply typing signal failed: ${String(err)}`); - }); - - // Use pipeline if available (block streaming enabled), otherwise send directly - if (params.blockStreamingEnabled && params.blockReplyPipeline) { - params.blockReplyPipeline.enqueue(blockPayload); - } else if (params.blockStreamingEnabled) { - // Send directly when flushing before tool execution (no pipeline but streaming enabled). - // Track sent key to avoid duplicate in final payloads. - directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); - await params.opts?.onBlockReply?.(blockPayload); + onAssistantMessageStart: async () => { + await params.typingSignals.signalMessageStart(); + }, + onReasoningStream: + params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream + ? async (payload) => { + await params.typingSignals.signalReasoningDelta(); + await params.opts?.onReasoningStream?.({ + text: payload.text, + mediaUrls: payload.mediaUrls, + }); + } + : undefined, + onAgentEvent: async (evt) => { + // Trigger typing when tools start executing. + // Must await to ensure typing indicator starts before tool summaries are emitted. + if (evt.stream === "tool") { + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + if (phase === "start" || phase === "update") { + await params.typingSignals.signalToolStart(); } - // When streaming is disabled entirely, blocks are accumulated in final text instead. } - : undefined, - onBlockReplyFlush: - params.blockStreamingEnabled && blockReplyPipeline - ? async () => { - await blockReplyPipeline.flush({ force: true }); + // Track auto-compaction completion + if (evt.stream === "compaction") { + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const willRetry = Boolean(evt.data.willRetry); + if (phase === "end" && !willRetry) { + autoCompactionCompleted = true; } - : undefined, - shouldEmitToolResult: params.shouldEmitToolResult, - shouldEmitToolOutput: params.shouldEmitToolOutput, - onToolResult: onToolResult - ? (payload) => { - // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. - // If a tool callback starts typing after the run finalized, we can end up with - // a typing loop that never sees a matching markRunComplete(). Track and drain. - const task = (async () => { + } + }, + // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, + // even when regular block streaming is disabled. The handler sends directly + // via opts.onBlockReply when the pipeline isn't available. + onBlockReply: params.opts?.onBlockReply + ? async (payload) => { const { text, skip } = normalizeStreamingText(payload); - if (skip) return; - await params.typingSignals.signalTextDelta(text); - await onToolResult({ - text, - mediaUrls: payload.mediaUrls, + const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0; + if (skip && !hasPayloadMedia) return; + const currentMessageId = + params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid; + const taggedPayload = applyReplyTagsToPayload( + { + text, + mediaUrls: payload.mediaUrls, + mediaUrl: payload.mediaUrls?.[0], + replyToId: payload.replyToId, + replyToTag: payload.replyToTag, + replyToCurrent: payload.replyToCurrent, + }, + currentMessageId, + ); + // Let through payloads with audioAsVoice flag even if empty (need to track it) + if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return; + const parsed = parseReplyDirectives(taggedPayload.text ?? "", { + currentMessageId, + silentToken: SILENT_REPLY_TOKEN, }); - })() - .catch((err) => { - logVerbose(`tool result delivery failed: ${String(err)}`); - }) - .finally(() => { - params.pendingToolTasks.delete(task); + const cleaned = parsed.text || undefined; + const hasRenderableMedia = + Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0; + // Skip empty payloads unless they have audioAsVoice flag (need to track it) + if ( + !cleaned && + !hasRenderableMedia && + !payload.audioAsVoice && + !parsed.audioAsVoice + ) + return; + if (parsed.isSilent && !hasRenderableMedia) return; + + const blockPayload: ReplyPayload = params.applyReplyToMode({ + ...taggedPayload, + text: cleaned, + audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice), + replyToId: taggedPayload.replyToId ?? parsed.replyToId, + replyToTag: taggedPayload.replyToTag || parsed.replyToTag, + replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent, }); - params.pendingToolTasks.add(task); - } - : undefined, - }); - }, - }); - runResult = fallbackResult.result; - fallbackProvider = fallbackResult.provider; - fallbackModel = fallbackResult.model; + + void params.typingSignals + .signalTextDelta(cleaned ?? taggedPayload.text) + .catch((err) => { + logVerbose(`block reply typing signal failed: ${String(err)}`); + }); + + // Use pipeline if available (block streaming enabled), otherwise send directly + if (params.blockStreamingEnabled && params.blockReplyPipeline) { + params.blockReplyPipeline.enqueue(blockPayload); + } else if (params.blockStreamingEnabled) { + // Send directly when flushing before tool execution (no pipeline but streaming enabled). + // Track sent key to avoid duplicate in final payloads. + directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); + await params.opts?.onBlockReply?.(blockPayload); + } + // When streaming is disabled entirely, blocks are accumulated in final text instead. + } + : undefined, + onBlockReplyFlush: + params.blockStreamingEnabled && blockReplyPipeline + ? async () => { + await blockReplyPipeline.flush({ force: true }); + } + : undefined, + shouldEmitToolResult: params.shouldEmitToolResult, + shouldEmitToolOutput: params.shouldEmitToolOutput, + onToolResult: onToolResult + ? (payload) => { + // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. + // If a tool callback starts typing after the run finalized, we can end up with + // a typing loop that never sees a matching markRunComplete(). Track and drain. + const task = (async () => { + const { text, skip } = normalizeStreamingText(payload); + if (skip) return; + await params.typingSignals.signalTextDelta(text); + await onToolResult({ + text, + mediaUrls: payload.mediaUrls, + }); + })() + .catch((err) => { + logVerbose(`tool result delivery failed: ${String(err)}`); + }) + .finally(() => { + params.pendingToolTasks.delete(task); + }); + params.pendingToolTasks.add(task); + } + : undefined, + }); + }, + }); + runResult = fallbackResult.result; + fallbackProvider = fallbackResult.provider; + fallbackModel = fallbackResult.model; + } // Some embedded runs surface context overflow as an error payload instead of throwing. // Treat those as a session-level failure and auto-recover by starting a fresh session. diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 1aa0fe067..6e106e4a5 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -39,6 +39,7 @@ export type GetReplyOptions = { skillFilter?: string[]; /** Mutable ref to track if a reply was sent (for Slack "first" threading mode). */ hasRepliedRef?: { value: boolean }; + noOrchestrator?: boolean; }; export type ReplyPayload = { diff --git a/src/cli/program/register.agent.ts b/src/cli/program/register.agent.ts index 6e5bbc210..daa583ef1 100644 --- a/src/cli/program/register.agent.ts +++ b/src/cli/program/register.agent.ts @@ -45,6 +45,8 @@ export function registerAgentCommands(program: Command, args: { agentChannelOpti "--timeout ", "Override agent command timeout (seconds, default 600 or config value)", ) + .option("--orchestrator", "Force enable the orchestrator for this run", false) + .option("--no-orchestrator", "Explicitly disable the orchestrator for this run", false) .addHelpText( "after", () => diff --git a/src/commands/agent-via-gateway.ts b/src/commands/agent-via-gateway.ts index b8401e2a1..20c8d947e 100644 --- a/src/commands/agent-via-gateway.ts +++ b/src/commands/agent-via-gateway.ts @@ -49,6 +49,8 @@ export type AgentCliOpts = { lane?: string; runId?: string; extraSystemPrompt?: string; + orchestrator?: boolean; + noOrchestrator?: boolean; local?: boolean; }; @@ -134,6 +136,8 @@ export async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: Runtim timeout: timeoutSeconds, lane: opts.lane, extraSystemPrompt: opts.extraSystemPrompt, + orchestrator: opts.orchestrator, + noOrchestrator: opts.noOrchestrator, idempotencyKey, }, expectFinal: true, diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 0e745557c..98a6f63a9 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -19,6 +19,7 @@ import { resolveThinkingDefault, } from "../agents/model-selection.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { orchestrateUserMessage, resolveOrchestratorConfig } from "../agents/orchestrator/index.js"; import { buildWorkspaceSkillSnapshot } from "../agents/skills.js"; import { getSkillsSnapshotVersion } from "../agents/skills/refresh.js"; import { resolveAgentTimeoutMs } from "../agents/timeout.js"; @@ -371,6 +372,11 @@ export async function agentCommand( let result: Awaited>; let fallbackProvider = provider; let fallbackModel = model; + + const orchestratorConfig = resolveOrchestratorConfig(cfg); + const useOrchestrator = + (orchestratorConfig.enabled || opts.orchestrator) && !opts.noOrchestrator; + try { const runContext = resolveAgentRunContext(opts); const messageChannel = resolveMessageChannel( @@ -378,88 +384,112 @@ export async function agentCommand( opts.replyChannel ?? opts.channel, ); const spawnedBy = opts.spawnedBy ?? sessionEntry?.spawnedBy; - const fallbackResult = await runWithModelFallback({ - cfg, - provider, - model, - agentDir, - fallbacksOverride: resolveAgentModelFallbacksOverride(cfg, sessionAgentId), - run: (providerOverride, modelOverride) => { - if (isCliProvider(providerOverride, cfg)) { - const cliSessionId = getCliSessionId(sessionEntry, providerOverride); - return runCliAgent({ + + if (useOrchestrator) { + const orchResult = await orchestrateUserMessage({ + userMessage: body, + sessionId, + sessionKey, + config: cfg, + images: opts.images, + timeoutMs, + thinking: resolvedThinkLevel, + }); + result = { + payloads: orchResult.payloads, + meta: { + durationMs: orchResult.meta.durationMs, + agentMeta: { + sessionId, + provider: "orchestrator", + model: orchResult.meta.orchestratorModel, + }, + }, + }; + } else { + const fallbackResult = await runWithModelFallback({ + cfg, + provider, + model, + agentDir, + fallbacksOverride: resolveAgentModelFallbacksOverride(cfg, sessionAgentId), + run: (providerOverride, modelOverride) => { + if (isCliProvider(providerOverride, cfg)) { + const cliSessionId = getCliSessionId(sessionEntry, providerOverride); + return runCliAgent({ + sessionId, + sessionKey, + sessionFile, + workspaceDir, + config: cfg, + prompt: body, + provider: providerOverride, + model: modelOverride, + thinkLevel: resolvedThinkLevel, + timeoutMs, + runId, + extraSystemPrompt: opts.extraSystemPrompt, + cliSessionId, + images: opts.images, + streamParams: opts.streamParams, + }); + } + const authProfileId = + providerOverride === provider ? sessionEntry?.authProfileOverride : undefined; + return runEmbeddedPiAgent({ sessionId, sessionKey, + messageChannel, + agentAccountId: runContext.accountId, + messageTo: opts.replyTo ?? opts.to, + messageThreadId: opts.threadId, + groupId: runContext.groupId, + groupChannel: runContext.groupChannel, + groupSpace: runContext.groupSpace, + spawnedBy, + currentChannelId: runContext.currentChannelId, + currentThreadTs: runContext.currentThreadTs, + replyToMode: runContext.replyToMode, + hasRepliedRef: runContext.hasRepliedRef, sessionFile, workspaceDir, config: cfg, + skillsSnapshot, prompt: body, + images: opts.images, + clientTools: opts.clientTools, provider: providerOverride, model: modelOverride, + authProfileId, + authProfileIdSource: authProfileId + ? sessionEntry?.authProfileOverrideSource + : undefined, thinkLevel: resolvedThinkLevel, + verboseLevel: resolvedVerboseLevel, timeoutMs, runId, + lane: opts.lane, + abortSignal: opts.abortSignal, extraSystemPrompt: opts.extraSystemPrompt, - cliSessionId, - images: opts.images, streamParams: opts.streamParams, + agentDir, + onAgentEvent: (evt) => { + // Track lifecycle end for fallback emission below. + if ( + evt.stream === "lifecycle" && + typeof evt.data?.phase === "string" && + (evt.data.phase === "end" || evt.data.phase === "error") + ) { + lifecycleEnded = true; + } + }, }); - } - const authProfileId = - providerOverride === provider ? sessionEntry?.authProfileOverride : undefined; - return runEmbeddedPiAgent({ - sessionId, - sessionKey, - messageChannel, - agentAccountId: runContext.accountId, - messageTo: opts.replyTo ?? opts.to, - messageThreadId: opts.threadId, - groupId: runContext.groupId, - groupChannel: runContext.groupChannel, - groupSpace: runContext.groupSpace, - spawnedBy, - currentChannelId: runContext.currentChannelId, - currentThreadTs: runContext.currentThreadTs, - replyToMode: runContext.replyToMode, - hasRepliedRef: runContext.hasRepliedRef, - sessionFile, - workspaceDir, - config: cfg, - skillsSnapshot, - prompt: body, - images: opts.images, - clientTools: opts.clientTools, - provider: providerOverride, - model: modelOverride, - authProfileId, - authProfileIdSource: authProfileId - ? sessionEntry?.authProfileOverrideSource - : undefined, - thinkLevel: resolvedThinkLevel, - verboseLevel: resolvedVerboseLevel, - timeoutMs, - runId, - lane: opts.lane, - abortSignal: opts.abortSignal, - extraSystemPrompt: opts.extraSystemPrompt, - streamParams: opts.streamParams, - agentDir, - onAgentEvent: (evt) => { - // Track lifecycle end for fallback emission below. - if ( - evt.stream === "lifecycle" && - typeof evt.data?.phase === "string" && - (evt.data.phase === "end" || evt.data.phase === "error") - ) { - lifecycleEnded = true; - } - }, - }); - }, - }); - result = fallbackResult.result; - fallbackProvider = fallbackResult.provider; - fallbackModel = fallbackResult.model; + }, + }); + result = fallbackResult.result; + fallbackProvider = fallbackResult.provider; + fallbackModel = fallbackResult.model; + } if (!lifecycleEnded) { emitAgentEvent({ runId, diff --git a/src/commands/agent/types.ts b/src/commands/agent/types.ts index e59c88725..3cf13829a 100644 --- a/src/commands/agent/types.ts +++ b/src/commands/agent/types.ts @@ -72,6 +72,8 @@ export type AgentCommandOpts = { lane?: string; runId?: string; extraSystemPrompt?: string; + orchestrator?: boolean; + noOrchestrator?: boolean; /** Per-call stream param overrides (best-effort). */ streamParams?: AgentStreamParams; }; diff --git a/src/commands/configure.orchestrator.ts b/src/commands/configure.orchestrator.ts new file mode 100644 index 000000000..cec435775 --- /dev/null +++ b/src/commands/configure.orchestrator.ts @@ -0,0 +1,52 @@ +import { confirm, intro, outro, text } from "./configure.shared.js"; +import { loadConfig, writeConfigFile } from "../config/io.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { ORCHESTRATOR_DEFAULT_MODEL } from "../agents/orchestrator/config.js"; + +export async function runOrchestratorWizard(_runtime: RuntimeEnv) { + intro("Orchestrator Configuration"); + + const cfg = loadConfig(); + const orchestrator = cfg.orchestrator ?? {}; + + const enabled = await confirm({ + message: "Enable smart orchestrator? (Intercepts messages and routes to best agent)", + initialValue: orchestrator.enabled ?? false, + }); + + if (typeof enabled === "symbol") return; + + const model = await text({ + message: "Orchestrator Model", + placeholder: ORCHESTRATOR_DEFAULT_MODEL, + initialValue: orchestrator.model ?? ORCHESTRATOR_DEFAULT_MODEL, + }); + + if (typeof model === "symbol") return; + + const opencodeEnabled = await confirm({ + message: "Enable OpenCode Agent delegation?", + initialValue: orchestrator.agents?.opencode?.enabled ?? true, + }); + + if (typeof opencodeEnabled === "symbol") return; + + const nextCfg = { + ...cfg, + orchestrator: { + ...orchestrator, + enabled, + model, + agents: { + ...orchestrator.agents, + opencode: { + ...orchestrator.agents?.opencode, + enabled: opencodeEnabled, + }, + }, + }, + }; + + await writeConfigFile(nextCfg); + outro("Orchestrator configuration updated."); +} diff --git a/src/commands/configure.shared.ts b/src/commands/configure.shared.ts index bc89529d8..cb264e7e5 100644 --- a/src/commands/configure.shared.ts +++ b/src/commands/configure.shared.ts @@ -16,6 +16,7 @@ export const CONFIGURE_WIZARD_SECTIONS = [ "daemon", "channels", "skills", + "orchestrator", "health", ] as const; @@ -48,6 +49,7 @@ export const CONFIGURE_SECTION_OPTIONS: Array<{ hint: "Link WhatsApp/Telegram/etc and defaults", }, { value: "skills", label: "Skills", hint: "Install/enable workspace skills" }, + { value: "orchestrator", label: "Orchestrator", hint: "Configure smart routing + OpenCode" }, { value: "health", label: "Health check", diff --git a/src/commands/configure.wizard.ts b/src/commands/configure.wizard.ts index 4be87c596..09de670cd 100644 --- a/src/commands/configure.wizard.ts +++ b/src/commands/configure.wizard.ts @@ -13,6 +13,7 @@ import { removeChannelConfigWizard } from "./configure.channels.js"; import { maybeInstallDaemon } from "./configure.daemon.js"; import { promptGatewayConfig } from "./configure.gateway.js"; import { promptAuthConfig } from "./configure.gateway-auth.js"; +import { runOrchestratorWizard } from "./configure.orchestrator.js"; import type { ChannelsWizardMode, ConfigureWizardParams, @@ -349,7 +350,13 @@ export async function runConfigureWizard( nextConfig = await setupSkills(nextConfig, wsDir, runtime, prompter); } - await persistConfig(); + if (selected.includes("orchestrator")) { + await runOrchestratorWizard(runtime); + const snapshot = await readConfigFileSnapshot(); + if (snapshot.exists) { + nextConfig = snapshot.config; + } + } if (selected.includes("daemon")) { if (!selected.includes("gateway")) { diff --git a/src/config/types.moltbot.ts b/src/config/types.moltbot.ts index c2b2f05e4..8076ed16b 100644 --- a/src/config/types.moltbot.ts +++ b/src/config/types.moltbot.ts @@ -19,6 +19,7 @@ import type { MessagesConfig, } from "./types.messages.js"; import type { ModelsConfig } from "./types.models.js"; +import type { OrchestratorConfig } from "./types.orchestrator.js"; import type { NodeHostConfig } from "./types.node-host.js"; import type { PluginsConfig } from "./types.plugins.js"; import type { SkillsConfig } from "./types.skills.js"; @@ -77,6 +78,7 @@ export type MoltbotConfig = { skills?: SkillsConfig; plugins?: PluginsConfig; models?: ModelsConfig; + orchestrator?: OrchestratorConfig; nodeHost?: NodeHostConfig; agents?: AgentsConfig; tools?: ToolsConfig; diff --git a/src/config/types.orchestrator.ts b/src/config/types.orchestrator.ts new file mode 100644 index 000000000..e5c056795 --- /dev/null +++ b/src/config/types.orchestrator.ts @@ -0,0 +1,31 @@ +export interface OpenCodeAgentConfig { + enabled?: boolean; + model?: string; + mode?: "cli" | "serve"; + binary?: string; + timeoutMs?: number; + servePort?: number; +} + +export interface ResearchAgentConfig { + enabled?: boolean; + model?: string; +} + +export interface EmbeddedAgentConfig { + enabled?: boolean; + model?: string | null; +} + +export interface OrchestratorConfig { + enabled?: boolean; + model?: string; + fallbacks?: string[]; + agents?: { + opencode?: OpenCodeAgentConfig; + research?: ResearchAgentConfig; + embedded?: EmbeddedAgentConfig; + }; + parallelExecution?: boolean; + maxParallelAgents?: number; +} diff --git a/src/config/types.ts b/src/config/types.ts index a34bae20c..ce403cc29 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -16,6 +16,7 @@ export * from "./types.hooks.js"; export * from "./types.imessage.js"; export * from "./types.messages.js"; export * from "./types.models.js"; +export * from "./types.orchestrator.js"; export * from "./types.node-host.js"; export * from "./types.msteams.js"; export * from "./types.plugins.js"; diff --git a/src/config/zod-schema.orchestrator.ts b/src/config/zod-schema.orchestrator.ts new file mode 100644 index 000000000..b2e42f8f0 --- /dev/null +++ b/src/config/zod-schema.orchestrator.ts @@ -0,0 +1,48 @@ +import { z } from "zod"; + +export const OpenCodeAgentSchema = z + .object({ + enabled: z.boolean().optional(), + model: z.string().optional(), + mode: z.union([z.literal("cli"), z.literal("serve")]).optional(), + binary: z.string().optional(), + timeoutMs: z.number().int().nonnegative().optional(), + servePort: z.number().int().min(1).max(65535).optional(), + }) + .strict() + .optional(); + +export const ResearchAgentSchema = z + .object({ + enabled: z.boolean().optional(), + model: z.string().optional(), + }) + .strict() + .optional(); + +export const EmbeddedAgentSchema = z + .object({ + enabled: z.boolean().optional(), + model: z.string().nullable().optional(), + }) + .strict() + .optional(); + +export const OrchestratorSchema = z + .object({ + enabled: z.boolean().optional(), + model: z.string().optional(), + fallbacks: z.array(z.string()).optional(), + agents: z + .object({ + opencode: OpenCodeAgentSchema, + research: ResearchAgentSchema, + embedded: EmbeddedAgentSchema, + }) + .strict() + .optional(), + parallelExecution: z.boolean().optional(), + maxParallelAgents: z.number().int().nonnegative().optional(), + }) + .strict() + .optional(); diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index ce4115517..69acdac02 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -5,6 +5,7 @@ import { AgentsSchema, AudioSchema, BindingsSchema, BroadcastSchema } from "./zo import { HexColorSchema, ModelsConfigSchema } from "./zod-schema.core.js"; import { HookMappingSchema, HooksGmailSchema, InternalHooksSchema } from "./zod-schema.hooks.js"; import { ChannelsSchema } from "./zod-schema.providers.js"; +import { OrchestratorSchema } from "./zod-schema.orchestrator.js"; import { CommandsSchema, MessagesSchema, SessionSchema } from "./zod-schema.session.js"; const BrowserSnapshotDefaultsSchema = z @@ -207,6 +208,7 @@ export const MoltbotSchema = z .strict() .optional(), models: ModelsConfigSchema, + orchestrator: OrchestratorSchema, nodeHost: NodeHostSchema, agents: AgentsSchema, tools: ToolsSchema, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index b299fe9a2..eeaba200a 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -19,6 +19,10 @@ import { resolveHooksGmailModel, resolveThinkingDefault, } from "../../agents/model-selection.js"; +import { + orchestrateUserMessage, + resolveOrchestratorConfig, +} from "../../agents/orchestrator/index.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js"; import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; @@ -317,6 +321,10 @@ export async function runCronIsolatedAgentTurn(params: { let runResult: Awaited>; let fallbackProvider = provider; let fallbackModel = model; + + const orchestratorConfig = resolveOrchestratorConfig(cfgWithAgentDefaults); + const useOrchestrator = orchestratorConfig.enabled; + try { const sessionFile = resolveSessionTranscriptPath(cronSession.sessionEntry.sessionId, agentId); const resolvedVerboseLevel = @@ -328,53 +336,76 @@ export async function runCronIsolatedAgentTurn(params: { verboseLevel: resolvedVerboseLevel, }); const messageChannel = resolvedDelivery.channel; - const fallbackResult = await runWithModelFallback({ - cfg: cfgWithAgentDefaults, - provider, - model, - agentDir, - fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId), - run: (providerOverride, modelOverride) => { - if (isCliProvider(providerOverride, cfgWithAgentDefaults)) { - const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride); - return runCliAgent({ + + if (useOrchestrator) { + const orchResult = await orchestrateUserMessage({ + userMessage: commandBody, + sessionId: cronSession.sessionEntry.sessionId, + sessionKey: agentSessionKey, + config: cfgWithAgentDefaults, + timeoutMs, + thinking: thinkLevel, + }); + runResult = { + payloads: orchResult.payloads, + meta: { + durationMs: orchResult.meta.durationMs, + agentMeta: { + sessionId: cronSession.sessionEntry.sessionId, + provider: "orchestrator", + model: orchResult.meta.orchestratorModel, + }, + }, + }; + } else { + const fallbackResult = await runWithModelFallback({ + cfg: cfgWithAgentDefaults, + provider, + model, + agentDir, + fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId), + run: (providerOverride, modelOverride) => { + if (isCliProvider(providerOverride, cfgWithAgentDefaults)) { + const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride); + return runCliAgent({ + sessionId: cronSession.sessionEntry.sessionId, + sessionKey: agentSessionKey, + sessionFile, + workspaceDir, + config: cfgWithAgentDefaults, + prompt: commandBody, + provider: providerOverride, + model: modelOverride, + thinkLevel: thinkLevel, + timeoutMs: timeoutMs, + runId: cronSession.sessionEntry.sessionId, + cliSessionId, + }); + } + return runEmbeddedPiAgent({ sessionId: cronSession.sessionEntry.sessionId, sessionKey: agentSessionKey, + messageChannel, + agentAccountId: resolvedDelivery.accountId, sessionFile, workspaceDir, config: cfgWithAgentDefaults, + skillsSnapshot, prompt: commandBody, + lane: params.lane ?? "cron", provider: providerOverride, model: modelOverride, - thinkLevel, + thinkLevel: thinkLevel, + verboseLevel: resolvedVerboseLevel, timeoutMs, runId: cronSession.sessionEntry.sessionId, - cliSessionId, }); - } - return runEmbeddedPiAgent({ - sessionId: cronSession.sessionEntry.sessionId, - sessionKey: agentSessionKey, - messageChannel, - agentAccountId: resolvedDelivery.accountId, - sessionFile, - workspaceDir, - config: cfgWithAgentDefaults, - skillsSnapshot, - prompt: commandBody, - lane: params.lane ?? "cron", - provider: providerOverride, - model: modelOverride, - thinkLevel, - verboseLevel: resolvedVerboseLevel, - timeoutMs, - runId: cronSession.sessionEntry.sessionId, - }); - }, - }); - runResult = fallbackResult.result; - fallbackProvider = fallbackResult.provider; - fallbackModel = fallbackResult.model; + }, + }); + runResult = fallbackResult.result; + fallbackProvider = fallbackResult.provider; + fallbackModel = fallbackResult.model; + } } catch (err) { return { status: "error", error: String(err) }; } diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index 3f1a5b5a8..b14d42059 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -68,6 +68,8 @@ export const AgentParamsSchema = Type.Object( idempotencyKey: NonEmptyString, label: Type.Optional(SessionLabelString), spawnedBy: Type.Optional(Type.String()), + orchestrator: Type.Optional(Type.Boolean()), + noOrchestrator: Type.Optional(Type.Boolean()), }, { additionalProperties: false }, ); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index d159d1f78..6dd7d5f25 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -85,6 +85,8 @@ export const agentHandlers: GatewayRequestHandlers = { timeout?: number; label?: string; spawnedBy?: string; + orchestrator?: boolean; + noOrchestrator?: boolean; }; const cfg = loadConfig(); const idem = request.idempotencyKey; @@ -377,6 +379,8 @@ export const agentHandlers: GatewayRequestHandlers = { runId, lane: request.lane, extraSystemPrompt: request.extraSystemPrompt, + orchestrator: request.orchestrator, + noOrchestrator: request.noOrchestrator, }, defaultRuntime, context.deps,