Merge pull request #1 from M1Vj/feature/orchestrator

feat: implement orchestrator agent and delegation tools
This commit is contained in:
Vj 2026-01-30 05:29:45 +08:00 committed by GitHub
commit b9f3c36ce2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 1588 additions and 355 deletions

371
docs/orchestrator-spec.md Normal file
View File

@ -0,0 +1,371 @@
# Moltbot Orchestrator Feature Specification
## Overview
This document specifies the implementation of an **Orchestrator Layer** for Moltbot. The orchestrator acts as an intelligent router that intercepts user messages and decides whether to handle them directly or delegate to specialized agents.
## Goals
1. **Smart Routing**: Use a fast LLM (configurable) to analyze incoming messages and route to appropriate agents
2. **OpenCode Integration**: Enable Moltbot to delegate coding tasks to OpenCode CLI
3. **Parallel Execution**: Support running multiple agents concurrently
4. **Configurable Models**: All agent models should be configurable via `moltbot.json`
5. **Extensible**: Easy to add new specialized agents
## Architecture
```
User Message
┌─────────────────────────────────────────────────────────────┐
│ ORCHESTRATOR (configurable model) │
│ - Default: google-gemini-cli/gemini-3-flash-preview │
│ - System prompt: knows available agents │
│ - Delegation tools: spawn specialized agents │
│ - Decision: handle directly OR delegate │
└────────┬─────────────┬──────────────┬──────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ OpenCode │ │ Embedded Pi │ │ Research │
│ Agent │ │ Agent │ │ Agent │
│ (CLI/HTTP) │ │ (existing) │ │ (web tools) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└─────────────┼──────────────┘
Result Aggregation
```
## Configuration Schema
Add to `moltbot.json`:
```json5
{
"orchestrator": {
"enabled": true,
"model": "google-gemini-cli/gemini-3-flash-preview",
"fallbacks": ["openai-codex/gpt-5.2-codex"],
"agents": {
"opencode": {
"enabled": true,
"model": "anthropic/claude-sonnet-4",
"mode": "cli", // "cli" | "serve"
"binary": "opencode",
"timeoutMs": 300000
},
"research": {
"enabled": true,
"model": "google-gemini-cli/gemini-3-flash-preview"
},
"embedded": {
"enabled": true,
"model": null // null = use agents.defaults.model
}
},
"parallelExecution": true,
"maxParallelAgents": 3
}
}
```
## File Structure
```
src/agents/orchestrator/
├── index.ts # Main exports
├── types.ts # Type definitions
├── config.ts # Config schema and defaults
├── orchestrator.ts # Main orchestrateUserMessage()
├── prompt.ts # System prompt builder
├── result-aggregator.ts # Combine agent results
└── delegation-tools/
├── index.ts # Tool exports
├── opencode-agent.ts # OpenCode CLI/HTTP integration
├── research-agent.ts # Web search delegation
└── embedded-agent.ts # Existing Pi agent delegation
```
## Implementation Checklist
### Phase 1: Core Infrastructure
- [ ] Create feature branch `feature/orchestrator`
- [ ] Create `src/agents/orchestrator/` directory structure
- [ ] Implement `types.ts` - AgentResult, OrchestratorParams, OrchestratorResponse
- [ ] Implement `config.ts` - Config schema with Zod/TypeBox validation
- [ ] Add orchestrator config types to `src/config/types.ts`
### Phase 2: Orchestrator Core
- [ ] Implement `prompt.ts` - Build orchestrator system prompt
- [ ] Implement `orchestrator.ts` - Main orchestration logic
- [ ] Implement `result-aggregator.ts` - Combine multiple agent outputs
### Phase 3: Delegation Tools
- [ ] Implement `delegation-tools/opencode-agent.ts` - OpenCode CLI integration
- [ ] Implement `delegation-tools/research-agent.ts` - Web search delegation
- [ ] Implement `delegation-tools/embedded-agent.ts` - Existing agent delegation
- [ ] Implement `delegation-tools/index.ts` - Export all tools
### Phase 4: Integration
- [ ] Add orchestrator to tool registry in `moltbot-tools.ts`
- [ ] Integrate into message handling pipeline
- [ ] Add CLI flag for enabling/disabling orchestrator
- [ ] Update config migration if needed
### Phase 5: Testing & Documentation
- [ ] Add unit tests for orchestrator logic
- [ ] Add integration tests for delegation tools
- [ ] Add E2E test for full orchestration flow
- [ ] Update documentation
### Phase 6: Polish
- [ ] Run linter and fix issues
- [ ] Run full test suite
- [ ] Create PR with changelog entry
## Type Definitions
```typescript
// types.ts
export interface AgentResult {
agentName: string;
agentType: "opencode" | "research" | "embedded" | "orchestrator";
status: "success" | "error" | "partial" | "timeout";
output: string;
artifacts?: unknown[];
errorMessage?: string;
durationMs: number;
model?: string;
tokenUsage?: {
input: number;
output: number;
};
}
export interface OrchestratorParams {
userMessage: string;
sessionId: string;
sessionKey?: string;
config?: MoltbotConfig;
images?: ImageContent[];
timeoutMs?: number;
thinking?: ThinkLevel;
}
export interface OrchestratorResponse {
payloads: Array<{ text: string; isError?: boolean }>;
agentResults: AgentResult[];
meta: {
durationMs: number;
orchestratorModel: string;
delegatedAgents: string[];
};
}
export interface OrchestratorConfig {
enabled: boolean;
model: string;
fallbacks?: string[];
agents: {
opencode?: OpenCodeAgentConfig;
research?: ResearchAgentConfig;
embedded?: EmbeddedAgentConfig;
};
parallelExecution?: boolean;
maxParallelAgents?: number;
}
export interface OpenCodeAgentConfig {
enabled: boolean;
model?: string;
mode: "cli" | "serve";
binary?: string;
timeoutMs?: number;
servePort?: number;
}
export interface ResearchAgentConfig {
enabled: boolean;
model?: string;
}
export interface EmbeddedAgentConfig {
enabled: boolean;
model?: string | null;
}
```
## OpenCode Integration Details
### CLI Mode (`opencode run`)
```typescript
// Simple one-shot execution
async function runOpenCodeCli(message: string, options: OpenCodeAgentConfig): Promise<AgentResult> {
const args = ["run", message];
if (options.model) {
args.push("--model", options.model);
}
const result = await spawnCommand(options.binary ?? "opencode", args, {
timeoutMs: options.timeoutMs ?? 300000,
cwd: workspaceDir,
});
return {
agentName: "OpenCode",
agentType: "opencode",
status: result.exitCode === 0 ? "success" : "error",
output: result.stdout,
errorMessage: result.stderr || undefined,
durationMs: result.durationMs,
model: options.model,
};
}
```
### Serve Mode (Future)
```typescript
// Persistent HTTP server for sessions
interface OpenCodeServer {
port: number;
sessionId: string;
prompt(message: string): Promise<AgentResult>;
close(): Promise<void>;
}
async function getOrCreateOpenCodeServer(sessionId: string): Promise<OpenCodeServer> {
// Check if server exists for this session
// If not, spawn `opencode serve --port <port>`
// Return client connected to the server
}
```
## Orchestrator System Prompt
```typescript
// prompt.ts
export function buildOrchestratorSystemPrompt(config: OrchestratorConfig): string {
const agents = [];
if (config.agents.opencode?.enabled) {
agents.push(`
**OpenCode Agent** (tool: delegate_to_opencode)
- Use for: coding tasks, file modifications, debugging, shell commands, git operations
- Capabilities: Full IDE-like agent with bash, file editing, LSP, code search
- When to use: User asks to write code, modify files, run commands, debug issues`);
}
if (config.agents.research?.enabled) {
agents.push(`
**Research Agent** (tool: delegate_to_research)
- Use for: web searches, information retrieval, fact-checking
- Capabilities: Web search, URL fetching, documentation lookup
- When to use: User needs current information, documentation, or research`);
}
if (config.agents.embedded?.enabled) {
agents.push(`
**Moltbot Agent** (tool: delegate_to_moltbot)
- Use for: general queries, Moltbot-specific features, messaging, scheduling
- Capabilities: All Moltbot tools (message, cron, browser, canvas, etc.)
- When to use: User asks about Moltbot, messaging, or general assistance`);
}
return `You are Moltbot's Orchestrator. Your role is to understand user requests and route them to the most appropriate specialized agent(s).
## Available Agents
${agents.join("\n")}
## Decision Rules
1. **Analyze the request**: Understand what the user is asking for
2. **Select agent(s)**: Choose the best agent(s) for the task
3. **Parallel execution**: You can call multiple agents simultaneously if the task benefits from it
4. **Direct response**: For simple greetings or trivial questions, respond directly without delegation
## Response Format
- If delegating: Call the appropriate delegation tool(s)
- If handling directly: Provide a brief, helpful response
- If multiple agents needed: Call multiple tools - they will run in parallel
## Examples
User: "Write a Python script to parse JSON"
Action: delegate_to_opencode
User: "What's the weather in Tokyo?"
Action: delegate_to_research
User: "Send a message to John saying hello"
Action: delegate_to_moltbot
User: "Research React best practices and create a component"
Action: delegate_to_research AND delegate_to_opencode (parallel)
User: "Hello!"
Action: Respond directly with a greeting`;
}
```
## Result Aggregation
```typescript
// result-aggregator.ts
export function aggregateResults(results: AgentResult[]): OrchestratorResponse {
const payloads: Array<{ text: string; isError?: boolean }> = [];
for (const result of results) {
if (result.status === "error") {
payloads.push({
text: `**${result.agentName} Error:**\n${result.errorMessage || result.output}`,
isError: true,
});
} else {
payloads.push({
text: results.length > 1
? `**${result.agentName}:**\n${result.output}`
: result.output,
});
}
}
return {
payloads,
agentResults: results,
meta: {
durationMs: Math.max(...results.map(r => r.durationMs)),
orchestratorModel: "configured-model",
delegatedAgents: results.map(r => r.agentType),
},
};
}
```
## Migration Notes
- Orchestrator is **opt-in** via `orchestrator.enabled: true`
- Default behavior (orchestrator disabled) remains unchanged
- Existing `agents.defaults.model` config is preserved
- Orchestrator model is separate from agent models
## Future Enhancements
1. **OpenCode serve mode**: Persistent coding sessions with state
2. **Agent chaining**: Sequential agent execution with context passing
3. **Custom agents**: Plugin system for adding new agent types
4. **Learning**: Track which delegations work best and improve routing
5. **Streaming**: Real-time output from delegated agents
## References
- OpenCode CLI: https://github.com/anomalyco/opencode
- Moltbot Agent Architecture: `src/agents/pi-embedded-runner/`
- Existing Subagent Pattern: `src/agents/tools/sessions-spawn-tool.ts`

View File

@ -0,0 +1,34 @@
import type { MoltbotConfig } from "../../config/config.js";
import type { OrchestratorConfig } from "../../config/types.orchestrator.js";
export const ORCHESTRATOR_DEFAULT_MODEL = "google-gemini-cli/gemini-3-flash-preview";
export function resolveOrchestratorConfig(config?: MoltbotConfig): OrchestratorConfig {
const orchestrator = config?.orchestrator ?? {};
return {
enabled: orchestrator.enabled ?? false,
model: orchestrator.model ?? ORCHESTRATOR_DEFAULT_MODEL,
fallbacks: orchestrator.fallbacks ?? [],
agents: {
opencode: {
enabled: orchestrator.agents?.opencode?.enabled ?? true,
model: orchestrator.agents?.opencode?.model,
mode: orchestrator.agents?.opencode?.mode ?? "cli",
binary: orchestrator.agents?.opencode?.binary ?? "opencode",
timeoutMs: orchestrator.agents?.opencode?.timeoutMs ?? 300000,
servePort: orchestrator.agents?.opencode?.servePort ?? 4096,
},
research: {
enabled: orchestrator.agents?.research?.enabled ?? true,
model: orchestrator.agents?.research?.model,
},
embedded: {
enabled: orchestrator.agents?.embedded?.enabled ?? true,
model: orchestrator.agents?.embedded?.model,
},
},
parallelExecution: orchestrator.parallelExecution ?? true,
maxParallelAgents: orchestrator.maxParallelAgents ?? 3,
};
}

View File

@ -0,0 +1,59 @@
import { runEmbeddedPiAgent } from "../../pi-embedded-runner/run.js";
import type { AgentResult, EmbeddedAgentConfig } from "../types.js";
import type { MoltbotConfig } from "../../../config/config.js";
import { resolveSessionTranscriptPath } from "../../../config/sessions/paths.js";
export async function delegateToEmbeddedAgent(
message: string,
options: EmbeddedAgentConfig,
config?: MoltbotConfig,
sessionId?: string,
sessionKey?: string,
): Promise<AgentResult> {
const started = Date.now();
const effectiveSessionId = sessionId ?? `orchestrator-sub-${Date.now()}`;
try {
const result = await runEmbeddedPiAgent({
sessionId: effectiveSessionId,
sessionKey,
sessionFile: resolveSessionTranscriptPath(effectiveSessionId),
workspaceDir: config?.agents?.defaults?.workspace ?? process.cwd(),
config,
prompt: message,
timeoutMs: 300000,
runId: `orchestrator-${Date.now()}`,
provider: options.model ? options.model.split("/")[0] : undefined,
model: options.model ? options.model.split("/").slice(1).join("/") : undefined,
disableTools: false, // Allow embedded agent to use its tools
});
const output = (result.payloads ?? [])
.filter((p): p is { text: string } => !p.isError && typeof p.text === "string")
.map((p) => p.text)
.join("\n")
.trim();
const errorPayload = (result.payloads ?? []).find((p) => p.isError);
return {
agentName: "Moltbot",
agentType: "embedded",
status: errorPayload ? "error" : "success",
output,
errorMessage: errorPayload?.text,
durationMs: Date.now() - started,
model: options.model ?? undefined,
};
} catch (error: any) {
return {
agentName: "Moltbot",
agentType: "embedded",
status: "error",
output: "",
errorMessage: error.message || "Failed to execute embedded agent",
durationMs: Date.now() - started,
model: options.model ?? undefined,
};
}
}

View File

@ -0,0 +1,3 @@
export { delegateToOpenCodeAgent } from "./opencode-agent.js";
export { delegateToResearchAgent } from "./research-agent.js";
export { delegateToEmbeddedAgent } from "./embedded-agent.js";

View File

@ -0,0 +1,41 @@
import { runCommandWithTimeout } from "../../../process/exec.js";
import type { AgentResult, OpenCodeAgentConfig } from "../types.js";
export async function delegateToOpenCodeAgent(
message: string,
options: OpenCodeAgentConfig,
workspaceDir: string,
): Promise<AgentResult> {
const started = Date.now();
const args = ["run", message];
if (options.model) {
args.push("--model", options.model);
}
try {
const result = await runCommandWithTimeout([options.binary ?? "opencode", ...args], {
timeoutMs: options.timeoutMs ?? 300000,
cwd: workspaceDir,
});
return {
agentName: "OpenCode",
agentType: "opencode",
status: result.code === 0 ? "success" : "error",
output: result.stdout.trim(),
errorMessage: result.code !== 0 ? result.stderr.trim() || "OpenCode failed." : undefined,
durationMs: Date.now() - started,
model: options.model,
};
} catch (error: any) {
return {
agentName: "OpenCode",
agentType: "opencode",
status: "error",
output: "",
errorMessage: error.message || "Failed to execute OpenCode CLI",
durationMs: Date.now() - started,
model: options.model,
};
}
}

View File

@ -0,0 +1,56 @@
import { createWebSearchTool } from "../../tools/web-search.js";
import type { AgentResult, ResearchAgentConfig } from "../types.js";
import type { MoltbotConfig } from "../../../config/config.js";
export async function delegateToResearchAgent(
query: string,
options: ResearchAgentConfig,
config?: MoltbotConfig,
): Promise<AgentResult> {
const started = Date.now();
try {
const searchTool = createWebSearchTool({ config });
if (!searchTool) {
throw new Error("Web search tool not available or not configured.");
}
const result = await searchTool.execute("orchestrator-research", { query });
const firstContent = result.content[0];
if (!firstContent || firstContent.type !== "text") {
throw new Error("Research tool returned unexpected content type.");
}
const data = JSON.parse(firstContent.text);
let output = "";
if (data.results) {
output = data.results
.map((r: any) => `**${r.title}**\n${r.url}\n${r.description}`)
.join("\n\n");
} else if (data.content) {
output = data.content;
} else {
output = JSON.stringify(data, null, 2);
}
return {
agentName: "Research",
agentType: "research",
status: data.error ? "error" : "success",
output,
errorMessage: data.error ? data.message || data.error : undefined,
durationMs: Date.now() - started,
model: options.model,
};
} catch (error: any) {
return {
agentName: "Research",
agentType: "research",
status: "error",
output: "",
errorMessage: error.message || "Failed to execute research",
durationMs: Date.now() - started,
model: options.model,
};
}
}

View File

@ -0,0 +1,99 @@
import { Type } from "@sinclair/typebox";
import type { AnyAgentTool } from "../../tools/common.js";
import {
delegateToOpenCodeAgent,
delegateToResearchAgent,
delegateToEmbeddedAgent,
} from "./index.js";
import type { OrchestratorConfig } from "../types.js";
import type { MoltbotConfig } from "../../../config/config.js";
export function createDelegationTools(params: {
config: MoltbotConfig;
orchestratorConfig: OrchestratorConfig;
sessionId: string;
sessionKey?: string;
onAgentResult: (result: any) => void;
}): AnyAgentTool[] {
const { config, orchestratorConfig, sessionId, sessionKey, onAgentResult } = params;
const tools: AnyAgentTool[] = [];
if (orchestratorConfig.agents?.opencode?.enabled !== false) {
tools.push({
name: "delegate_to_opencode",
label: "OpenCode",
description: "Delegate a coding, debugging, or file modification task to the OpenCode agent.",
parameters: Type.Object({
task: Type.String({ description: "The specific coding task to perform." }),
}),
execute: async (_id, args) => {
const result = await delegateToOpenCodeAgent(
String(args.task),
orchestratorConfig.agents!.opencode!,
config.agents?.defaults?.workspace ?? process.cwd(),
);
onAgentResult(result);
return {
content: [
{ type: "text", text: result.output || result.errorMessage || "Task completed." },
],
details: result,
};
},
});
}
if (orchestratorConfig.agents?.research?.enabled !== false) {
tools.push({
name: "delegate_to_research",
label: "Research",
description: "Delegate an information retrieval or web search task to the Research agent.",
parameters: Type.Object({
query: Type.String({ description: "The search query or research topic." }),
}),
execute: async (_id, args) => {
const result = await delegateToResearchAgent(
String(args.query),
orchestratorConfig.agents!.research!,
config,
);
onAgentResult(result);
return {
content: [
{ type: "text", text: result.output || result.errorMessage || "Research completed." },
],
details: result,
};
},
});
}
if (orchestratorConfig.agents?.embedded?.enabled !== false) {
tools.push({
name: "delegate_to_moltbot",
label: "Moltbot",
description: "Delegate a general query or Moltbot-specific task to the main Moltbot agent.",
parameters: Type.Object({
message: Type.String({ description: "The message or task for the Moltbot agent." }),
}),
execute: async (_id, args) => {
const result = await delegateToEmbeddedAgent(
String(args.message),
orchestratorConfig.agents!.embedded!,
config,
sessionId,
sessionKey,
);
onAgentResult(result);
return {
content: [
{ type: "text", text: result.output || result.errorMessage || "Task completed." },
],
details: result,
};
},
});
}
return tools;
}

View File

@ -0,0 +1,6 @@
export * from "./types.js";
export * from "./config.js";
export * from "./orchestrator.js";
export * from "./prompt.js";
export * from "./result-aggregator.js";
export * from "./delegation-tools/index.js";

View File

@ -0,0 +1,68 @@
import { describe, expect, it, vi } from "vitest";
import { orchestrateUserMessage } from "./orchestrator.js";
import { ORCHESTRATOR_DEFAULT_MODEL } from "./config.js";
vi.mock("../pi-embedded-runner/run.js", () => ({
runEmbeddedPiAgent: vi.fn(async (params) => {
if (params.sessionId.startsWith("orch-")) {
return {
payloads: [{ text: "I'll help you with that." }],
meta: {
durationMs: 100,
agentMeta: {
sessionId: params.sessionId,
provider: params.provider,
model: params.model,
},
},
};
}
return {
payloads: [{ text: "Sub-agent result" }],
meta: {
durationMs: 200,
agentMeta: {
sessionId: params.sessionId,
provider: params.provider,
model: params.model,
},
},
};
}),
}));
describe("Orchestrator", () => {
it("should handle direct response when no tools called", async () => {
const result = await orchestrateUserMessage({
userMessage: "hello",
sessionId: "test-session",
config: {
orchestrator: {
enabled: true,
model: ORCHESTRATOR_DEFAULT_MODEL,
},
} as any,
});
expect(result.agentResults).toHaveLength(1);
expect(result.agentResults[0].agentName).toBe("Orchestrator");
expect(result.agentResults[0].output).toBe("I'll help you with that.");
});
it("should fall back to direct embedded run if disabled", async () => {
const result = await orchestrateUserMessage({
userMessage: "hello",
sessionId: "test-session",
config: {
orchestrator: {
enabled: false,
},
} as any,
});
expect(result.agentResults).toHaveLength(1);
expect(result.agentResults[0].agentName).toBe("Moltbot");
expect(result.agentResults[0].agentType).toBe("embedded");
expect(result.agentResults[0].output).toBe("Sub-agent result");
});
});

View File

@ -0,0 +1,108 @@
import { runEmbeddedPiAgent } from "../pi-embedded-runner/run.js";
import { resolveOrchestratorConfig } from "./config.js";
import { buildOrchestratorSystemPrompt } from "./prompt.js";
import { createDelegationTools } from "./delegation-tools/tools.js";
import { aggregateResults } from "./result-aggregator.js";
import type { OrchestratorParams, OrchestratorResponse, AgentResult } from "./types.js";
import { resolveSessionTranscriptPath } from "../../config/sessions/paths.js";
export async function orchestrateUserMessage(
params: OrchestratorParams,
): Promise<OrchestratorResponse> {
const started = Date.now();
const orchestratorConfig = resolveOrchestratorConfig(params.config);
if (!orchestratorConfig.enabled) {
const result = await runEmbeddedPiAgent({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: resolveSessionTranscriptPath(params.sessionId),
workspaceDir: params.config?.agents?.defaults?.workspace ?? process.cwd(),
config: params.config,
prompt: params.userMessage,
timeoutMs: params.timeoutMs ?? 300000,
runId: `direct-${Date.now()}`,
images: params.images,
thinkLevel: params.thinking,
});
const output = (result.payloads ?? [])
.filter((p): p is { text: string } => !p.isError && typeof p.text === "string")
.map((p) => p.text)
.join("\n")
.trim();
const errorPayload = (result.payloads ?? []).find((p) => p.isError);
return {
payloads: (result.payloads ?? []).map((p) => ({ text: p.text ?? "", isError: p.isError })),
agentResults: [
{
agentName: "Moltbot",
agentType: "embedded",
status: errorPayload ? "error" : "success",
output,
errorMessage: errorPayload?.text,
durationMs: Date.now() - started,
},
],
meta: {
durationMs: Date.now() - started,
orchestratorModel: "disabled",
delegatedAgents: ["embedded"],
},
};
}
const agentResults: AgentResult[] = [];
const onAgentResult = (result: AgentResult) => {
agentResults.push(result);
};
const tools = createDelegationTools({
config: params.config!,
orchestratorConfig,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
onAgentResult,
});
const systemPrompt = buildOrchestratorSystemPrompt(orchestratorConfig);
const modelRef = orchestratorConfig.model || "google/gemini-3-flash-preview";
const orchestratorRunResult = await runEmbeddedPiAgent({
sessionId: `orch-${params.sessionId}`,
sessionKey: params.sessionKey ? `orch-${params.sessionKey}` : undefined,
sessionFile: resolveSessionTranscriptPath(`orch-${params.sessionId}`),
workspaceDir: params.config?.agents?.defaults?.workspace ?? process.cwd(),
config: params.config,
prompt: params.userMessage,
timeoutMs: params.timeoutMs ?? 300000,
runId: `orch-run-${Date.now()}`,
provider: modelRef.split("/")[0],
model: modelRef.split("/").slice(1).join("/"),
extraSystemPrompt: systemPrompt,
extraTools: tools,
disableTools: false,
images: params.images,
});
const orchestratorText = (orchestratorRunResult.payloads ?? [])
.filter((p): p is { text: string } => !p.isError && typeof p.text === "string")
.map((p) => p.text)
.join("\n")
.trim();
if (orchestratorText && agentResults.length === 0) {
agentResults.push({
agentName: "Orchestrator",
agentType: "orchestrator",
status: "success",
output: orchestratorText,
durationMs: Date.now() - started,
model: modelRef,
});
}
return aggregateResults(agentResults, modelRef);
}

View File

@ -0,0 +1,64 @@
import type { OrchestratorConfig } from "../../config/types.orchestrator.js";
export function buildOrchestratorSystemPrompt(config: OrchestratorConfig): string {
const agents = [];
if (config.agents?.opencode?.enabled !== false) {
agents.push(`
**OpenCode Agent** (tool: delegate_to_opencode)
- Use for: coding tasks, file modifications, debugging, shell commands, git operations
- Capabilities: Full IDE-like agent with bash, file editing, LSP, code search
- When to use: User asks to write code, modify files, run commands, debug issues`);
}
if (config.agents?.research?.enabled !== false) {
agents.push(`
**Research Agent** (tool: delegate_to_research)
- Use for: web searches, information retrieval, fact-checking
- Capabilities: Web search, URL fetching, documentation lookup
- When to use: User needs current information, documentation, or research`);
}
if (config.agents?.embedded?.enabled !== false) {
agents.push(`
**Moltbot Agent** (tool: delegate_to_moltbot)
- Use for: general queries, Moltbot-specific features, messaging, scheduling
- Capabilities: All Moltbot tools (message, cron, browser, canvas, etc.)
- When to use: User asks about Moltbot, messaging, or general assistance`);
}
return `You are Moltbot's Orchestrator. Your role is to understand user requests and route them to the most appropriate specialized agent(s).
## Available Agents
${agents.join("\n")}
## Decision Rules
1. **Analyze the request**: Understand what the user is asking for
2. **Select agent(s)**: Choose the best agent(s) for the task
3. **Parallel execution**: You can call multiple agents simultaneously if the task benefits from it.
4. **Direct response**: For simple greetings or trivial questions, respond directly without delegation.
## Response Format
- If delegating: Call the appropriate delegation tool(s).
- If handling directly: Provide a brief, helpful response.
- If multiple agents needed: Call multiple tools - they will run in parallel.
## Examples
User: "Write a Python script to parse JSON"
Action: delegate_to_opencode
User: "What's the weather in Tokyo?"
Action: delegate_to_research
User: "Send a message to John saying hello"
Action: delegate_to_moltbot
User: "Research React best practices and create a component"
Action: delegate_to_research AND delegate_to_opencode (parallel)
User: "Hello!"
Action: Respond directly with a greeting`;
}

View File

@ -0,0 +1,31 @@
import type { AgentResult, OrchestratorResponse } from "./types.js";
export function aggregateResults(
results: AgentResult[],
orchestratorModel: string,
): OrchestratorResponse {
const payloads: Array<{ text: string; isError?: boolean }> = [];
for (const result of results) {
if (result.status === "error") {
payloads.push({
text: `**${result.agentName} Error:**\n${result.errorMessage || result.output}`,
isError: true,
});
} else {
payloads.push({
text: results.length > 1 ? `**${result.agentName}:**\n${result.output}` : result.output,
});
}
}
return {
payloads,
agentResults: results,
meta: {
durationMs: results.length > 0 ? Math.max(...results.map((r) => r.durationMs)) : 0,
orchestratorModel,
delegatedAgents: results.map((r) => r.agentType),
},
};
}

View File

@ -0,0 +1,46 @@
import type { ImageContent } from "@mariozechner/pi-ai";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import type { MoltbotConfig } from "../../config/config.js";
import type { OrchestratorConfig } from "../../config/types.orchestrator.js";
export type {
OrchestratorConfig,
OpenCodeAgentConfig,
ResearchAgentConfig,
EmbeddedAgentConfig,
} from "../../config/types.orchestrator.js";
export interface AgentResult {
agentName: string;
agentType: "opencode" | "research" | "embedded" | "orchestrator";
status: "success" | "error" | "partial" | "timeout";
output: string;
artifacts?: unknown[];
errorMessage?: string;
durationMs: number;
model?: string;
tokenUsage?: {
input: number;
output: number;
};
}
export interface OrchestratorParams {
userMessage: string;
sessionId: string;
sessionKey?: string;
config?: MoltbotConfig;
images?: ImageContent[];
timeoutMs?: number;
thinking?: ThinkLevel;
}
export interface OrchestratorResponse {
payloads: Array<{ text: string; isError?: boolean }>;
agentResults: AgentResult[];
meta: {
durationMs: number;
orchestratorModel: string;
delegatedAgents: string[];
};
}

View File

@ -351,6 +351,7 @@ export async function runEmbeddedPiAgent(
onToolResult: params.onToolResult,
onAgentEvent: params.onAgentEvent,
extraSystemPrompt: params.extraSystemPrompt,
extraTools: params.extraTools,
streamParams: params.streamParams,
ownerNumbers: params.ownerNumbers,
enforceFinalTag: params.enforceFinalTag,

View File

@ -433,7 +433,7 @@ export async function runEmbeddedAttempt(
});
const { builtInTools, customTools } = splitSdkTools({
tools,
tools: [...tools, ...(params.extraTools ?? [])],
sandboxEnabled: !!sandbox?.enabled,
});

View File

@ -6,6 +6,7 @@ import type { enqueueCommand } from "../../../process/command-queue.js";
import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js";
import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js";
import type { SkillSnapshot } from "../../skills.js";
import type { AnyAgentTool } from "../../pi-tools.types.js";
// Simplified tool definition for client-provided tools (OpenResponses hosted tools)
export type ClientToolDefinition = {
@ -56,6 +57,7 @@ export type RunEmbeddedPiAgentParams = {
images?: ImageContent[];
/** Optional client-provided tools (OpenResponses hosted tools). */
clientTools?: ClientToolDefinition[];
extraTools?: AnyAgentTool[];
/** Disable built-in tools for this run (LLM-only mode). */
disableTools?: boolean;
provider?: string;

View File

@ -9,6 +9,7 @@ import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js
import type { MessagingToolSend } from "../../pi-embedded-messaging.js";
import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js";
import type { SkillSnapshot } from "../../skills.js";
import type { AnyAgentTool } from "../../pi-tools.types.js";
import type { SessionSystemPromptReport } from "../../../config/sessions/types.js";
import type { ClientToolDefinition } from "./params.js";
@ -48,6 +49,7 @@ export type EmbeddedRunAttemptParams = {
images?: ImageContent[];
/** Optional client-provided tools (OpenResponses hosted tools). */
clientTools?: ClientToolDefinition[];
extraTools?: AnyAgentTool[];
/** Disable built-in tools for this run (LLM-only mode). */
disableTools?: boolean;
provider: string;

View File

@ -6,6 +6,10 @@ import { getCliSessionId } from "../../agents/cli-session.js";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import {
orchestrateUserMessage,
resolveOrchestratorConfig,
} from "../../agents/orchestrator/index.js";
import {
isCompactionFailureError,
isContextOverflowError,
@ -97,6 +101,11 @@ export async function runAgentTurnWithFallback(params: {
let fallbackModel = params.followupRun.run.model;
let didResetAfterCompactionFailure = false;
const orchestratorConfig = resolveOrchestratorConfig(params.followupRun.run.config);
const useOrchestrator = orchestratorConfig.enabled && !params.opts?.noOrchestrator;
const sessionAgentId = resolveAgentIdFromSessionKey(params.sessionKey);
while (true) {
try {
const allowPartialStream = !(
@ -134,53 +143,75 @@ export async function runAgentTurnWithFallback(params: {
};
const blockReplyPipeline = params.blockReplyPipeline;
const onToolResult = params.opts?.onToolResult;
const fallbackResult = await runWithModelFallback({
cfg: params.followupRun.run.config,
provider: params.followupRun.run.provider,
model: params.followupRun.run.model,
agentDir: params.followupRun.run.agentDir,
fallbacksOverride: resolveAgentModelFallbacksOverride(
params.followupRun.run.config,
resolveAgentIdFromSessionKey(params.followupRun.run.sessionKey),
),
run: (provider, model) => {
// Notify that model selection is complete (including after fallback).
// This allows responsePrefix template interpolation with the actual model.
params.opts?.onModelSelected?.({
provider,
model,
thinkLevel: params.followupRun.run.thinkLevel,
});
if (isCliProvider(provider, params.followupRun.run.config)) {
const startedAt = Date.now();
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "start",
startedAt,
},
});
const cliSessionId = getCliSessionId(params.getActiveSessionEntry(), provider);
return runCliAgent({
if (useOrchestrator && !didResetAfterCompactionFailure) {
const orchResult = await orchestrateUserMessage({
userMessage: params.commandBody,
sessionId: params.followupRun.run.sessionId,
sessionKey: params.sessionKey,
config: params.followupRun.run.config,
images: params.opts?.images,
timeoutMs: params.followupRun.run.timeoutMs,
thinking: params.followupRun.run.thinkLevel,
});
runResult = {
payloads: orchResult.payloads,
meta: {
durationMs: orchResult.meta.durationMs,
agentMeta: {
sessionId: params.followupRun.run.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.followupRun.run.sessionFile,
workspaceDir: params.followupRun.run.workspaceDir,
config: params.followupRun.run.config,
prompt: params.commandBody,
provider: "orchestrator",
model: orchResult.meta.orchestratorModel,
},
},
};
} else {
const fallbackResult = await runWithModelFallback({
cfg: params.followupRun.run.config,
provider: params.followupRun.run.provider,
model: params.followupRun.run.model,
agentDir: params.followupRun.run.agentDir,
fallbacksOverride: resolveAgentModelFallbacksOverride(
params.followupRun.run.config,
sessionAgentId,
),
run: async (provider, model) => {
// Notify that model selection is complete (including after fallback).
// This allows responsePrefix template interpolation with the actual model.
params.opts?.onModelSelected?.({
provider,
model,
thinkLevel: params.followupRun.run.thinkLevel,
timeoutMs: params.followupRun.run.timeoutMs,
runId,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
ownerNumbers: params.followupRun.run.ownerNumbers,
cliSessionId,
images: params.opts?.images,
})
.then((result) => {
});
if (isCliProvider(provider, params.followupRun.run.config)) {
const startedAt = Date.now();
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "start",
startedAt,
},
});
const cliSessionId = getCliSessionId(params.getActiveSessionEntry(), provider);
return runCliAgent({
sessionId: params.followupRun.run.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.followupRun.run.sessionFile,
workspaceDir: params.followupRun.run.workspaceDir,
config: params.followupRun.run.config,
prompt: params.commandBody,
provider,
model,
thinkLevel: params.followupRun.run.thinkLevel,
timeoutMs: params.followupRun.run.timeoutMs,
runId,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
ownerNumbers: params.followupRun.run.ownerNumbers,
cliSessionId,
images: params.opts?.images,
}).then((result) => {
// CLI backends don't emit streaming assistant events, so we need to
// emit one with the final text so server-chat can populate its buffer
// and send the response to TUI/WebSocket clients.
@ -198,229 +229,219 @@ export async function runAgentTurnWithFallback(params: {
data: {
phase: "end",
startedAt,
endedAt: Date.now(),
durationMs: Date.now() - startedAt,
},
});
return result;
})
.catch((err) => {
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "error",
startedAt,
endedAt: Date.now(),
error: err instanceof Error ? err.message : String(err),
},
});
throw err;
});
}
const authProfileId =
provider === params.followupRun.run.provider
? params.followupRun.run.authProfileId
: undefined;
return runEmbeddedPiAgent({
sessionId: params.followupRun.run.sessionId,
sessionKey: params.sessionKey,
messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined,
agentAccountId: params.sessionCtx.AccountId,
messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To,
messageThreadId: params.sessionCtx.MessageThreadId ?? undefined,
groupId: resolveGroupSessionKey(params.sessionCtx)?.id,
groupChannel:
params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(),
groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined,
senderId: params.sessionCtx.SenderId?.trim() || undefined,
senderName: params.sessionCtx.SenderName?.trim() || undefined,
senderUsername: params.sessionCtx.SenderUsername?.trim() || undefined,
senderE164: params.sessionCtx.SenderE164?.trim() || undefined,
// Provider threading context for tool auto-injection
...buildThreadingToolContext({
sessionCtx: params.sessionCtx,
}
const authProfileId =
provider === params.followupRun.run.provider
? params.getActiveSessionEntry()?.authProfileOverride
: undefined;
return runEmbeddedPiAgent({
sessionId: params.followupRun.run.sessionId,
sessionKey: params.sessionKey,
messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined,
agentAccountId: params.sessionCtx.AccountId,
messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To,
messageThreadId: params.sessionCtx.MessageThreadId ?? undefined,
groupId: resolveGroupSessionKey(params.sessionCtx)?.id,
groupChannel:
params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(),
groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined,
senderId: params.sessionCtx.SenderId?.trim() || undefined,
senderName: params.sessionCtx.SenderName?.trim() || undefined,
senderUsername: params.sessionCtx.SenderUsername?.trim() || undefined,
senderE164: params.sessionCtx.SenderE164?.trim() || undefined,
// Provider threading context for tool auto-injection
...buildThreadingToolContext({
sessionCtx: params.sessionCtx,
config: params.followupRun.run.config,
hasRepliedRef: params.opts?.hasRepliedRef,
}),
sessionFile: params.followupRun.run.sessionFile,
workspaceDir: params.followupRun.run.workspaceDir,
agentDir: params.followupRun.run.agentDir,
config: params.followupRun.run.config,
hasRepliedRef: params.opts?.hasRepliedRef,
}),
sessionFile: params.followupRun.run.sessionFile,
workspaceDir: params.followupRun.run.workspaceDir,
agentDir: params.followupRun.run.agentDir,
config: params.followupRun.run.config,
skillsSnapshot: params.followupRun.run.skillsSnapshot,
prompt: params.commandBody,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
ownerNumbers: params.followupRun.run.ownerNumbers,
enforceFinalTag: resolveEnforceFinalTag(params.followupRun.run, provider),
provider,
model,
authProfileId,
authProfileIdSource: authProfileId
? params.followupRun.run.authProfileIdSource
: undefined,
thinkLevel: params.followupRun.run.thinkLevel,
verboseLevel: params.followupRun.run.verboseLevel,
reasoningLevel: params.followupRun.run.reasoningLevel,
execOverrides: params.followupRun.run.execOverrides,
toolResultFormat: (() => {
const channel = resolveMessageChannel(
params.sessionCtx.Surface,
params.sessionCtx.Provider,
);
if (!channel) return "markdown";
return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain";
})(),
bashElevated: params.followupRun.run.bashElevated,
timeoutMs: params.followupRun.run.timeoutMs,
runId,
images: params.opts?.images,
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: allowPartialStream
? async (payload) => {
const textForTyping = await handlePartialForTyping(payload);
if (!params.opts?.onPartialReply || textForTyping === undefined) return;
await params.opts.onPartialReply({
text: textForTyping,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onAssistantMessageStart: async () => {
await params.typingSignals.signalMessageStart();
},
onReasoningStream:
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
skillsSnapshot: params.followupRun.run.skillsSnapshot,
prompt: params.commandBody,
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
ownerNumbers: params.followupRun.run.ownerNumbers,
enforceFinalTag: resolveEnforceFinalTag(params.followupRun.run, provider),
provider,
model,
authProfileId,
authProfileIdSource: authProfileId
? params.followupRun.run.authProfileIdSource
: undefined,
thinkLevel: params.followupRun.run.thinkLevel,
verboseLevel: params.followupRun.run.verboseLevel,
reasoningLevel: params.followupRun.run.reasoningLevel,
execOverrides: params.followupRun.run.execOverrides,
toolResultFormat: (() => {
const channel = resolveMessageChannel(
params.sessionCtx.Surface,
params.sessionCtx.Provider,
);
if (!channel) return "markdown";
return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain";
})(),
bashElevated: params.followupRun.run.bashElevated,
timeoutMs: params.followupRun.run.timeoutMs,
runId,
images: params.opts?.images,
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: allowPartialStream
? async (payload) => {
await params.typingSignals.signalReasoningDelta();
await params.opts?.onReasoningStream?.({
text: payload.text,
const textForTyping = await handlePartialForTyping(payload);
if (!params.opts?.onPartialReply || textForTyping === undefined) return;
await params.opts.onPartialReply({
text: textForTyping,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onAgentEvent: async (evt) => {
// Trigger typing when tools start executing.
// Must await to ensure typing indicator starts before tool summaries are emitted.
if (evt.stream === "tool") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
}
}
// Track auto-compaction completion
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
}
}
},
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? async (payload) => {
const { text, skip } = normalizeStreamingText(payload);
const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (skip && !hasPayloadMedia) return;
const currentMessageId =
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
const taggedPayload = applyReplyTagsToPayload(
{
text,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
replyToId: payload.replyToId,
replyToTag: payload.replyToTag,
replyToCurrent: payload.replyToCurrent,
},
currentMessageId,
);
// Let through payloads with audioAsVoice flag even if empty (need to track it)
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return;
const parsed = parseReplyDirectives(taggedPayload.text ?? "", {
currentMessageId,
silentToken: SILENT_REPLY_TOKEN,
});
const cleaned = parsed.text || undefined;
const hasRenderableMedia =
Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0;
// Skip empty payloads unless they have audioAsVoice flag (need to track it)
if (
!cleaned &&
!hasRenderableMedia &&
!payload.audioAsVoice &&
!parsed.audioAsVoice
)
return;
if (parsed.isSilent && !hasRenderableMedia) return;
const blockPayload: ReplyPayload = params.applyReplyToMode({
...taggedPayload,
text: cleaned,
audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice),
replyToId: taggedPayload.replyToId ?? parsed.replyToId,
replyToTag: taggedPayload.replyToTag || parsed.replyToTag,
replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent,
});
void params.typingSignals
.signalTextDelta(cleaned ?? taggedPayload.text)
.catch((err) => {
logVerbose(`block reply typing signal failed: ${String(err)}`);
});
// Use pipeline if available (block streaming enabled), otherwise send directly
if (params.blockStreamingEnabled && params.blockReplyPipeline) {
params.blockReplyPipeline.enqueue(blockPayload);
} else if (params.blockStreamingEnabled) {
// Send directly when flushing before tool execution (no pipeline but streaming enabled).
// Track sent key to avoid duplicate in final payloads.
directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
await params.opts?.onBlockReply?.(blockPayload);
onAssistantMessageStart: async () => {
await params.typingSignals.signalMessageStart();
},
onReasoningStream:
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
? async (payload) => {
await params.typingSignals.signalReasoningDelta();
await params.opts?.onReasoningStream?.({
text: payload.text,
mediaUrls: payload.mediaUrls,
});
}
: undefined,
onAgentEvent: async (evt) => {
// Trigger typing when tools start executing.
// Must await to ensure typing indicator starts before tool summaries are emitted.
if (evt.stream === "tool") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
}
// When streaming is disabled entirely, blocks are accumulated in final text instead.
}
: undefined,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {
await blockReplyPipeline.flush({ force: true });
// Track auto-compaction completion
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
}
: undefined,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
onToolResult: onToolResult
? (payload) => {
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
// If a tool callback starts typing after the run finalized, we can end up with
// a typing loop that never sees a matching markRunComplete(). Track and drain.
const task = (async () => {
}
},
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? async (payload) => {
const { text, skip } = normalizeStreamingText(payload);
if (skip) return;
await params.typingSignals.signalTextDelta(text);
await onToolResult({
text,
mediaUrls: payload.mediaUrls,
const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (skip && !hasPayloadMedia) return;
const currentMessageId =
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
const taggedPayload = applyReplyTagsToPayload(
{
text,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
replyToId: payload.replyToId,
replyToTag: payload.replyToTag,
replyToCurrent: payload.replyToCurrent,
},
currentMessageId,
);
// Let through payloads with audioAsVoice flag even if empty (need to track it)
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return;
const parsed = parseReplyDirectives(taggedPayload.text ?? "", {
currentMessageId,
silentToken: SILENT_REPLY_TOKEN,
});
})()
.catch((err) => {
logVerbose(`tool result delivery failed: ${String(err)}`);
})
.finally(() => {
params.pendingToolTasks.delete(task);
const cleaned = parsed.text || undefined;
const hasRenderableMedia =
Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0;
// Skip empty payloads unless they have audioAsVoice flag (need to track it)
if (
!cleaned &&
!hasRenderableMedia &&
!payload.audioAsVoice &&
!parsed.audioAsVoice
)
return;
if (parsed.isSilent && !hasRenderableMedia) return;
const blockPayload: ReplyPayload = params.applyReplyToMode({
...taggedPayload,
text: cleaned,
audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice),
replyToId: taggedPayload.replyToId ?? parsed.replyToId,
replyToTag: taggedPayload.replyToTag || parsed.replyToTag,
replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent,
});
params.pendingToolTasks.add(task);
}
: undefined,
});
},
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
void params.typingSignals
.signalTextDelta(cleaned ?? taggedPayload.text)
.catch((err) => {
logVerbose(`block reply typing signal failed: ${String(err)}`);
});
// Use pipeline if available (block streaming enabled), otherwise send directly
if (params.blockStreamingEnabled && params.blockReplyPipeline) {
params.blockReplyPipeline.enqueue(blockPayload);
} else if (params.blockStreamingEnabled) {
// Send directly when flushing before tool execution (no pipeline but streaming enabled).
// Track sent key to avoid duplicate in final payloads.
directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
await params.opts?.onBlockReply?.(blockPayload);
}
// When streaming is disabled entirely, blocks are accumulated in final text instead.
}
: undefined,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {
await blockReplyPipeline.flush({ force: true });
}
: undefined,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
onToolResult: onToolResult
? (payload) => {
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
// If a tool callback starts typing after the run finalized, we can end up with
// a typing loop that never sees a matching markRunComplete(). Track and drain.
const task = (async () => {
const { text, skip } = normalizeStreamingText(payload);
if (skip) return;
await params.typingSignals.signalTextDelta(text);
await onToolResult({
text,
mediaUrls: payload.mediaUrls,
});
})()
.catch((err) => {
logVerbose(`tool result delivery failed: ${String(err)}`);
})
.finally(() => {
params.pendingToolTasks.delete(task);
});
params.pendingToolTasks.add(task);
}
: undefined,
});
},
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
}
// Some embedded runs surface context overflow as an error payload instead of throwing.
// Treat those as a session-level failure and auto-recover by starting a fresh session.

View File

@ -39,6 +39,7 @@ export type GetReplyOptions = {
skillFilter?: string[];
/** Mutable ref to track if a reply was sent (for Slack "first" threading mode). */
hasRepliedRef?: { value: boolean };
noOrchestrator?: boolean;
};
export type ReplyPayload = {

View File

@ -45,6 +45,8 @@ export function registerAgentCommands(program: Command, args: { agentChannelOpti
"--timeout <seconds>",
"Override agent command timeout (seconds, default 600 or config value)",
)
.option("--orchestrator", "Force enable the orchestrator for this run", false)
.option("--no-orchestrator", "Explicitly disable the orchestrator for this run", false)
.addHelpText(
"after",
() =>

View File

@ -49,6 +49,8 @@ export type AgentCliOpts = {
lane?: string;
runId?: string;
extraSystemPrompt?: string;
orchestrator?: boolean;
noOrchestrator?: boolean;
local?: boolean;
};
@ -134,6 +136,8 @@ export async function agentViaGatewayCommand(opts: AgentCliOpts, runtime: Runtim
timeout: timeoutSeconds,
lane: opts.lane,
extraSystemPrompt: opts.extraSystemPrompt,
orchestrator: opts.orchestrator,
noOrchestrator: opts.noOrchestrator,
idempotencyKey,
},
expectFinal: true,

View File

@ -19,6 +19,7 @@ import {
resolveThinkingDefault,
} from "../agents/model-selection.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { orchestrateUserMessage, resolveOrchestratorConfig } from "../agents/orchestrator/index.js";
import { buildWorkspaceSkillSnapshot } from "../agents/skills.js";
import { getSkillsSnapshotVersion } from "../agents/skills/refresh.js";
import { resolveAgentTimeoutMs } from "../agents/timeout.js";
@ -371,6 +372,11 @@ export async function agentCommand(
let result: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = provider;
let fallbackModel = model;
const orchestratorConfig = resolveOrchestratorConfig(cfg);
const useOrchestrator =
(orchestratorConfig.enabled || opts.orchestrator) && !opts.noOrchestrator;
try {
const runContext = resolveAgentRunContext(opts);
const messageChannel = resolveMessageChannel(
@ -378,88 +384,112 @@ export async function agentCommand(
opts.replyChannel ?? opts.channel,
);
const spawnedBy = opts.spawnedBy ?? sessionEntry?.spawnedBy;
const fallbackResult = await runWithModelFallback({
cfg,
provider,
model,
agentDir,
fallbacksOverride: resolveAgentModelFallbacksOverride(cfg, sessionAgentId),
run: (providerOverride, modelOverride) => {
if (isCliProvider(providerOverride, cfg)) {
const cliSessionId = getCliSessionId(sessionEntry, providerOverride);
return runCliAgent({
if (useOrchestrator) {
const orchResult = await orchestrateUserMessage({
userMessage: body,
sessionId,
sessionKey,
config: cfg,
images: opts.images,
timeoutMs,
thinking: resolvedThinkLevel,
});
result = {
payloads: orchResult.payloads,
meta: {
durationMs: orchResult.meta.durationMs,
agentMeta: {
sessionId,
provider: "orchestrator",
model: orchResult.meta.orchestratorModel,
},
},
};
} else {
const fallbackResult = await runWithModelFallback({
cfg,
provider,
model,
agentDir,
fallbacksOverride: resolveAgentModelFallbacksOverride(cfg, sessionAgentId),
run: (providerOverride, modelOverride) => {
if (isCliProvider(providerOverride, cfg)) {
const cliSessionId = getCliSessionId(sessionEntry, providerOverride);
return runCliAgent({
sessionId,
sessionKey,
sessionFile,
workspaceDir,
config: cfg,
prompt: body,
provider: providerOverride,
model: modelOverride,
thinkLevel: resolvedThinkLevel,
timeoutMs,
runId,
extraSystemPrompt: opts.extraSystemPrompt,
cliSessionId,
images: opts.images,
streamParams: opts.streamParams,
});
}
const authProfileId =
providerOverride === provider ? sessionEntry?.authProfileOverride : undefined;
return runEmbeddedPiAgent({
sessionId,
sessionKey,
messageChannel,
agentAccountId: runContext.accountId,
messageTo: opts.replyTo ?? opts.to,
messageThreadId: opts.threadId,
groupId: runContext.groupId,
groupChannel: runContext.groupChannel,
groupSpace: runContext.groupSpace,
spawnedBy,
currentChannelId: runContext.currentChannelId,
currentThreadTs: runContext.currentThreadTs,
replyToMode: runContext.replyToMode,
hasRepliedRef: runContext.hasRepliedRef,
sessionFile,
workspaceDir,
config: cfg,
skillsSnapshot,
prompt: body,
images: opts.images,
clientTools: opts.clientTools,
provider: providerOverride,
model: modelOverride,
authProfileId,
authProfileIdSource: authProfileId
? sessionEntry?.authProfileOverrideSource
: undefined,
thinkLevel: resolvedThinkLevel,
verboseLevel: resolvedVerboseLevel,
timeoutMs,
runId,
lane: opts.lane,
abortSignal: opts.abortSignal,
extraSystemPrompt: opts.extraSystemPrompt,
cliSessionId,
images: opts.images,
streamParams: opts.streamParams,
agentDir,
onAgentEvent: (evt) => {
// Track lifecycle end for fallback emission below.
if (
evt.stream === "lifecycle" &&
typeof evt.data?.phase === "string" &&
(evt.data.phase === "end" || evt.data.phase === "error")
) {
lifecycleEnded = true;
}
},
});
}
const authProfileId =
providerOverride === provider ? sessionEntry?.authProfileOverride : undefined;
return runEmbeddedPiAgent({
sessionId,
sessionKey,
messageChannel,
agentAccountId: runContext.accountId,
messageTo: opts.replyTo ?? opts.to,
messageThreadId: opts.threadId,
groupId: runContext.groupId,
groupChannel: runContext.groupChannel,
groupSpace: runContext.groupSpace,
spawnedBy,
currentChannelId: runContext.currentChannelId,
currentThreadTs: runContext.currentThreadTs,
replyToMode: runContext.replyToMode,
hasRepliedRef: runContext.hasRepliedRef,
sessionFile,
workspaceDir,
config: cfg,
skillsSnapshot,
prompt: body,
images: opts.images,
clientTools: opts.clientTools,
provider: providerOverride,
model: modelOverride,
authProfileId,
authProfileIdSource: authProfileId
? sessionEntry?.authProfileOverrideSource
: undefined,
thinkLevel: resolvedThinkLevel,
verboseLevel: resolvedVerboseLevel,
timeoutMs,
runId,
lane: opts.lane,
abortSignal: opts.abortSignal,
extraSystemPrompt: opts.extraSystemPrompt,
streamParams: opts.streamParams,
agentDir,
onAgentEvent: (evt) => {
// Track lifecycle end for fallback emission below.
if (
evt.stream === "lifecycle" &&
typeof evt.data?.phase === "string" &&
(evt.data.phase === "end" || evt.data.phase === "error")
) {
lifecycleEnded = true;
}
},
});
},
});
result = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
},
});
result = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
}
if (!lifecycleEnded) {
emitAgentEvent({
runId,

View File

@ -72,6 +72,8 @@ export type AgentCommandOpts = {
lane?: string;
runId?: string;
extraSystemPrompt?: string;
orchestrator?: boolean;
noOrchestrator?: boolean;
/** Per-call stream param overrides (best-effort). */
streamParams?: AgentStreamParams;
};

View File

@ -0,0 +1,52 @@
import { confirm, intro, outro, text } from "./configure.shared.js";
import { loadConfig, writeConfigFile } from "../config/io.js";
import type { RuntimeEnv } from "../runtime.js";
import { ORCHESTRATOR_DEFAULT_MODEL } from "../agents/orchestrator/config.js";
export async function runOrchestratorWizard(_runtime: RuntimeEnv) {
intro("Orchestrator Configuration");
const cfg = loadConfig();
const orchestrator = cfg.orchestrator ?? {};
const enabled = await confirm({
message: "Enable smart orchestrator? (Intercepts messages and routes to best agent)",
initialValue: orchestrator.enabled ?? false,
});
if (typeof enabled === "symbol") return;
const model = await text({
message: "Orchestrator Model",
placeholder: ORCHESTRATOR_DEFAULT_MODEL,
initialValue: orchestrator.model ?? ORCHESTRATOR_DEFAULT_MODEL,
});
if (typeof model === "symbol") return;
const opencodeEnabled = await confirm({
message: "Enable OpenCode Agent delegation?",
initialValue: orchestrator.agents?.opencode?.enabled ?? true,
});
if (typeof opencodeEnabled === "symbol") return;
const nextCfg = {
...cfg,
orchestrator: {
...orchestrator,
enabled,
model,
agents: {
...orchestrator.agents,
opencode: {
...orchestrator.agents?.opencode,
enabled: opencodeEnabled,
},
},
},
};
await writeConfigFile(nextCfg);
outro("Orchestrator configuration updated.");
}

View File

@ -16,6 +16,7 @@ export const CONFIGURE_WIZARD_SECTIONS = [
"daemon",
"channels",
"skills",
"orchestrator",
"health",
] as const;
@ -48,6 +49,7 @@ export const CONFIGURE_SECTION_OPTIONS: Array<{
hint: "Link WhatsApp/Telegram/etc and defaults",
},
{ value: "skills", label: "Skills", hint: "Install/enable workspace skills" },
{ value: "orchestrator", label: "Orchestrator", hint: "Configure smart routing + OpenCode" },
{
value: "health",
label: "Health check",

View File

@ -13,6 +13,7 @@ import { removeChannelConfigWizard } from "./configure.channels.js";
import { maybeInstallDaemon } from "./configure.daemon.js";
import { promptGatewayConfig } from "./configure.gateway.js";
import { promptAuthConfig } from "./configure.gateway-auth.js";
import { runOrchestratorWizard } from "./configure.orchestrator.js";
import type {
ChannelsWizardMode,
ConfigureWizardParams,
@ -349,7 +350,13 @@ export async function runConfigureWizard(
nextConfig = await setupSkills(nextConfig, wsDir, runtime, prompter);
}
await persistConfig();
if (selected.includes("orchestrator")) {
await runOrchestratorWizard(runtime);
const snapshot = await readConfigFileSnapshot();
if (snapshot.exists) {
nextConfig = snapshot.config;
}
}
if (selected.includes("daemon")) {
if (!selected.includes("gateway")) {

View File

@ -19,6 +19,7 @@ import type {
MessagesConfig,
} from "./types.messages.js";
import type { ModelsConfig } from "./types.models.js";
import type { OrchestratorConfig } from "./types.orchestrator.js";
import type { NodeHostConfig } from "./types.node-host.js";
import type { PluginsConfig } from "./types.plugins.js";
import type { SkillsConfig } from "./types.skills.js";
@ -77,6 +78,7 @@ export type MoltbotConfig = {
skills?: SkillsConfig;
plugins?: PluginsConfig;
models?: ModelsConfig;
orchestrator?: OrchestratorConfig;
nodeHost?: NodeHostConfig;
agents?: AgentsConfig;
tools?: ToolsConfig;

View File

@ -0,0 +1,31 @@
export interface OpenCodeAgentConfig {
enabled?: boolean;
model?: string;
mode?: "cli" | "serve";
binary?: string;
timeoutMs?: number;
servePort?: number;
}
export interface ResearchAgentConfig {
enabled?: boolean;
model?: string;
}
export interface EmbeddedAgentConfig {
enabled?: boolean;
model?: string | null;
}
export interface OrchestratorConfig {
enabled?: boolean;
model?: string;
fallbacks?: string[];
agents?: {
opencode?: OpenCodeAgentConfig;
research?: ResearchAgentConfig;
embedded?: EmbeddedAgentConfig;
};
parallelExecution?: boolean;
maxParallelAgents?: number;
}

View File

@ -16,6 +16,7 @@ export * from "./types.hooks.js";
export * from "./types.imessage.js";
export * from "./types.messages.js";
export * from "./types.models.js";
export * from "./types.orchestrator.js";
export * from "./types.node-host.js";
export * from "./types.msteams.js";
export * from "./types.plugins.js";

View File

@ -0,0 +1,48 @@
import { z } from "zod";
export const OpenCodeAgentSchema = z
.object({
enabled: z.boolean().optional(),
model: z.string().optional(),
mode: z.union([z.literal("cli"), z.literal("serve")]).optional(),
binary: z.string().optional(),
timeoutMs: z.number().int().nonnegative().optional(),
servePort: z.number().int().min(1).max(65535).optional(),
})
.strict()
.optional();
export const ResearchAgentSchema = z
.object({
enabled: z.boolean().optional(),
model: z.string().optional(),
})
.strict()
.optional();
export const EmbeddedAgentSchema = z
.object({
enabled: z.boolean().optional(),
model: z.string().nullable().optional(),
})
.strict()
.optional();
export const OrchestratorSchema = z
.object({
enabled: z.boolean().optional(),
model: z.string().optional(),
fallbacks: z.array(z.string()).optional(),
agents: z
.object({
opencode: OpenCodeAgentSchema,
research: ResearchAgentSchema,
embedded: EmbeddedAgentSchema,
})
.strict()
.optional(),
parallelExecution: z.boolean().optional(),
maxParallelAgents: z.number().int().nonnegative().optional(),
})
.strict()
.optional();

View File

@ -5,6 +5,7 @@ import { AgentsSchema, AudioSchema, BindingsSchema, BroadcastSchema } from "./zo
import { HexColorSchema, ModelsConfigSchema } from "./zod-schema.core.js";
import { HookMappingSchema, HooksGmailSchema, InternalHooksSchema } from "./zod-schema.hooks.js";
import { ChannelsSchema } from "./zod-schema.providers.js";
import { OrchestratorSchema } from "./zod-schema.orchestrator.js";
import { CommandsSchema, MessagesSchema, SessionSchema } from "./zod-schema.session.js";
const BrowserSnapshotDefaultsSchema = z
@ -207,6 +208,7 @@ export const MoltbotSchema = z
.strict()
.optional(),
models: ModelsConfigSchema,
orchestrator: OrchestratorSchema,
nodeHost: NodeHostSchema,
agents: AgentsSchema,
tools: ToolsSchema,

View File

@ -19,6 +19,10 @@ import {
resolveHooksGmailModel,
resolveThinkingDefault,
} from "../../agents/model-selection.js";
import {
orchestrateUserMessage,
resolveOrchestratorConfig,
} from "../../agents/orchestrator/index.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
@ -317,6 +321,10 @@ export async function runCronIsolatedAgentTurn(params: {
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = provider;
let fallbackModel = model;
const orchestratorConfig = resolveOrchestratorConfig(cfgWithAgentDefaults);
const useOrchestrator = orchestratorConfig.enabled;
try {
const sessionFile = resolveSessionTranscriptPath(cronSession.sessionEntry.sessionId, agentId);
const resolvedVerboseLevel =
@ -328,53 +336,76 @@ export async function runCronIsolatedAgentTurn(params: {
verboseLevel: resolvedVerboseLevel,
});
const messageChannel = resolvedDelivery.channel;
const fallbackResult = await runWithModelFallback({
cfg: cfgWithAgentDefaults,
provider,
model,
agentDir,
fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
run: (providerOverride, modelOverride) => {
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride);
return runCliAgent({
if (useOrchestrator) {
const orchResult = await orchestrateUserMessage({
userMessage: commandBody,
sessionId: cronSession.sessionEntry.sessionId,
sessionKey: agentSessionKey,
config: cfgWithAgentDefaults,
timeoutMs,
thinking: thinkLevel,
});
runResult = {
payloads: orchResult.payloads,
meta: {
durationMs: orchResult.meta.durationMs,
agentMeta: {
sessionId: cronSession.sessionEntry.sessionId,
provider: "orchestrator",
model: orchResult.meta.orchestratorModel,
},
},
};
} else {
const fallbackResult = await runWithModelFallback({
cfg: cfgWithAgentDefaults,
provider,
model,
agentDir,
fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
run: (providerOverride, modelOverride) => {
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride);
return runCliAgent({
sessionId: cronSession.sessionEntry.sessionId,
sessionKey: agentSessionKey,
sessionFile,
workspaceDir,
config: cfgWithAgentDefaults,
prompt: commandBody,
provider: providerOverride,
model: modelOverride,
thinkLevel: thinkLevel,
timeoutMs: timeoutMs,
runId: cronSession.sessionEntry.sessionId,
cliSessionId,
});
}
return runEmbeddedPiAgent({
sessionId: cronSession.sessionEntry.sessionId,
sessionKey: agentSessionKey,
messageChannel,
agentAccountId: resolvedDelivery.accountId,
sessionFile,
workspaceDir,
config: cfgWithAgentDefaults,
skillsSnapshot,
prompt: commandBody,
lane: params.lane ?? "cron",
provider: providerOverride,
model: modelOverride,
thinkLevel,
thinkLevel: thinkLevel,
verboseLevel: resolvedVerboseLevel,
timeoutMs,
runId: cronSession.sessionEntry.sessionId,
cliSessionId,
});
}
return runEmbeddedPiAgent({
sessionId: cronSession.sessionEntry.sessionId,
sessionKey: agentSessionKey,
messageChannel,
agentAccountId: resolvedDelivery.accountId,
sessionFile,
workspaceDir,
config: cfgWithAgentDefaults,
skillsSnapshot,
prompt: commandBody,
lane: params.lane ?? "cron",
provider: providerOverride,
model: modelOverride,
thinkLevel,
verboseLevel: resolvedVerboseLevel,
timeoutMs,
runId: cronSession.sessionEntry.sessionId,
});
},
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
},
});
runResult = fallbackResult.result;
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
}
} catch (err) {
return { status: "error", error: String(err) };
}

View File

@ -68,6 +68,8 @@ export const AgentParamsSchema = Type.Object(
idempotencyKey: NonEmptyString,
label: Type.Optional(SessionLabelString),
spawnedBy: Type.Optional(Type.String()),
orchestrator: Type.Optional(Type.Boolean()),
noOrchestrator: Type.Optional(Type.Boolean()),
},
{ additionalProperties: false },
);

View File

@ -85,6 +85,8 @@ export const agentHandlers: GatewayRequestHandlers = {
timeout?: number;
label?: string;
spawnedBy?: string;
orchestrator?: boolean;
noOrchestrator?: boolean;
};
const cfg = loadConfig();
const idem = request.idempotencyKey;
@ -377,6 +379,8 @@ export const agentHandlers: GatewayRequestHandlers = {
runId,
lane: request.lane,
extraSystemPrompt: request.extraSystemPrompt,
orchestrator: request.orchestrator,
noOrchestrator: request.noOrchestrator,
},
defaultRuntime,
context.deps,