Compare commits

...

2 Commits

Author SHA1 Message Date
Peter Steinberger
51a7d41c6b fix: enforce node invoke timeouts (#1357) (thanks @vignesh07) 2026-01-21 05:48:54 +00:00
Vignesh Natarajan
a959e490b3 fix(node): enforce node.invoke timeout in node client
Use the timeout provided on node invoke requests to ensure node
clients always respond with a result.

This prevents gateway-side node.invoke calls from hanging until the
gateway timeout when a node command stalls.

Tests:
- swift test --filter GatewayNodeSessionTests
2026-01-21 05:07:30 +00:00
8 changed files with 181 additions and 7 deletions

View File

@ -32,6 +32,7 @@ Docs: https://docs.clawd.bot
- Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin. - Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin.
- CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx. - CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx.
- CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output. - CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output.
- Nodes: enforce node.invoke timeouts for node handlers. (#1357) — thanks @vignesh07.
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk. - Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
- Doctor: clarify plugin auto-enable hint text in the startup banner. - Doctor: clarify plugin auto-enable hint text in the startup banner.
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance. - Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.

View File

@ -11,6 +11,35 @@ private struct NodeInvokeRequestPayload: Codable, Sendable {
var idempotencyKey: String? var idempotencyKey: String?
} }
// Ensures the timeout can win even if the invoke task never completes.
private actor InvokeTimeoutRace {
private var finished = false
private let continuation: CheckedContinuation<BridgeInvokeResponse, Never>
private var invokeTask: Task<Void, Never>?
private var timeoutTask: Task<Void, Never>?
init(continuation: CheckedContinuation<BridgeInvokeResponse, Never>) {
self.continuation = continuation
}
func registerTasks(invoke: Task<Void, Never>, timeout: Task<Void, Never>) {
self.invokeTask = invoke
self.timeoutTask = timeout
if finished {
invoke.cancel()
timeout.cancel()
}
}
func finish(_ response: BridgeInvokeResponse) {
guard !finished else { return }
finished = true
continuation.resume(returning: response)
invokeTask?.cancel()
timeoutTask?.cancel()
}
}
public actor GatewayNodeSession { public actor GatewayNodeSession {
private let logger = Logger(subsystem: "com.clawdbot", category: "node.gateway") private let logger = Logger(subsystem: "com.clawdbot", category: "node.gateway")
private let decoder = JSONDecoder() private let decoder = JSONDecoder()
@ -23,6 +52,45 @@ public actor GatewayNodeSession {
private var onConnected: (@Sendable () async -> Void)? private var onConnected: (@Sendable () async -> Void)?
private var onDisconnected: (@Sendable (String) async -> Void)? private var onDisconnected: (@Sendable (String) async -> Void)?
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)? private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
static func invokeWithTimeout(
request: BridgeInvokeRequest,
timeoutMs: Int?,
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse
) async -> BridgeInvokeResponse {
let timeout = max(0, timeoutMs ?? 0)
guard timeout > 0 else {
return await onInvoke(request)
}
let cappedTimeout = min(timeout, Int(UInt64.max / 1_000_000))
let timeoutResponse = BridgeInvokeResponse(
id: request.id,
ok: false,
error: ClawdbotNodeError(
code: .unavailable,
message: "node invoke timed out")
)
return await withCheckedContinuation { continuation in
let race = InvokeTimeoutRace(continuation: continuation)
let invokeTask = Task {
let response = await onInvoke(request)
await race.finish(response)
}
let timeoutTask = Task {
do {
try await Task.sleep(nanoseconds: UInt64(cappedTimeout) * 1_000_000)
} catch {
return
}
await race.finish(timeoutResponse)
}
Task {
await race.registerTasks(invoke: invokeTask, timeout: timeoutTask)
}
}
}
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:] private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
private var canvasHostUrl: String? private var canvasHostUrl: String?
@ -167,7 +235,11 @@ public actor GatewayNodeSession {
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data) let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
guard let onInvoke else { return } guard let onInvoke else { return }
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON) let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
let response = await onInvoke(req) let response = await Self.invokeWithTimeout(
request: req,
timeoutMs: request.timeoutMs,
onInvoke: onInvoke
)
await self.sendInvokeResult(request: request, response: response) await self.sendInvokeResult(request: request, response: response)
} catch { } catch {
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)") self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")

View File

@ -0,0 +1,78 @@
import Foundation
import Testing
@testable import ClawdbotKit
import ClawdbotProtocol
struct GatewayNodeSessionTests {
@Test
func invokeWithTimeoutReturnsUnderlyingResponseBeforeTimeout() async {
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
let response = await GatewayNodeSession.invokeWithTimeout(
request: request,
timeoutMs: 50,
onInvoke: { req in
#expect(req.id == "1")
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: "{}", error: nil)
}
)
#expect(response.ok == true)
#expect(response.error == nil)
#expect(response.payloadJSON == "{}")
}
@Test
func invokeWithTimeoutReturnsTimeoutError() async {
let request = BridgeInvokeRequest(id: "abc", command: "x", paramsJSON: nil)
let response = await GatewayNodeSession.invokeWithTimeout(
request: request,
timeoutMs: 10,
onInvoke: { _ in
try? await Task.sleep(nanoseconds: 200_000_000) // 200ms
return BridgeInvokeResponse(id: "abc", ok: true, payloadJSON: "{}", error: nil)
}
)
#expect(response.ok == false)
#expect(response.error?.code == .unavailable)
#expect(response.error?.message.contains("timed out") == true)
}
@Test
func invokeWithTimeoutReturnsWhenHandlerNeverCompletes() async {
let request = BridgeInvokeRequest(id: "stall", command: "x", paramsJSON: nil)
let response = try? await AsyncTimeout.withTimeoutMs(
timeoutMs: 200,
onTimeout: { NSError(domain: "GatewayNodeSessionTests", code: 1) },
operation: {
await GatewayNodeSession.invokeWithTimeout(
request: request,
timeoutMs: 10,
onInvoke: { _ in
await withCheckedContinuation { _ in }
}
)
}
)
#expect(response != nil)
#expect(response?.ok == false)
#expect(response?.error?.code == .unavailable)
}
@Test
func invokeWithTimeoutZeroDisablesTimeout() async {
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
let response = await GatewayNodeSession.invokeWithTimeout(
request: request,
timeoutMs: 0,
onInvoke: { req in
try? await Task.sleep(nanoseconds: 5_000_000)
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil)
}
)
#expect(response.ok == true)
#expect(response.error == nil)
}
}

View File

@ -9,6 +9,7 @@ import { defaultRuntime } from "../runtime.js";
import { formatDocsLink } from "../terminal/links.js"; import { formatDocsLink } from "../terminal/links.js";
import { theme } from "../terminal/theme.js"; import { theme } from "../terminal/theme.js";
import { renderTable } from "../terminal/table.js"; import { renderTable } from "../terminal/table.js";
import type { ChannelDirectoryEntry } from "../channels/plugins/types.core.js";
function parseLimit(value: unknown): number | null { function parseLimit(value: unknown): number | null {
if (typeof value === "number" && Number.isFinite(value)) { if (typeof value === "number" && Number.isFinite(value)) {
@ -30,6 +31,15 @@ function buildRows(entries: Array<{ id: string; name?: string | undefined }>) {
})); }));
} }
function formatEntry(entry: ChannelDirectoryEntry): string {
const name = entry.name?.trim();
const handle = entry.handle?.trim();
const handleLabel = handle ? (handle.startsWith("@") ? handle : `@${handle}`) : null;
const label = [name, handleLabel].filter(Boolean).join(" ");
if (!label) return entry.id;
return `${label} ${theme.muted(`(${entry.id})`)}`;
}
export function registerDirectoryCli(program: Command) { export function registerDirectoryCli(program: Command) {
const directory = program const directory = program
.command("directory") .command("directory")

View File

@ -7,6 +7,6 @@ describe("dns cli", () => {
const log = vi.spyOn(console, "log").mockImplementation(() => {}); const log = vi.spyOn(console, "log").mockImplementation(() => {});
const program = buildProgram(); const program = buildProgram();
await program.parseAsync(["dns", "setup"], { from: "user" }); await program.parseAsync(["dns", "setup"], { from: "user" });
expect(log).toHaveBeenCalledWith(expect.stringContaining("Domain:")); expect(log).toHaveBeenCalledWith(expect.stringContaining("clawdbot.internal"));
}); });
}); });

View File

@ -1,6 +1,6 @@
import { theme } from "../terminal/theme.js"; import { theme } from "../terminal/theme.js";
export type HelpExample = [command: string, description: string]; export type HelpExample = readonly [command: string, description: string];
export function formatHelpExample(command: string, description: string): string { export function formatHelpExample(command: string, description: string): string {
return ` ${theme.command(command)}\n ${theme.muted(description)}`; return ` ${theme.command(command)}\n ${theme.muted(description)}`;
@ -11,11 +11,15 @@ export function formatHelpExampleLine(command: string, description: string): str
return ` ${theme.command(command)} ${theme.muted(`# ${description}`)}`; return ` ${theme.command(command)} ${theme.muted(`# ${description}`)}`;
} }
export function formatHelpExamples(examples: HelpExample[], inline = false): string { export function formatHelpExamples(examples: readonly HelpExample[], inline = false): string {
const formatter = inline ? formatHelpExampleLine : formatHelpExample; const formatter = inline ? formatHelpExampleLine : formatHelpExample;
return examples.map(([command, description]) => formatter(command, description)).join("\n"); return examples.map(([command, description]) => formatter(command, description)).join("\n");
} }
export function formatHelpExampleGroup(label: string, examples: HelpExample[], inline = false) { export function formatHelpExampleGroup(
label: string,
examples: readonly HelpExample[],
inline = false,
) {
return `${theme.muted(label)}\n${formatHelpExamples(examples, inline)}`; return `${theme.muted(label)}\n${formatHelpExamples(examples, inline)}`;
} }

View File

@ -46,6 +46,11 @@ function formatNodeVersions(node: {
function parseSinceMs(raw: unknown, label: string): number | undefined { function parseSinceMs(raw: unknown, label: string): number | undefined {
if (raw === undefined || raw === null) return undefined; if (raw === undefined || raw === null) return undefined;
if (typeof raw !== "string" && typeof raw !== "number" && typeof raw !== "bigint") {
defaultRuntime.error(`${label}: invalid duration`);
defaultRuntime.exit(1);
return undefined;
}
const value = String(raw).trim(); const value = String(raw).trim();
if (!value) return undefined; if (!value) return undefined;
try { try {

View File

@ -71,7 +71,9 @@ describe("pairing cli", () => {
await program.parseAsync(["pairing", "list", "--channel", "telegram"], { await program.parseAsync(["pairing", "list", "--channel", "telegram"], {
from: "user", from: "user",
}); });
expect(log).toHaveBeenCalledWith(expect.stringContaining("telegramUserId=123")); const output = log.mock.calls.map(([value]) => String(value)).join("\n");
expect(output).toContain("telegramUserId");
expect(output).toContain("123");
}); });
it("accepts channel as positional for list", async () => { it("accepts channel as positional for list", async () => {
@ -131,7 +133,9 @@ describe("pairing cli", () => {
await program.parseAsync(["pairing", "list", "--channel", "discord"], { await program.parseAsync(["pairing", "list", "--channel", "discord"], {
from: "user", from: "user",
}); });
expect(log).toHaveBeenCalledWith(expect.stringContaining("discordUserId=999")); const output = log.mock.calls.map(([value]) => String(value)).join("\n");
expect(output).toContain("discordUserId");
expect(output).toContain("999");
}); });
it("accepts channel as positional for approve (npm-run compatible)", async () => { it("accepts channel as positional for approve (npm-run compatible)", async () => {