From 51a7d41c6bf1b4da7a8fa6e11b89e391c6943bde Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 21 Jan 2026 05:48:54 +0000 Subject: [PATCH] fix: enforce node invoke timeouts (#1357) (thanks @vignesh07) --- CHANGELOG.md | 1 + .../ClawdbotKit/GatewayNodeSession.swift | 69 +++++++++++++++---- .../GatewayNodeSessionTests.swift | 22 ++++++ src/cli/directory-cli.ts | 10 +++ src/cli/dns-cli.test.ts | 2 +- src/cli/help-format.ts | 10 ++- src/cli/nodes-cli/register.status.ts | 5 ++ src/cli/pairing-cli.test.ts | 8 ++- 8 files changed, 106 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5058a6ce3..4e4263806 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Docs: https://docs.clawd.bot - Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin. - CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx. - CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output. +- Nodes: enforce node.invoke timeouts for node handlers. (#1357) — thanks @vignesh07. - Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk. - Doctor: clarify plugin auto-enable hint text in the startup banner. - Gateway: clarify unauthorized handshake responses with token/password mismatch guidance. diff --git a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift index 2cc26a51d..89ae86339 100644 --- a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift +++ b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift @@ -11,6 +11,35 @@ private struct NodeInvokeRequestPayload: Codable, Sendable { var idempotencyKey: String? } +// Ensures the timeout can win even if the invoke task never completes. +private actor InvokeTimeoutRace { + private var finished = false + private let continuation: CheckedContinuation + private var invokeTask: Task? + private var timeoutTask: Task? + + init(continuation: CheckedContinuation) { + self.continuation = continuation + } + + func registerTasks(invoke: Task, timeout: Task) { + self.invokeTask = invoke + self.timeoutTask = timeout + if finished { + invoke.cancel() + timeout.cancel() + } + } + + func finish(_ response: BridgeInvokeResponse) { + guard !finished else { return } + finished = true + continuation.resume(returning: response) + invokeTask?.cancel() + timeoutTask?.cancel() + } +} + public actor GatewayNodeSession { private let logger = Logger(subsystem: "com.clawdbot", category: "node.gateway") private let decoder = JSONDecoder() @@ -34,22 +63,32 @@ public actor GatewayNodeSession { return await onInvoke(request) } - return await withTaskGroup(of: BridgeInvokeResponse.self) { group in - group.addTask { await onInvoke(request) } - group.addTask { - try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000) - return BridgeInvokeResponse( - id: request.id, - ok: false, - error: ClawdbotNodeError( - code: .unavailable, - message: "node invoke timed out") - ) - } + let cappedTimeout = min(timeout, Int(UInt64.max / 1_000_000)) + let timeoutResponse = BridgeInvokeResponse( + id: request.id, + ok: false, + error: ClawdbotNodeError( + code: .unavailable, + message: "node invoke timed out") + ) - let first = await group.next()! - group.cancelAll() - return first + return await withCheckedContinuation { continuation in + let race = InvokeTimeoutRace(continuation: continuation) + let invokeTask = Task { + let response = await onInvoke(request) + await race.finish(response) + } + let timeoutTask = Task { + do { + try await Task.sleep(nanoseconds: UInt64(cappedTimeout) * 1_000_000) + } catch { + return + } + await race.finish(timeoutResponse) + } + Task { + await race.registerTasks(invoke: invokeTask, timeout: timeoutTask) + } } } private var serverEventSubscribers: [UUID: AsyncStream.Continuation] = [:] diff --git a/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayNodeSessionTests.swift b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayNodeSessionTests.swift index 0fc688f63..4fb64ca82 100644 --- a/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayNodeSessionTests.swift +++ b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayNodeSessionTests.swift @@ -38,6 +38,28 @@ struct GatewayNodeSessionTests { #expect(response.error?.message.contains("timed out") == true) } + @Test + func invokeWithTimeoutReturnsWhenHandlerNeverCompletes() async { + let request = BridgeInvokeRequest(id: "stall", command: "x", paramsJSON: nil) + let response = try? await AsyncTimeout.withTimeoutMs( + timeoutMs: 200, + onTimeout: { NSError(domain: "GatewayNodeSessionTests", code: 1) }, + operation: { + await GatewayNodeSession.invokeWithTimeout( + request: request, + timeoutMs: 10, + onInvoke: { _ in + await withCheckedContinuation { _ in } + } + ) + } + ) + + #expect(response != nil) + #expect(response?.ok == false) + #expect(response?.error?.code == .unavailable) + } + @Test func invokeWithTimeoutZeroDisablesTimeout() async { let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil) diff --git a/src/cli/directory-cli.ts b/src/cli/directory-cli.ts index 347695f63..8364c0804 100644 --- a/src/cli/directory-cli.ts +++ b/src/cli/directory-cli.ts @@ -9,6 +9,7 @@ import { defaultRuntime } from "../runtime.js"; import { formatDocsLink } from "../terminal/links.js"; import { theme } from "../terminal/theme.js"; import { renderTable } from "../terminal/table.js"; +import type { ChannelDirectoryEntry } from "../channels/plugins/types.core.js"; function parseLimit(value: unknown): number | null { if (typeof value === "number" && Number.isFinite(value)) { @@ -30,6 +31,15 @@ function buildRows(entries: Array<{ id: string; name?: string | undefined }>) { })); } +function formatEntry(entry: ChannelDirectoryEntry): string { + const name = entry.name?.trim(); + const handle = entry.handle?.trim(); + const handleLabel = handle ? (handle.startsWith("@") ? handle : `@${handle}`) : null; + const label = [name, handleLabel].filter(Boolean).join(" "); + if (!label) return entry.id; + return `${label} ${theme.muted(`(${entry.id})`)}`; +} + export function registerDirectoryCli(program: Command) { const directory = program .command("directory") diff --git a/src/cli/dns-cli.test.ts b/src/cli/dns-cli.test.ts index 6ad0938c6..0774b0d7b 100644 --- a/src/cli/dns-cli.test.ts +++ b/src/cli/dns-cli.test.ts @@ -7,6 +7,6 @@ describe("dns cli", () => { const log = vi.spyOn(console, "log").mockImplementation(() => {}); const program = buildProgram(); await program.parseAsync(["dns", "setup"], { from: "user" }); - expect(log).toHaveBeenCalledWith(expect.stringContaining("Domain:")); + expect(log).toHaveBeenCalledWith(expect.stringContaining("clawdbot.internal")); }); }); diff --git a/src/cli/help-format.ts b/src/cli/help-format.ts index a57ae8d29..77fe5842f 100644 --- a/src/cli/help-format.ts +++ b/src/cli/help-format.ts @@ -1,6 +1,6 @@ import { theme } from "../terminal/theme.js"; -export type HelpExample = [command: string, description: string]; +export type HelpExample = readonly [command: string, description: string]; export function formatHelpExample(command: string, description: string): string { return ` ${theme.command(command)}\n ${theme.muted(description)}`; @@ -11,11 +11,15 @@ export function formatHelpExampleLine(command: string, description: string): str return ` ${theme.command(command)} ${theme.muted(`# ${description}`)}`; } -export function formatHelpExamples(examples: HelpExample[], inline = false): string { +export function formatHelpExamples(examples: readonly HelpExample[], inline = false): string { const formatter = inline ? formatHelpExampleLine : formatHelpExample; return examples.map(([command, description]) => formatter(command, description)).join("\n"); } -export function formatHelpExampleGroup(label: string, examples: HelpExample[], inline = false) { +export function formatHelpExampleGroup( + label: string, + examples: readonly HelpExample[], + inline = false, +) { return `${theme.muted(label)}\n${formatHelpExamples(examples, inline)}`; } diff --git a/src/cli/nodes-cli/register.status.ts b/src/cli/nodes-cli/register.status.ts index 4f8e2ae76..00e938a12 100644 --- a/src/cli/nodes-cli/register.status.ts +++ b/src/cli/nodes-cli/register.status.ts @@ -46,6 +46,11 @@ function formatNodeVersions(node: { function parseSinceMs(raw: unknown, label: string): number | undefined { if (raw === undefined || raw === null) return undefined; + if (typeof raw !== "string" && typeof raw !== "number" && typeof raw !== "bigint") { + defaultRuntime.error(`${label}: invalid duration`); + defaultRuntime.exit(1); + return undefined; + } const value = String(raw).trim(); if (!value) return undefined; try { diff --git a/src/cli/pairing-cli.test.ts b/src/cli/pairing-cli.test.ts index 9bbe0e3f2..3a1ef1e3d 100644 --- a/src/cli/pairing-cli.test.ts +++ b/src/cli/pairing-cli.test.ts @@ -71,7 +71,9 @@ describe("pairing cli", () => { await program.parseAsync(["pairing", "list", "--channel", "telegram"], { from: "user", }); - expect(log).toHaveBeenCalledWith(expect.stringContaining("telegramUserId=123")); + const output = log.mock.calls.map(([value]) => String(value)).join("\n"); + expect(output).toContain("telegramUserId"); + expect(output).toContain("123"); }); it("accepts channel as positional for list", async () => { @@ -131,7 +133,9 @@ describe("pairing cli", () => { await program.parseAsync(["pairing", "list", "--channel", "discord"], { from: "user", }); - expect(log).toHaveBeenCalledWith(expect.stringContaining("discordUserId=999")); + const output = log.mock.calls.map(([value]) => String(value)).join("\n"); + expect(output).toContain("discordUserId"); + expect(output).toContain("999"); }); it("accepts channel as positional for approve (npm-run compatible)", async () => {