Merge a52db23c08 into 09be5d45d5
This commit is contained in:
commit
aa64a2fb03
1
openclaw
Submodule
1
openclaw
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit ccac2aeec2dad4bbca49707568311123edf5545a
|
||||
@ -13,7 +13,31 @@ export const GATEWAY_CLIENT_IDS = {
|
||||
PROBE: "openclaw-probe",
|
||||
} as const;
|
||||
|
||||
export type GatewayClientId = (typeof GATEWAY_CLIENT_IDS)[keyof typeof GATEWAY_CLIENT_IDS];
|
||||
// Legacy client IDs for backward compatibility
|
||||
// These are deprecated but still accepted to prevent breakage during upgrades
|
||||
export const LEGACY_GATEWAY_CLIENT_IDS = {
|
||||
// Clawdbot era (pre-2026.1.29)
|
||||
CLAWDBOT_CONTROL_UI: "clawdbot-control-ui",
|
||||
CLAWDBOT_MACOS_APP: "clawdbot-macos",
|
||||
CLAWDBOT_IOS_APP: "clawdbot-ios",
|
||||
CLAWDBOT_ANDROID_APP: "clawdbot-android",
|
||||
CLAWDBOT_PROBE: "clawdbot-probe",
|
||||
// Moltbot era (intermediate rebrand)
|
||||
MOLTBOT_CONTROL_UI: "moltbot-control-ui",
|
||||
MOLTBOT_MACOS_APP: "moltbot-macos",
|
||||
MOLTBOT_IOS_APP: "moltbot-ios",
|
||||
MOLTBOT_ANDROID_APP: "moltbot-android",
|
||||
MOLTBOT_PROBE: "moltbot-probe",
|
||||
} as const;
|
||||
|
||||
export const ALL_GATEWAY_CLIENT_IDS = {
|
||||
...GATEWAY_CLIENT_IDS,
|
||||
...LEGACY_GATEWAY_CLIENT_IDS,
|
||||
} as const;
|
||||
|
||||
export type GatewayClientId =
|
||||
| (typeof GATEWAY_CLIENT_IDS)[keyof typeof GATEWAY_CLIENT_IDS]
|
||||
| (typeof LEGACY_GATEWAY_CLIENT_IDS)[keyof typeof LEGACY_GATEWAY_CLIENT_IDS];
|
||||
|
||||
// Back-compat naming (internal): these values are IDs, not display names.
|
||||
export const GATEWAY_CLIENT_NAMES = GATEWAY_CLIENT_IDS;
|
||||
@ -42,7 +66,7 @@ export type GatewayClientInfo = {
|
||||
instanceId?: string;
|
||||
};
|
||||
|
||||
const GATEWAY_CLIENT_ID_SET = new Set<GatewayClientId>(Object.values(GATEWAY_CLIENT_IDS));
|
||||
const GATEWAY_CLIENT_ID_SET = new Set<GatewayClientId>(Object.values(ALL_GATEWAY_CLIENT_IDS));
|
||||
const GATEWAY_CLIENT_MODE_SET = new Set<GatewayClientMode>(Object.values(GATEWAY_CLIENT_MODES));
|
||||
|
||||
export function normalizeGatewayClientId(raw?: string | null): GatewayClientId | undefined {
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import { SESSION_LABEL_MAX_LENGTH } from "../../../sessions/session-label.js";
|
||||
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "../client-info.js";
|
||||
import { ALL_GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "../client-info.js";
|
||||
|
||||
export const NonEmptyString = Type.String({ minLength: 1 });
|
||||
export const SessionLabelString = Type.String({
|
||||
@ -9,7 +9,7 @@ export const SessionLabelString = Type.String({
|
||||
});
|
||||
|
||||
export const GatewayClientIdSchema = Type.Union(
|
||||
Object.values(GATEWAY_CLIENT_IDS).map((value) => Type.Literal(value)),
|
||||
Object.values(ALL_GATEWAY_CLIENT_IDS).map((value) => Type.Literal(value)),
|
||||
);
|
||||
|
||||
export const GatewayClientModeSchema = Type.Union(
|
||||
|
||||
@ -91,3 +91,60 @@ test("accepts openclaw-android as a valid gateway client id", async () => {
|
||||
|
||||
ws.close();
|
||||
});
|
||||
|
||||
test("accepts legacy clawdbot-ios as a valid gateway client id (backward compat)", async () => {
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
|
||||
const res = await connectReq(ws, { clientId: "clawdbot-ios", platform: "ios" });
|
||||
// We don't care if auth fails here; we only care that schema validation accepts the client id.
|
||||
// A schema rejection would close the socket before sending a response.
|
||||
if (!res.ok) {
|
||||
// allow unauthorized error when gateway requires auth
|
||||
// but reject schema validation errors
|
||||
const message = String(res.error?.message ?? "");
|
||||
if (message.includes("invalid connect params")) {
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
ws.close();
|
||||
});
|
||||
|
||||
test("accepts legacy clawdbot-android as a valid gateway client id (backward compat)", async () => {
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
|
||||
const res = await connectReq(ws, { clientId: "clawdbot-android", platform: "android" });
|
||||
// We don't care if auth fails here; we only care that schema validation accepts the client id.
|
||||
// A schema rejection would close the socket before sending a response.
|
||||
if (!res.ok) {
|
||||
// allow unauthorized error when gateway requires auth
|
||||
// but reject schema validation errors
|
||||
const message = String(res.error?.message ?? "");
|
||||
if (message.includes("invalid connect params")) {
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
ws.close();
|
||||
});
|
||||
|
||||
test("accepts legacy moltbot-macos as a valid gateway client id (backward compat)", async () => {
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
|
||||
const res = await connectReq(ws, { clientId: "moltbot-macos", platform: "macos" });
|
||||
// We don't care if auth fails here; we only care that schema validation accepts the client id.
|
||||
// A schema rejection would close the socket before sending a response.
|
||||
if (!res.ok) {
|
||||
// allow unauthorized error when gateway requires auth
|
||||
// but reject schema validation errors
|
||||
const message = String(res.error?.message ?? "");
|
||||
if (message.includes("invalid connect params")) {
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
ws.close();
|
||||
});
|
||||
|
||||
@ -184,4 +184,122 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
|
||||
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token");
|
||||
});
|
||||
|
||||
it("retries on getUpdates timeout errors", async () => {
|
||||
const timeoutError = Object.assign(
|
||||
new Error("Request to 'getUpdates' timed out after 30 seconds"),
|
||||
{
|
||||
method: "getUpdates",
|
||||
description: "Request timed out",
|
||||
},
|
||||
);
|
||||
runSpy
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(timeoutError),
|
||||
stop: vi.fn(),
|
||||
}))
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.resolve(),
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
|
||||
await monitorTelegramProvider({ token: "tok" });
|
||||
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("restarts when runner exhausts retries and stops normally", async () => {
|
||||
runSpy
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.resolve(), // Runner stopped normally after exhausting retries
|
||||
stop: vi.fn(),
|
||||
}))
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.resolve(),
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
|
||||
await monitorTelegramProvider({ token: "tok" });
|
||||
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("retries on getUpdates conflict errors (409)", async () => {
|
||||
const conflictError = Object.assign(new Error("Conflict: terminated by other getUpdates"), {
|
||||
error_code: 409,
|
||||
description: "Conflict: terminated by other getUpdates request",
|
||||
method: "getUpdates",
|
||||
});
|
||||
runSpy
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(conflictError),
|
||||
stop: vi.fn(),
|
||||
}))
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.resolve(),
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
|
||||
await monitorTelegramProvider({ token: "tok" });
|
||||
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("respects abort signal during retry backoff", async () => {
|
||||
const abortController = new AbortController();
|
||||
const networkError = Object.assign(new Error("timeout"), { code: "ETIMEDOUT" });
|
||||
|
||||
// Mock sleepWithAbort to abort mid-sleep
|
||||
sleepWithAbort.mockImplementationOnce(async () => {
|
||||
abortController.abort();
|
||||
throw new Error("Aborted");
|
||||
});
|
||||
|
||||
runSpy.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(networkError),
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
|
||||
await monitorTelegramProvider({ token: "tok", abortSignal: abortController.signal });
|
||||
|
||||
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("uses exponential backoff for consecutive failures", async () => {
|
||||
computeBackoff.mockReturnValueOnce(2000).mockReturnValueOnce(3600).mockReturnValueOnce(6480);
|
||||
|
||||
runSpy
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(Object.assign(new Error("timeout"), { code: "ETIMEDOUT" })),
|
||||
stop: vi.fn(),
|
||||
}))
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(Object.assign(new Error("timeout"), { code: "ETIMEDOUT" })),
|
||||
stop: vi.fn(),
|
||||
}))
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(Object.assign(new Error("timeout"), { code: "ETIMEDOUT" })),
|
||||
stop: vi.fn(),
|
||||
}))
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.resolve(),
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
|
||||
await monitorTelegramProvider({ token: "tok" });
|
||||
|
||||
// Verify backoff was called with increasing attempt numbers
|
||||
expect(computeBackoff).toHaveBeenCalledTimes(4); // 3 failures + 1 normal stop
|
||||
expect(computeBackoff).toHaveBeenCalledWith(expect.anything(), 1);
|
||||
expect(computeBackoff).toHaveBeenCalledWith(expect.anything(), 2);
|
||||
expect(computeBackoff).toHaveBeenCalledWith(expect.anything(), 3);
|
||||
});
|
||||
});
|
||||
|
||||
@ -74,6 +74,22 @@ const isGetUpdatesConflict = (err: unknown) => {
|
||||
return haystack.includes("getupdates");
|
||||
};
|
||||
|
||||
const isGetUpdatesTimeout = (err: unknown) => {
|
||||
if (!err || typeof err !== "object") return false;
|
||||
const typed = err as {
|
||||
description?: string;
|
||||
method?: string;
|
||||
message?: string;
|
||||
error_description?: string;
|
||||
};
|
||||
const haystack = [typed.method, typed.description, typed.message, typed.error_description]
|
||||
.filter((value): value is string => typeof value === "string")
|
||||
.join(" ")
|
||||
.toLowerCase();
|
||||
|
||||
return haystack.includes("getupdates") && haystack.includes("timeout");
|
||||
};
|
||||
|
||||
const NETWORK_ERROR_SNIPPETS = [
|
||||
"fetch failed",
|
||||
"network",
|
||||
@ -168,20 +184,41 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
try {
|
||||
// runner.task() returns a promise that resolves when the runner stops
|
||||
await runner.task();
|
||||
return;
|
||||
// Runner stopped normally (aborted or exhausted retries).
|
||||
// If not aborted, this is a transient failure - retry with backoff.
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
// grammY runner exhausted its maxRetryTime - treat as recoverable network error
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram polling stopped after retry timeout; restarting in ${formatDurationMs(delayMs)}.`,
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) return;
|
||||
throw sleepErr;
|
||||
}
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
throw err;
|
||||
}
|
||||
const isConflict = isGetUpdatesConflict(err);
|
||||
const isTimeout = isGetUpdatesTimeout(err);
|
||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
const isNetworkError = isNetworkRelatedError(err);
|
||||
if (!isConflict && !isRecoverable && !isNetworkError) {
|
||||
if (!isConflict && !isTimeout && !isRecoverable && !isNetworkError) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
||||
const reason = isConflict
|
||||
? "getUpdates conflict"
|
||||
: isTimeout
|
||||
? "getUpdates timeout"
|
||||
: "network error";
|
||||
const errMsg = formatErrorMessage(err);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user