fix: enforce node invoke timeouts (#1357) (thanks @vignesh07)
This commit is contained in:
parent
a959e490b3
commit
51a7d41c6b
@ -32,6 +32,7 @@ Docs: https://docs.clawd.bot
|
|||||||
- Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin.
|
- Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin.
|
||||||
- CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx.
|
- CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx.
|
||||||
- CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output.
|
- CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output.
|
||||||
|
- Nodes: enforce node.invoke timeouts for node handlers. (#1357) — thanks @vignesh07.
|
||||||
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
||||||
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
||||||
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
||||||
|
|||||||
@ -11,6 +11,35 @@ private struct NodeInvokeRequestPayload: Codable, Sendable {
|
|||||||
var idempotencyKey: String?
|
var idempotencyKey: String?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensures the timeout can win even if the invoke task never completes.
|
||||||
|
private actor InvokeTimeoutRace {
|
||||||
|
private var finished = false
|
||||||
|
private let continuation: CheckedContinuation<BridgeInvokeResponse, Never>
|
||||||
|
private var invokeTask: Task<Void, Never>?
|
||||||
|
private var timeoutTask: Task<Void, Never>?
|
||||||
|
|
||||||
|
init(continuation: CheckedContinuation<BridgeInvokeResponse, Never>) {
|
||||||
|
self.continuation = continuation
|
||||||
|
}
|
||||||
|
|
||||||
|
func registerTasks(invoke: Task<Void, Never>, timeout: Task<Void, Never>) {
|
||||||
|
self.invokeTask = invoke
|
||||||
|
self.timeoutTask = timeout
|
||||||
|
if finished {
|
||||||
|
invoke.cancel()
|
||||||
|
timeout.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func finish(_ response: BridgeInvokeResponse) {
|
||||||
|
guard !finished else { return }
|
||||||
|
finished = true
|
||||||
|
continuation.resume(returning: response)
|
||||||
|
invokeTask?.cancel()
|
||||||
|
timeoutTask?.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public actor GatewayNodeSession {
|
public actor GatewayNodeSession {
|
||||||
private let logger = Logger(subsystem: "com.clawdbot", category: "node.gateway")
|
private let logger = Logger(subsystem: "com.clawdbot", category: "node.gateway")
|
||||||
private let decoder = JSONDecoder()
|
private let decoder = JSONDecoder()
|
||||||
@ -34,22 +63,32 @@ public actor GatewayNodeSession {
|
|||||||
return await onInvoke(request)
|
return await onInvoke(request)
|
||||||
}
|
}
|
||||||
|
|
||||||
return await withTaskGroup(of: BridgeInvokeResponse.self) { group in
|
let cappedTimeout = min(timeout, Int(UInt64.max / 1_000_000))
|
||||||
group.addTask { await onInvoke(request) }
|
let timeoutResponse = BridgeInvokeResponse(
|
||||||
group.addTask {
|
id: request.id,
|
||||||
try? await Task.sleep(nanoseconds: UInt64(timeout) * 1_000_000)
|
ok: false,
|
||||||
return BridgeInvokeResponse(
|
error: ClawdbotNodeError(
|
||||||
id: request.id,
|
code: .unavailable,
|
||||||
ok: false,
|
message: "node invoke timed out")
|
||||||
error: ClawdbotNodeError(
|
)
|
||||||
code: .unavailable,
|
|
||||||
message: "node invoke timed out")
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
let first = await group.next()!
|
return await withCheckedContinuation { continuation in
|
||||||
group.cancelAll()
|
let race = InvokeTimeoutRace(continuation: continuation)
|
||||||
return first
|
let invokeTask = Task {
|
||||||
|
let response = await onInvoke(request)
|
||||||
|
await race.finish(response)
|
||||||
|
}
|
||||||
|
let timeoutTask = Task {
|
||||||
|
do {
|
||||||
|
try await Task.sleep(nanoseconds: UInt64(cappedTimeout) * 1_000_000)
|
||||||
|
} catch {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
await race.finish(timeoutResponse)
|
||||||
|
}
|
||||||
|
Task {
|
||||||
|
await race.registerTasks(invoke: invokeTask, timeout: timeoutTask)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
|
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
|
||||||
|
|||||||
@ -38,6 +38,28 @@ struct GatewayNodeSessionTests {
|
|||||||
#expect(response.error?.message.contains("timed out") == true)
|
#expect(response.error?.message.contains("timed out") == true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
func invokeWithTimeoutReturnsWhenHandlerNeverCompletes() async {
|
||||||
|
let request = BridgeInvokeRequest(id: "stall", command: "x", paramsJSON: nil)
|
||||||
|
let response = try? await AsyncTimeout.withTimeoutMs(
|
||||||
|
timeoutMs: 200,
|
||||||
|
onTimeout: { NSError(domain: "GatewayNodeSessionTests", code: 1) },
|
||||||
|
operation: {
|
||||||
|
await GatewayNodeSession.invokeWithTimeout(
|
||||||
|
request: request,
|
||||||
|
timeoutMs: 10,
|
||||||
|
onInvoke: { _ in
|
||||||
|
await withCheckedContinuation { _ in }
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
#expect(response != nil)
|
||||||
|
#expect(response?.ok == false)
|
||||||
|
#expect(response?.error?.code == .unavailable)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
func invokeWithTimeoutZeroDisablesTimeout() async {
|
func invokeWithTimeoutZeroDisablesTimeout() async {
|
||||||
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
|
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import { defaultRuntime } from "../runtime.js";
|
|||||||
import { formatDocsLink } from "../terminal/links.js";
|
import { formatDocsLink } from "../terminal/links.js";
|
||||||
import { theme } from "../terminal/theme.js";
|
import { theme } from "../terminal/theme.js";
|
||||||
import { renderTable } from "../terminal/table.js";
|
import { renderTable } from "../terminal/table.js";
|
||||||
|
import type { ChannelDirectoryEntry } from "../channels/plugins/types.core.js";
|
||||||
|
|
||||||
function parseLimit(value: unknown): number | null {
|
function parseLimit(value: unknown): number | null {
|
||||||
if (typeof value === "number" && Number.isFinite(value)) {
|
if (typeof value === "number" && Number.isFinite(value)) {
|
||||||
@ -30,6 +31,15 @@ function buildRows(entries: Array<{ id: string; name?: string | undefined }>) {
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function formatEntry(entry: ChannelDirectoryEntry): string {
|
||||||
|
const name = entry.name?.trim();
|
||||||
|
const handle = entry.handle?.trim();
|
||||||
|
const handleLabel = handle ? (handle.startsWith("@") ? handle : `@${handle}`) : null;
|
||||||
|
const label = [name, handleLabel].filter(Boolean).join(" ");
|
||||||
|
if (!label) return entry.id;
|
||||||
|
return `${label} ${theme.muted(`(${entry.id})`)}`;
|
||||||
|
}
|
||||||
|
|
||||||
export function registerDirectoryCli(program: Command) {
|
export function registerDirectoryCli(program: Command) {
|
||||||
const directory = program
|
const directory = program
|
||||||
.command("directory")
|
.command("directory")
|
||||||
|
|||||||
@ -7,6 +7,6 @@ describe("dns cli", () => {
|
|||||||
const log = vi.spyOn(console, "log").mockImplementation(() => {});
|
const log = vi.spyOn(console, "log").mockImplementation(() => {});
|
||||||
const program = buildProgram();
|
const program = buildProgram();
|
||||||
await program.parseAsync(["dns", "setup"], { from: "user" });
|
await program.parseAsync(["dns", "setup"], { from: "user" });
|
||||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("Domain:"));
|
expect(log).toHaveBeenCalledWith(expect.stringContaining("clawdbot.internal"));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
import { theme } from "../terminal/theme.js";
|
import { theme } from "../terminal/theme.js";
|
||||||
|
|
||||||
export type HelpExample = [command: string, description: string];
|
export type HelpExample = readonly [command: string, description: string];
|
||||||
|
|
||||||
export function formatHelpExample(command: string, description: string): string {
|
export function formatHelpExample(command: string, description: string): string {
|
||||||
return ` ${theme.command(command)}\n ${theme.muted(description)}`;
|
return ` ${theme.command(command)}\n ${theme.muted(description)}`;
|
||||||
@ -11,11 +11,15 @@ export function formatHelpExampleLine(command: string, description: string): str
|
|||||||
return ` ${theme.command(command)} ${theme.muted(`# ${description}`)}`;
|
return ` ${theme.command(command)} ${theme.muted(`# ${description}`)}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function formatHelpExamples(examples: HelpExample[], inline = false): string {
|
export function formatHelpExamples(examples: readonly HelpExample[], inline = false): string {
|
||||||
const formatter = inline ? formatHelpExampleLine : formatHelpExample;
|
const formatter = inline ? formatHelpExampleLine : formatHelpExample;
|
||||||
return examples.map(([command, description]) => formatter(command, description)).join("\n");
|
return examples.map(([command, description]) => formatter(command, description)).join("\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
export function formatHelpExampleGroup(label: string, examples: HelpExample[], inline = false) {
|
export function formatHelpExampleGroup(
|
||||||
|
label: string,
|
||||||
|
examples: readonly HelpExample[],
|
||||||
|
inline = false,
|
||||||
|
) {
|
||||||
return `${theme.muted(label)}\n${formatHelpExamples(examples, inline)}`;
|
return `${theme.muted(label)}\n${formatHelpExamples(examples, inline)}`;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,6 +46,11 @@ function formatNodeVersions(node: {
|
|||||||
|
|
||||||
function parseSinceMs(raw: unknown, label: string): number | undefined {
|
function parseSinceMs(raw: unknown, label: string): number | undefined {
|
||||||
if (raw === undefined || raw === null) return undefined;
|
if (raw === undefined || raw === null) return undefined;
|
||||||
|
if (typeof raw !== "string" && typeof raw !== "number" && typeof raw !== "bigint") {
|
||||||
|
defaultRuntime.error(`${label}: invalid duration`);
|
||||||
|
defaultRuntime.exit(1);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
const value = String(raw).trim();
|
const value = String(raw).trim();
|
||||||
if (!value) return undefined;
|
if (!value) return undefined;
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -71,7 +71,9 @@ describe("pairing cli", () => {
|
|||||||
await program.parseAsync(["pairing", "list", "--channel", "telegram"], {
|
await program.parseAsync(["pairing", "list", "--channel", "telegram"], {
|
||||||
from: "user",
|
from: "user",
|
||||||
});
|
});
|
||||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("telegramUserId=123"));
|
const output = log.mock.calls.map(([value]) => String(value)).join("\n");
|
||||||
|
expect(output).toContain("telegramUserId");
|
||||||
|
expect(output).toContain("123");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("accepts channel as positional for list", async () => {
|
it("accepts channel as positional for list", async () => {
|
||||||
@ -131,7 +133,9 @@ describe("pairing cli", () => {
|
|||||||
await program.parseAsync(["pairing", "list", "--channel", "discord"], {
|
await program.parseAsync(["pairing", "list", "--channel", "discord"], {
|
||||||
from: "user",
|
from: "user",
|
||||||
});
|
});
|
||||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("discordUserId=999"));
|
const output = log.mock.calls.map(([value]) => String(value)).join("\n");
|
||||||
|
expect(output).toContain("discordUserId");
|
||||||
|
expect(output).toContain("999");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("accepts channel as positional for approve (npm-run compatible)", async () => {
|
it("accepts channel as positional for approve (npm-run compatible)", async () => {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user