fix(session_compact): use direct compaction when called from active run

- Add compactEmbeddedPiSessionDirect for synchronous compaction
- Detect active run and use direct method to avoid self-abort deadlock
- Fixes transcript repair error when session_compact called mid-session
This commit is contained in:
sasheenmusic 2026-01-29 14:31:56 -08:00
parent 30262f9ffc
commit ec8ddeb5ac
13 changed files with 811 additions and 25 deletions

40
LOCAL_STATE.md Normal file
View File

@ -0,0 +1,40 @@
# Local state + config export (dev-only)
Moltbot stores runtime state under your home directory (by default `~/.moltbot`, with legacy `~/.clawdbot` often pointing to the same place).
This repo intentionally does **not** track your real local config, pairing stores, tokens, or other secrets. Instead, it provides a script that copies local state into a gitignored folder and optionally writes a **redacted** snapshot that is safe to commit.
## Export local state into this repo
From the repo root:
```bash
node scripts/local/export-local-state.mjs
```
Outputs:
- `.local/moltbot/state/` (gitignored): a local backup of your state/config files
- `config/redacted/moltbot.redacted.json` (tracked): a redacted snapshot for reference/review
### Optional flags
```bash
node scripts/local/export-local-state.mjs --include-agents --include-memory --include-logs
```
Those folders can be large.
## Security notes
- The export script intentionally skips OAuth credential files like `oauth.json`.
- Always review `config/redacted/moltbot.redacted.json` before committing.
- Never commit real tokens, secrets, phone numbers, or personal identifiers.
## Optional: import a local notes folder
If you keep local operator notes in a folder like `~/clawd/`, you can copy it into this repo under `.local/`:
```bash
node scripts/local/import-clawd.mjs
```

1
config/redacted/.gitkeep Normal file
View File

@ -0,0 +1 @@

View File

@ -0,0 +1,197 @@
{
"meta": {
"lastTouchedVersion": "2026.1.27-beta.1",
"lastTouchedAt": "2026-01-29T18:27:46.225Z"
},
"wizard": {
"lastRunAt": "2026-01-28T03:25:58.516Z",
"lastRunVersion": "2026.1.24-3",
"lastRunCommand": "configure",
"lastRunMode": "local"
},
"browser": {
"enabled": true,
"remoteCdpTimeoutMs": 0,
"remoteCdpHandshakeTimeoutMs": 60000
},
"auth": {
"profiles": {
"openai-codex:codex-cli": {
"provider": "openai-codex",
"mode": "oauth"
},
"anthropic:claude-cli": {
"provider": "anthropic",
"mode": "oauth"
},
"openai:manual": {
"provider": "openai",
"mode": "token"
},
"zai:default": {
"provider": "zai",
"mode": "api_key"
},
"anthropic:default": {
"provider": "anthropic",
"mode": "token"
}
},
"order": {
"anthropic": [
"<redacted>"
]
}
},
"agents": {
"defaults": {
"model": {
"primary": "anthropic/claude-opus-4-5",
"fallbacks": [
"<redacted>"
]
},
"models": {
"anthropic/claude-opus-4-5": {
"alias": "opus"
}
},
"workspace": "/Users/conradsasinski/clawd",
"memorySearch": {
"sources": [
"<redacted>"
],
"experimental": {
"sessionMemory": true
},
"provider": "openai",
"fallback": "openai",
"model": "text-embedding-3-small",
"sync": {
"watch": true
}
},
"compaction": {
"memoryFlush": {
"enabled": true
}
},
"thinkingDefault": "medium",
"elevatedDefault": "full",
"maxConcurrent": 4,
"subagents": {
"maxConcurrent": 8
},
"sandbox": {
"mode": "off"
}
},
"list": [
"<redacted>"
]
},
"tools": {
"allow": [
"<redacted>"
],
"web": {
"search": {
"enabled": true,
"apiKey": "<redacted>"
},
"fetch": {
"enabled": true
}
},
"agentToAgent": {
"enabled": true
},
"elevated": {
"enabled": true,
"allowFrom": []
},
"exec": {
"host": "gateway",
"security": "full",
"ask": "off"
}
},
"messages": {
"inbound": {
"byChannel": {
"telegram": 2000
}
},
"ackReactionScope": "group-mentions"
},
"commands": {
"native": "auto",
"nativeSkills": "auto",
"restart": true
},
"hooks": {
"internal": {
"enabled": true,
"entries": {
"session-memory": {
"enabled": true
}
}
}
},
"channels": {
"telegram": {
"enabled": true,
"dmPolicy": "pairing",
"botToken": "<redacted>",
"replyToMode": "off",
"groupPolicy": "allowlist",
"streamMode": "off"
}
},
"talk": {
"apiKey": "<redacted>"
},
"gateway": {
"port": 18789,
"mode": "local",
"bind": "loopback",
"auth": {
"mode": "token",
"token": "<redacted>"
},
"tailscale": {
"mode": "off",
"resetOnExit": false
}
},
"skills": {
"load": {
"watch": true,
"watchDebounceMs": 500
},
"install": {
"nodeManager": "npm"
},
"entries": {
"sag": {
"apiKey": "<redacted>"
},
"cronometer-logger": {
"enabled": true,
"config": {
"mode": "day-only",
"diaryGroup": "disabled",
"defaultDate": "today"
}
}
}
},
"plugins": {
"entries": {
"telegram": {
"enabled": true
}
}
}
}

View File

@ -144,7 +144,10 @@
"protocol:gen:swift": "node --import tsx scripts/protocol-gen-swift.ts", "protocol:gen:swift": "node --import tsx scripts/protocol-gen-swift.ts",
"protocol:check": "pnpm protocol:gen && pnpm protocol:gen:swift && git diff --exit-code -- dist/protocol.schema.json apps/macos/Sources/MoltbotProtocol/GatewayModels.swift", "protocol:check": "pnpm protocol:gen && pnpm protocol:gen:swift && git diff --exit-code -- dist/protocol.schema.json apps/macos/Sources/MoltbotProtocol/GatewayModels.swift",
"canvas:a2ui:bundle": "bash scripts/bundle-a2ui.sh", "canvas:a2ui:bundle": "bash scripts/bundle-a2ui.sh",
"check:loc": "node --import tsx scripts/check-ts-max-loc.ts --max 500" "check:loc": "node --import tsx scripts/check-ts-max-loc.ts --max 500",
"local:export-state": "node scripts/local/export-local-state.mjs",
"local:export-state:full": "node scripts/local/export-local-state.mjs --include-agents --include-memory --include-logs",
"local:import-clawd": "node scripts/local/import-clawd.mjs"
}, },
"keywords": [], "keywords": [],
"author": "", "author": "",

View File

@ -0,0 +1,285 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
function parseArgs(argv) {
const args = {
source: null,
out: null,
redactOut: null,
includeAgents: false,
includeLogs: false,
includeMemory: false,
};
for (let i = 0; i < argv.length; i += 1) {
const cur = argv[i];
if (cur === "--source") args.source = argv[++i] ?? null;
else if (cur === "--out") args.out = argv[++i] ?? null;
else if (cur === "--redact-out") args.redactOut = argv[++i] ?? null;
else if (cur === "--include-agents") args.includeAgents = true;
else if (cur === "--include-logs") args.includeLogs = true;
else if (cur === "--include-memory") args.includeMemory = true;
else if (cur === "--help" || cur === "-h") {
args.help = true;
} else if (cur?.startsWith("--")) {
throw new Error(`Unknown flag: ${cur}`);
}
}
return args;
}
function expandUserPath(input) {
const trimmed = String(input ?? "").trim();
if (!trimmed) return trimmed;
if (trimmed.startsWith("~")) {
return path.resolve(trimmed.replace(/^~(?=$|[\\/])/, os.homedir()));
}
return path.resolve(trimmed);
}
async function pathExists(p) {
try {
await fs.access(p);
return true;
} catch {
return false;
}
}
async function ensureDir(dir) {
await fs.mkdir(dir, { recursive: true });
}
async function copyFileIfExists(from, to) {
if (!(await pathExists(from))) return false;
await ensureDir(path.dirname(to));
await fs.copyFile(from, to);
return true;
}
async function copyDirIfExists(fromDir, toDir, { filter } = {}) {
if (!(await pathExists(fromDir))) return { copied: 0, skipped: 0 };
await ensureDir(toDir);
let copied = 0;
let skipped = 0;
const entries = await fs.readdir(fromDir, { withFileTypes: true });
for (const entry of entries) {
const src = path.join(fromDir, entry.name);
const dst = path.join(toDir, entry.name);
if (filter && !filter({ name: entry.name, src, isDir: entry.isDirectory() })) {
skipped += 1;
continue;
}
if (entry.isDirectory()) {
const res = await copyDirIfExists(src, dst, { filter });
copied += res.copied;
skipped += res.skipped;
} else if (entry.isFile()) {
await ensureDir(path.dirname(dst));
await fs.copyFile(src, dst);
copied += 1;
} else {
skipped += 1;
}
}
return { copied, skipped };
}
function shouldRedactKey(key) {
const k = String(key).toLowerCase();
return (
k.includes("token") ||
k.includes("secret") ||
k.includes("password") ||
k.includes("apikey") ||
k.includes("api_key") ||
k.endsWith("key")
);
}
function redactValue(value) {
if (typeof value === "string") {
const trimmed = value.trim();
// Telegram bot token format: <digits>:<base64url-ish>
if (/^\d+:[A-Za-z0-9_-]{20,}$/.test(trimmed)) return "<redacted>";
// Discord bot token-ish (very loose) or other opaque tokens
if (trimmed.length >= 24 && /^[A-Za-z0-9._-]+$/.test(trimmed)) return "<redacted>";
// Local absolute paths are usually personal; keep only basename.
if (path.isAbsolute(trimmed) || trimmed.startsWith("~/")) {
return `<path:${path.basename(trimmed)}>`;
}
}
return "<redacted>";
}
function redactObject(obj) {
if (Array.isArray(obj)) {
// Lists often contain ids/handles; keep shape but hide contents.
return obj.length > 0 ? ["<redacted>"] : [];
}
if (!obj || typeof obj !== "object") return obj;
const out = {};
for (const [key, value] of Object.entries(obj)) {
if (shouldRedactKey(key)) {
out[key] = redactValue(value);
continue;
}
// Known id-heavy fields: keep presence but hide.
if (key === "allowFrom" || key === "groupAllowFrom") {
out[key] = Array.isArray(value) && value.length > 0 ? ["<redacted>"] : [];
continue;
}
if (key === "groups" && value && typeof value === "object" && !Array.isArray(value)) {
const v = value;
const keep = {};
if (Object.prototype.hasOwnProperty.call(v, "*")) keep["*"] = v["*"];
const exampleKey = Object.keys(v).find((k) => k !== "*");
if (exampleKey) keep["<redacted>"] = v[exampleKey];
out[key] = keep;
continue;
}
out[key] = redactObject(value);
}
return out;
}
async function readJsonFile(filePath) {
const raw = await fs.readFile(filePath, "utf-8");
try {
return JSON.parse(raw);
} catch {
const json5 = await import("json5");
return json5.default.parse(raw);
}
}
async function writeJsonFile(filePath, value) {
await ensureDir(path.dirname(filePath));
await fs.writeFile(filePath, `${JSON.stringify(value, null, 2)}\n`, "utf-8");
}
function formatList(items) {
return items.length ? items.map((v) => `- ${v}`).join("\n") : "- (none)";
}
async function main() {
const args = parseArgs(process.argv.slice(2));
if (args.help) {
process.stdout.write(
[
"Usage: node scripts/local/export-local-state.mjs [flags]",
"",
"Flags:",
" --source <dir> Source state dir (default: ~/.moltbot)",
" --out <dir> Output dir (default: ./.local/moltbot/state)",
" --redact-out <dir> Write redacted snapshots here (default: ./config/redacted)",
" --include-agents Copy ~/.moltbot/agents (can be large)",
" --include-memory Copy ~/.moltbot/memory (can be large)",
" --include-logs Copy ~/.moltbot/logs (can be large)",
"",
].join("\n"),
);
return;
}
const source = expandUserPath(args.source ?? "~/.moltbot");
const outDir = expandUserPath(args.out ?? path.join(process.cwd(), ".local", "moltbot", "state"));
const redactOutDir = expandUserPath(
args.redactOut ?? path.join(process.cwd(), "config", "redacted"),
);
if (!(await pathExists(source))) {
throw new Error(`Source state dir not found: ${source}`);
}
await ensureDir(outDir);
await ensureDir(redactOutDir);
const copied = [];
const missing = [];
const configPath = path.join(source, "moltbot.json");
if (await copyFileIfExists(configPath, path.join(outDir, "moltbot.json"))) copied.push("moltbot.json");
else missing.push("moltbot.json");
// Backups (helpful for diffing/migrations)
const stateEntries = await fs.readdir(source, { withFileTypes: true });
for (const entry of stateEntries) {
if (!entry.isFile()) continue;
if (!/^moltbot\.json\.bak(\.|$)/.test(entry.name) && !/^clawdbot\.json\.bak(\.|$)/.test(entry.name)) {
continue;
}
await copyFileIfExists(path.join(source, entry.name), path.join(outDir, entry.name));
copied.push(entry.name);
}
// Credentials: copy pairing + allowFrom stores only (avoid oauth.json).
const credsSrc = path.join(source, "credentials");
await copyDirIfExists(credsSrc, path.join(outDir, "credentials"), {
filter: ({ name, isDir }) => {
if (isDir) return true;
return name.endsWith("-allowFrom.json") || name.endsWith("-pairing.json");
},
});
if (await pathExists(credsSrc)) copied.push("credentials/*(-allowFrom|-pairing).json");
// Telegram update offsets
const tgSrc = path.join(source, "telegram");
await copyDirIfExists(tgSrc, path.join(outDir, "telegram"), {
filter: ({ name, isDir }) => isDir || name.startsWith("update-offset-"),
});
if (await pathExists(tgSrc)) copied.push("telegram/update-offset-*.json");
// Optional large dirs
if (args.includeAgents) {
await copyDirIfExists(path.join(source, "agents"), path.join(outDir, "agents"));
copied.push("agents/**");
}
if (args.includeMemory) {
await copyDirIfExists(path.join(source, "memory"), path.join(outDir, "memory"));
copied.push("memory/**");
}
if (args.includeLogs) {
await copyDirIfExists(path.join(source, "logs"), path.join(outDir, "logs"));
copied.push("logs/**");
}
// Redacted snapshot of config for version control.
if (await pathExists(configPath)) {
const cfg = await readJsonFile(configPath);
const redacted = redactObject(cfg);
await writeJsonFile(path.join(redactOutDir, "moltbot.redacted.json"), redacted);
}
process.stdout.write(
[
"Export complete.",
"",
`Source: ${source}`,
`Out: ${outDir}`,
`Redact: ${redactOutDir}`,
"",
"Copied:",
formatList(copied),
"",
"Missing:",
formatList(missing),
"",
"Notes:",
"- Out dir is under .local/ and is gitignored by default.",
"- Redacted config snapshot is safe to commit; validate before pushing.",
"",
].join("\n"),
);
}
await main();

View File

@ -0,0 +1,141 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
function parseArgs(argv) {
const args = {
source: null,
out: null,
includeMemory: true,
};
for (let i = 0; i < argv.length; i += 1) {
const cur = argv[i];
if (cur === "--source") args.source = argv[++i] ?? null;
else if (cur === "--out") args.out = argv[++i] ?? null;
else if (cur === "--no-memory") args.includeMemory = false;
else if (cur === "--help" || cur === "-h") args.help = true;
else if (cur?.startsWith("--")) throw new Error(`Unknown flag: ${cur}`);
}
return args;
}
function expandUserPath(input) {
const trimmed = String(input ?? "").trim();
if (!trimmed) return trimmed;
if (trimmed.startsWith("~")) {
return path.resolve(trimmed.replace(/^~(?=$|[\\/])/, os.homedir()));
}
return path.resolve(trimmed);
}
async function pathExists(p) {
try {
await fs.access(p);
return true;
} catch {
return false;
}
}
async function ensureDir(dir) {
await fs.mkdir(dir, { recursive: true });
}
async function copyFileIfExists(from, to) {
if (!(await pathExists(from))) return false;
await ensureDir(path.dirname(to));
await fs.copyFile(from, to);
return true;
}
async function copyDir(fromDir, toDir) {
await ensureDir(toDir);
const entries = await fs.readdir(fromDir, { withFileTypes: true });
for (const entry of entries) {
const src = path.join(fromDir, entry.name);
const dst = path.join(toDir, entry.name);
if (entry.isDirectory()) {
await copyDir(src, dst);
} else if (entry.isFile()) {
await ensureDir(path.dirname(dst));
await fs.copyFile(src, dst);
}
}
}
function formatList(items) {
return items.length ? items.map((v) => `- ${v}`).join("\n") : "- (none)";
}
async function main() {
const args = parseArgs(process.argv.slice(2));
if (args.help) {
process.stdout.write(
[
"Usage: node scripts/local/import-clawd.mjs [flags]",
"",
"Flags:",
" --source <dir> Source folder (default: ~/clawd)",
" --out <dir> Output folder (default: ./.local/clawd)",
" --no-memory Do not copy memory/ (can be large)",
"",
].join("\n"),
);
return;
}
const source = expandUserPath(args.source ?? "~/clawd");
const outDir = expandUserPath(args.out ?? path.join(process.cwd(), ".local", "clawd"));
if (!(await pathExists(source))) {
throw new Error(`Source folder not found: ${source}`);
}
const copied = [];
const missing = [];
const topFiles = [
"AGENTS.md",
"HEARTBEAT.md",
"SOUL.md",
"TOOLS.md",
"USER.md",
"IDENTITY.md",
"MEMORY.md",
];
for (const name of topFiles) {
const ok = await copyFileIfExists(path.join(source, name), path.join(outDir, name));
(ok ? copied : missing).push(name);
}
if (args.includeMemory) {
const mem = path.join(source, "memory");
if (await pathExists(mem)) {
await copyDir(mem, path.join(outDir, "memory"));
copied.push("memory/**");
} else {
missing.push("memory/**");
}
}
process.stdout.write(
[
"Import complete.",
"",
`Source: ${source}`,
`Out: ${outDir}`,
"",
"Copied:",
formatList(copied),
"",
"Missing:",
formatList(missing),
"",
"Notes:",
"- Out dir is under .local/ and is gitignored by default.",
"",
].join("\n"),
);
}
await main();

View File

@ -1,5 +1,8 @@
export type { MessagingToolSend } from "./pi-embedded-messaging.js"; export type { MessagingToolSend } from "./pi-embedded-messaging.js";
export { compactEmbeddedPiSession } from "./pi-embedded-runner/compact.js"; export {
compactEmbeddedPiSession,
compactEmbeddedPiSessionDirect,
} from "./pi-embedded-runner/compact.js";
export { applyExtraParamsToAgent, resolveExtraParams } from "./pi-embedded-runner/extra-params.js"; export { applyExtraParamsToAgent, resolveExtraParams } from "./pi-embedded-runner/extra-params.js";
export { applyGoogleTurnOrderingFix } from "./pi-embedded-runner/google.js"; export { applyGoogleTurnOrderingFix } from "./pi-embedded-runner/google.js";

View File

@ -7,6 +7,7 @@ export type {
export { export {
abortEmbeddedPiRun, abortEmbeddedPiRun,
compactEmbeddedPiSession, compactEmbeddedPiSession,
compactEmbeddedPiSessionDirect,
isEmbeddedPiRunActive, isEmbeddedPiRunActive,
isEmbeddedPiRunStreaming, isEmbeddedPiRunStreaming,
queueEmbeddedPiMessage, queueEmbeddedPiMessage,

View File

@ -3,6 +3,9 @@ import type { SessionManager } from "@mariozechner/pi-coding-agent";
import { makeMissingToolResult } from "./session-transcript-repair.js"; import { makeMissingToolResult } from "./session-transcript-repair.js";
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
const log = createSubsystemLogger("session-tool-result-guard");
type ToolCall = { id: string; name?: string }; type ToolCall = { id: string; name?: string };
@ -69,8 +72,16 @@ export function installSessionToolResultGuard(
const flushPendingToolResults = () => { const flushPendingToolResults = () => {
if (pending.size === 0) return; if (pending.size === 0) return;
log.warn(
`flushPendingToolResults called with ${pending.size} pending tool calls: ${Array.from(
pending.entries(),
)
.map(([id, name]) => `${name ?? "unknown"}(${id})`)
.join(", ")}`,
);
if (allowSyntheticToolResults) { if (allowSyntheticToolResults) {
for (const [id, name] of pending.entries()) { for (const [id, name] of pending.entries()) {
log.warn(`Creating synthetic error result for tool call: ${name ?? "unknown"}(${id})`);
const synthetic = makeMissingToolResult({ toolCallId: id, toolName: name }); const synthetic = makeMissingToolResult({ toolCallId: id, toolName: name });
originalAppend( originalAppend(
persistToolResult(synthetic, { persistToolResult(synthetic, {
@ -90,7 +101,11 @@ export function installSessionToolResultGuard(
if (role === "toolResult") { if (role === "toolResult") {
const id = extractToolResultId(message as Extract<AgentMessage, { role: "toolResult" }>); const id = extractToolResultId(message as Extract<AgentMessage, { role: "toolResult" }>);
const toolName = id ? pending.get(id) : undefined; const toolName = id ? pending.get(id) : undefined;
const wasPending = id ? pending.has(id) : false;
if (id) pending.delete(id); if (id) pending.delete(id);
log.debug(
`Tool result received: ${toolName ?? "unknown"}(${id}) - wasPending=${wasPending}, remainingPending=${pending.size}`,
);
return originalAppend( return originalAppend(
persistToolResult(message, { persistToolResult(message, {
toolCallId: id ?? undefined, toolCallId: id ?? undefined,
@ -128,6 +143,9 @@ export function installSessionToolResultGuard(
if (toolCalls.length > 0) { if (toolCalls.length > 0) {
for (const call of toolCalls) { for (const call of toolCalls) {
pending.set(call.id, call.name); pending.set(call.id, call.name);
log.debug(
`Tool call added to pending: ${call.name ?? "unknown"}(${call.id}) - totalPending=${pending.size}`,
);
} }
} }

View File

@ -2,10 +2,9 @@ import { Type } from "@sinclair/typebox";
import * as fs from "node:fs"; import * as fs from "node:fs";
import * as path from "node:path"; import * as path from "node:path";
import { import {
abortEmbeddedPiRun,
compactEmbeddedPiSession, compactEmbeddedPiSession,
compactEmbeddedPiSessionDirect,
isEmbeddedPiRunActive, isEmbeddedPiRunActive,
waitForEmbeddedPiRunEnd,
} from "../../agents/pi-embedded.js"; } from "../../agents/pi-embedded.js";
import { resolveAgentDir } from "../../agents/agent-scope.js"; import { resolveAgentDir } from "../../agents/agent-scope.js";
import { loadConfig } from "../../config/config.js"; import { loadConfig } from "../../config/config.js";
@ -170,16 +169,16 @@ export function createSessionCompactTool(opts?: SessionCompactToolOpts): AnyAgen
const sessionId = entry.sessionId; const sessionId = entry.sessionId;
// Abort any active run before compacting // If called from within an active run, use direct compaction to avoid
if (isEmbeddedPiRunActive(sessionId)) { // aborting ourselves (which would prevent the tool result from being saved).
abortEmbeddedPiRun(sessionId); // Otherwise, use queued compaction for external callers.
await waitForEmbeddedPiRunEnd(sessionId, 15_000); const runIsActive = isEmbeddedPiRunActive(sessionId);
}
const configured = resolveDefaultModelForAgent({ cfg, agentId }); const configured = resolveDefaultModelForAgent({ cfg, agentId });
const workspaceDir = opts?.workspaceDir ?? resolveAgentDir(cfg, agentId); const workspaceDir = opts?.workspaceDir ?? resolveAgentDir(cfg, agentId);
const result = await compactEmbeddedPiSession({ const compactFn = runIsActive ? compactEmbeddedPiSessionDirect : compactEmbeddedPiSession;
const result = await compactFn({
sessionId, sessionId,
sessionKey, sessionKey,
messageChannel: entry.lastChannel ?? entry.channel ?? "unknown", messageChannel: entry.lastChannel ?? entry.channel ?? "unknown",

View File

@ -12,6 +12,7 @@ type PluginToolMeta = {
}; };
const pluginToolMeta = new WeakMap<AnyAgentTool, PluginToolMeta>(); const pluginToolMeta = new WeakMap<AnyAgentTool, PluginToolMeta>();
const loggedConflicts = new Set<string>();
export function getPluginToolMeta(tool: AnyAgentTool): PluginToolMeta | undefined { export function getPluginToolMeta(tool: AnyAgentTool): PluginToolMeta | undefined {
return pluginToolMeta.get(tool); return pluginToolMeta.get(tool);
@ -61,7 +62,11 @@ export function resolvePluginTools(params: {
const pluginIdKey = normalizeToolName(entry.pluginId); const pluginIdKey = normalizeToolName(entry.pluginId);
if (existingNormalized.has(pluginIdKey)) { if (existingNormalized.has(pluginIdKey)) {
const message = `plugin id conflicts with core tool name (${entry.pluginId})`; const message = `plugin id conflicts with core tool name (${entry.pluginId})`;
const key = `plugin-id:${pluginIdKey}`;
if (!loggedConflicts.has(key)) {
loggedConflicts.add(key);
log.error(message); log.error(message);
}
registry.diagnostics.push({ registry.diagnostics.push({
level: "error", level: "error",
pluginId: entry.pluginId, pluginId: entry.pluginId,
@ -94,13 +99,17 @@ export function resolvePluginTools(params: {
for (const tool of list) { for (const tool of list) {
if (nameSet.has(tool.name) || existing.has(tool.name)) { if (nameSet.has(tool.name) || existing.has(tool.name)) {
const message = `plugin tool name conflict (${entry.pluginId}): ${tool.name}`; const message = `plugin tool name conflict (${entry.pluginId}): ${tool.name}`;
log.error(message); const key = `tool-name:${normalizeToolName(entry.pluginId)}:${normalizeToolName(tool.name)}`;
if (!loggedConflicts.has(key)) {
loggedConflicts.add(key);
log.warn(message);
registry.diagnostics.push({ registry.diagnostics.push({
level: "error", level: "warn",
pluginId: entry.pluginId, pluginId: entry.pluginId,
source: entry.source, source: entry.source,
message, message,
}); });
}
continue; continue;
} }
nameSet.add(tool.name); nameSet.add(tool.name);

View File

@ -19,12 +19,12 @@ import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { buildMentionRegexes, matchesMentionWithExplicit } from "../auto-reply/reply/mentions.js"; import { buildMentionRegexes, matchesMentionWithExplicit } from "../auto-reply/reply/mentions.js";
import { formatLocationText, toLocationContext } from "../channels/location.js"; import { formatLocationText, toLocationContext } from "../channels/location.js";
import { recordInboundSession } from "../channels/session.js"; import { recordInboundSession } from "../channels/session.js";
import { formatCliCommand } from "../cli/command-format.js";
import { readSessionUpdatedAt, resolveStorePath } from "../config/sessions.js"; import { readSessionUpdatedAt, resolveStorePath } from "../config/sessions.js";
import type { MoltbotConfig } from "../config/config.js"; import type { MoltbotConfig } from "../config/config.js";
import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js";
import { logVerbose, shouldLogVerbose } from "../globals.js"; import { logVerbose, shouldLogVerbose } from "../globals.js";
import { recordChannelActivity } from "../infra/channel-activity.js"; import { recordChannelActivity } from "../infra/channel-activity.js";
import { buildPairingReply } from "../pairing/pairing-messages.js";
import { resolveAgentRoute } from "../routing/resolve-route.js"; import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js"; import { resolveThreadSessionKeys } from "../routing/session-key.js";
import { shouldAckReaction as shouldAckReactionGate } from "../channels/ack-reactions.js"; import { shouldAckReaction as shouldAckReactionGate } from "../channels/ack-reactions.js";
@ -81,6 +81,19 @@ type ResolveTelegramGroupConfig = (
messageThreadId?: number, messageThreadId?: number,
) => { groupConfig?: TelegramGroupConfig; topicConfig?: TelegramTopicConfig }; ) => { groupConfig?: TelegramGroupConfig; topicConfig?: TelegramTopicConfig };
const PAIRING_NUDGE_COOLDOWN_MS = 5 * 60 * 1000;
const pairingNudgeSentAtByChatId = new Map<string, number>();
function shouldSendPairingNudge(chatId: string, nowMs: number): boolean {
const lastMs = pairingNudgeSentAtByChatId.get(chatId);
if (lastMs == null) return true;
return nowMs - lastMs >= PAIRING_NUDGE_COOLDOWN_MS;
}
function recordPairingNudge(chatId: string, nowMs: number): void {
pairingNudgeSentAtByChatId.set(chatId, nowMs);
}
type ResolveGroupActivation = (params: { type ResolveGroupActivation = (params: {
chatId: string | number; chatId: string | number;
agentId?: string; agentId?: string;
@ -224,6 +237,8 @@ export const buildTelegramMessageContext = async ({
if (dmPolicy === "disabled") return null; if (dmPolicy === "disabled") return null;
if (dmPolicy !== "open") { if (dmPolicy !== "open") {
const isStartCommand = /^\/start(?:\s|$)/i.test((msg.text ?? msg.caption ?? "").trim());
const nowMs = Date.now();
const candidate = String(chatId); const candidate = String(chatId);
const senderUsername = msg.from?.username ?? ""; const senderUsername = msg.from?.username ?? "";
const allowMatch = resolveSenderAllowMatch({ const allowMatch = resolveSenderAllowMatch({
@ -254,7 +269,13 @@ export const buildTelegramMessageContext = async ({
firstName: from?.first_name, firstName: from?.first_name,
lastName: from?.last_name, lastName: from?.last_name,
}); });
if (created) { const shouldReply =
created || isStartCommand || (code ? shouldSendPairingNudge(candidate, nowMs) : true);
if (shouldReply) {
recordPairingNudge(candidate, nowMs);
}
if (code && shouldReply) {
logger.info( logger.info(
{ {
chatId: candidate, chatId: candidate,
@ -266,6 +287,23 @@ export const buildTelegramMessageContext = async ({
}, },
"telegram pairing request", "telegram pairing request",
); );
await withTelegramApiErrorLogging({
operation: "sendMessage",
fn: () =>
bot.api.sendMessage(
chatId,
[
buildPairingReply({
channel: "telegram",
idLine: `Your Telegram user id: ${telegramUserId}`,
code,
}),
"",
"Tip: send /start to show this again.",
].join("\n"),
),
});
} else if (!code && shouldReply) {
await withTelegramApiErrorLogging({ await withTelegramApiErrorLogging({
operation: "sendMessage", operation: "sendMessage",
fn: () => fn: () =>
@ -274,12 +312,10 @@ export const buildTelegramMessageContext = async ({
[ [
"Moltbot: access not configured.", "Moltbot: access not configured.",
"", "",
`Your Telegram user id: ${telegramUserId}`, "Pairing requests are temporarily rate-limited.",
"", "",
`Pairing code: ${code}`, "Ask the bot owner to run:",
"", "moltbot pairing list telegram",
"Ask the bot owner to approve with:",
formatCliCommand("moltbot pairing approve telegram <code>"),
].join("\n"), ].join("\n"),
), ),
}); });

View File

@ -634,6 +634,59 @@ describe("createTelegramBot", () => {
expect(sendMessageSpy).toHaveBeenCalledTimes(1); expect(sendMessageSpy).toHaveBeenCalledTimes(1);
}); });
it("resends pairing info after a cooldown so Telegram never appears silent", async () => {
onSpy.mockReset();
sendMessageSpy.mockReset();
const replySpy = replyModule.__replySpy as unknown as ReturnType<typeof vi.fn>;
replySpy.mockReset();
vi.useFakeTimers();
try {
const base = new Date("2025-01-09T00:00:00Z");
vi.setSystemTime(base);
loadConfig.mockReturnValue({
channels: { telegram: { dmPolicy: "pairing" } },
});
readTelegramAllowFromStore.mockResolvedValue([]);
upsertTelegramPairingRequest
.mockResolvedValueOnce({ code: "PAIRME12", created: true })
.mockResolvedValue({ code: "PAIRME12", created: false });
createTelegramBot({ token: "tok" });
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;
const message = {
chat: { id: 1234, type: "private" },
text: "hello",
date: 1736380800,
from: { id: 999, username: "random" },
};
await handler({
message,
me: { username: "moltbot_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
});
await handler({
message: { ...message, text: "hello again" },
me: { username: "moltbot_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
});
vi.setSystemTime(new Date(base.getTime() + 6 * 60 * 1000));
await handler({
message: { ...message, text: "hello after cooldown" },
me: { username: "moltbot_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
});
expect(replySpy).not.toHaveBeenCalled();
expect(sendMessageSpy).toHaveBeenCalledTimes(2);
} finally {
vi.useRealTimers();
}
});
it("triggers typing cue via onReplyStart", async () => { it("triggers typing cue via onReplyStart", async () => {
onSpy.mockReset(); onSpy.mockReset();
sendChatActionSpy.mockReset(); sendChatActionSpy.mockReset();