diff --git a/src/auto-reply/reply/queue/drain-thread-ts.test.ts b/src/auto-reply/reply/queue/drain-thread-ts.test.ts new file mode 100644 index 000000000..6e215fce0 --- /dev/null +++ b/src/auto-reply/reply/queue/drain-thread-ts.test.ts @@ -0,0 +1,184 @@ +import { describe, expect, it } from "vitest"; +import { hasCrossChannelItems } from "../../../utils/queue-helpers.js"; + +/** + * Tests for PR #4413: thread_ts null check fix. + * + * The queue drain logic uses `hasCrossChannelItems` with a key resolver that + * includes threadId in the routing key. Previously it used + * `typeof threadId === "number"` which silently ignored string thread IDs + * (e.g. Slack's `thread_ts = "1706000000.000100"`). The fix uses + * `threadId != null` to handle both number and string thread IDs. + */ + +interface FakeQueueItem { + originatingChannel?: string; + originatingTo?: string; + originatingAccountId?: string; + originatingThreadId?: string | number; +} + +/** Mirrors the resolveKey callback from drain.ts (post-fix version). */ +function resolveKey(item: FakeQueueItem) { + const channel = item.originatingChannel; + const to = item.originatingTo; + const accountId = item.originatingAccountId; + const threadId = item.originatingThreadId; + if (!channel && !to && !accountId && threadId == null) { + return {}; + } + // isRoutableChannel simplified: just check truthy for test purposes + if (!channel || !to) { + return { cross: true }; + } + const threadKey = threadId != null ? String(threadId) : ""; + return { + key: [channel, to, accountId || "", threadKey].join("|"), + }; +} + +/** The OLD (buggy) resolveKey that used typeof === "number". */ +function resolveKeyBuggy(item: FakeQueueItem) { + const channel = item.originatingChannel; + const to = item.originatingTo; + const accountId = item.originatingAccountId; + const threadId = item.originatingThreadId; + if (!channel && !to && !accountId && typeof threadId !== "number") { + return {}; + } + if (!channel || !to) { + return { cross: true }; + } + const threadKey = typeof threadId === "number" ? String(threadId) : ""; + return { + key: [channel, to, accountId || "", threadKey].join("|"), + }; +} + +describe("thread_ts null check (PR #4413)", () => { + describe("resolveKey with string thread IDs (Slack thread_ts)", () => { + const slackItem: FakeQueueItem = { + originatingChannel: "slack", + originatingTo: "C123", + originatingAccountId: "T456", + originatingThreadId: "1706000000.000100", + }; + + it("includes string threadId in key (fixed)", () => { + const result = resolveKey(slackItem); + expect(result.key).toBe("slack|C123|T456|1706000000.000100"); + }); + + it("old code drops string threadId from key (bug)", () => { + const result = resolveKeyBuggy(slackItem); + // Bug: string threadId is ignored, threadKey becomes "" + expect(result.key).toBe("slack|C123|T456|"); + }); + + it("distinguishes messages in different Slack threads", () => { + const items: FakeQueueItem[] = [ + { ...slackItem, originatingThreadId: "1706000000.000100" }, + { ...slackItem, originatingThreadId: "1706000000.000200" }, + ]; + expect(hasCrossChannelItems(items, resolveKey)).toBe(true); + }); + + it("old code collapses different Slack threads into same key (bug)", () => { + const items: FakeQueueItem[] = [ + { ...slackItem, originatingThreadId: "1706000000.000100" }, + { ...slackItem, originatingThreadId: "1706000000.000200" }, + ]; + // Bug: both get key "slack|C123|T456|" so they look same-channel + expect(hasCrossChannelItems(items, resolveKeyBuggy)).toBe(false); + }); + }); + + describe("resolveKey with numeric thread IDs", () => { + it("includes numeric threadId in key", () => { + const result = resolveKey({ + originatingChannel: "discord", + originatingTo: "guild123", + originatingThreadId: 42, + }); + expect(result.key).toBe("discord|guild123||42"); + }); + + it("old code also handles numeric threadId (was already working)", () => { + const result = resolveKeyBuggy({ + originatingChannel: "discord", + originatingTo: "guild123", + originatingThreadId: 42, + }); + expect(result.key).toBe("discord|guild123||42"); + }); + }); + + describe("resolveKey with null/undefined threadId", () => { + it("returns empty object when all fields are empty", () => { + expect(resolveKey({})).toEqual({}); + }); + + it("uses empty string for threadKey when threadId is undefined", () => { + const result = resolveKey({ + originatingChannel: "slack", + originatingTo: "C123", + }); + expect(result.key).toBe("slack|C123||"); + }); + + it("uses empty string for threadKey when threadId is null-ish", () => { + const result = resolveKey({ + originatingChannel: "slack", + originatingTo: "C123", + originatingThreadId: undefined, + }); + expect(result.key).toBe("slack|C123||"); + }); + }); + + describe("finding originatingThreadId in items (second fix site)", () => { + /** + * The drain also does: + * items.find(i => i.originatingThreadId != null)?.originatingThreadId + * Previously: items.find(i => typeof i.originatingThreadId === "number") + */ + const findThreadId = (items: FakeQueueItem[]) => + items.find((i) => i.originatingThreadId != null)?.originatingThreadId; + + const findThreadIdBuggy = (items: FakeQueueItem[]) => + items.find((i) => typeof i.originatingThreadId === "number")?.originatingThreadId; + + it("finds string thread ID (fixed)", () => { + const items: FakeQueueItem[] = [ + { originatingChannel: "slack" }, + { originatingChannel: "slack", originatingThreadId: "1706000000.000100" }, + ]; + expect(findThreadId(items)).toBe("1706000000.000100"); + }); + + it("old code misses string thread ID (bug)", () => { + const items: FakeQueueItem[] = [ + { originatingChannel: "slack" }, + { originatingChannel: "slack", originatingThreadId: "1706000000.000100" }, + ]; + expect(findThreadIdBuggy(items)).toBeUndefined(); + }); + + it("finds numeric thread ID", () => { + const items: FakeQueueItem[] = [ + { originatingChannel: "discord" }, + { originatingChannel: "discord", originatingThreadId: 99 }, + ]; + expect(findThreadId(items)).toBe(99); + expect(findThreadIdBuggy(items)).toBe(99); + }); + + it("returns undefined when no items have threadId", () => { + const items: FakeQueueItem[] = [ + { originatingChannel: "slack" }, + { originatingChannel: "slack" }, + ]; + expect(findThreadId(items)).toBeUndefined(); + }); + }); +}); diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 01361a1ec..593e35e6d 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -40,13 +40,13 @@ export function scheduleFollowupDrain( const to = item.originatingTo; const accountId = item.originatingAccountId; const threadId = item.originatingThreadId; - if (!channel && !to && !accountId && typeof threadId !== "number") { + if (!channel && !to && !accountId && threadId == null) { return {}; } if (!isRoutableChannel(channel) || !to) { return { cross: true }; } - const threadKey = typeof threadId === "number" ? String(threadId) : ""; + const threadKey = threadId != null ? String(threadId) : ""; return { key: [channel, to, accountId || "", threadKey].join("|"), }; @@ -61,7 +61,10 @@ export function scheduleFollowupDrain( } const items = queue.items.splice(0, queue.items.length); - const summary = buildQueueSummaryPrompt({ state: queue, noun: "message" }); + const summary = buildQueueSummaryPrompt({ + state: queue, + noun: "message", + }); const run = items.at(-1)?.run ?? queue.lastRun; if (!run) break; @@ -72,7 +75,7 @@ export function scheduleFollowupDrain( (i) => i.originatingAccountId, )?.originatingAccountId; const originatingThreadId = items.find( - (i) => typeof i.originatingThreadId === "number", + (i) => i.originatingThreadId != null, )?.originatingThreadId; const prompt = buildCollectPrompt({ @@ -93,7 +96,10 @@ export function scheduleFollowupDrain( continue; } - const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "message" }); + const summaryPrompt = buildQueueSummaryPrompt({ + state: queue, + noun: "message", + }); if (summaryPrompt) { const run = queue.lastRun; if (!run) break; diff --git a/src/cli/cron-cli/shared.ts b/src/cli/cron-cli/shared.ts index 5e5efc81a..2e3ddaa61 100644 --- a/src/cli/cron-cli/shared.ts +++ b/src/cli/cron-cli/shared.ts @@ -141,7 +141,9 @@ export function printCronList(jobs: CronJob[], runtime = defaultRuntime) { const now = Date.now(); for (const job of jobs) { - const idLabel = pad(job.id, CRON_ID_PAD); + // Gateway may return either 'id' or 'jobId' depending on API version + const id = (job as CronJob & { jobId?: string }).jobId ?? job.id ?? ""; + const idLabel = pad(id, CRON_ID_PAD); const nameLabel = pad(truncate(job.name, CRON_NAME_PAD), CRON_NAME_PAD); const scheduleLabel = pad( truncate(formatSchedule(job.schedule), CRON_SCHEDULE_PAD), diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts index e49716fe6..02b8b1c67 100644 --- a/src/memory/batch-openai.ts +++ b/src/memory/batch-openai.ts @@ -34,11 +34,36 @@ export type OpenAiBatchOutputLine = { export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings"; const OPENAI_BATCH_COMPLETION_WINDOW = "24h"; const OPENAI_BATCH_MAX_REQUESTS = 50000; +const OPENAI_FETCH_TIMEOUT_MS = 30000; // 30 second timeout for individual fetch calls function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string { return openAi.baseUrl?.replace(/\/$/, "") ?? ""; } +async function fetchWithTimeout( + url: string, + options: RequestInit & { timeoutMs?: number }, +): Promise { + const timeoutMs = options.timeoutMs ?? OPENAI_FETCH_TIMEOUT_MS; + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeoutMs); + + try { + const response = await fetch(url, { + ...options, + signal: controller.signal, + }); + clearTimeout(timeoutId); + return response; + } catch (err) { + clearTimeout(timeoutId); + if (err instanceof Error && err.name === "AbortError") { + throw new Error(`fetch timeout after ${timeoutMs}ms: ${url}`); + } + throw err; + } +} + function getOpenAiHeaders( openAi: OpenAiEmbeddingClient, params: { json: boolean }, @@ -79,10 +104,11 @@ async function submitOpenAiBatch(params: { `memory-embeddings.${hashText(String(Date.now()))}.jsonl`, ); - const fileRes = await fetch(`${baseUrl}/files`, { + const fileRes = await fetchWithTimeout(`${baseUrl}/files`, { method: "POST", headers: getOpenAiHeaders(params.openAi, { json: false }), body: form, + timeoutMs: 60000, // 60s for file upload }); if (!fileRes.ok) { const text = await fileRes.text(); @@ -95,7 +121,7 @@ async function submitOpenAiBatch(params: { const batchRes = await retryAsync( async () => { - const res = await fetch(`${baseUrl}/batches`, { + const res = await fetchWithTimeout(`${baseUrl}/batches`, { method: "POST", headers: getOpenAiHeaders(params.openAi, { json: true }), body: JSON.stringify({ @@ -137,7 +163,7 @@ async function fetchOpenAiBatchStatus(params: { batchId: string; }): Promise { const baseUrl = getOpenAiBaseUrl(params.openAi); - const res = await fetch(`${baseUrl}/batches/${params.batchId}`, { + const res = await fetchWithTimeout(`${baseUrl}/batches/${params.batchId}`, { headers: getOpenAiHeaders(params.openAi, { json: true }), }); if (!res.ok) { @@ -152,8 +178,9 @@ async function fetchOpenAiFileContent(params: { fileId: string; }): Promise { const baseUrl = getOpenAiBaseUrl(params.openAi); - const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, { + const res = await fetchWithTimeout(`${baseUrl}/files/${params.fileId}/content`, { headers: getOpenAiHeaders(params.openAi, { json: true }), + timeoutMs: 60000, // 60s for file download }); if (!res.ok) { const text = await res.text(); @@ -208,10 +235,24 @@ async function waitForOpenAiBatch(params: { while (true) { const status = current ?? - (await fetchOpenAiBatchStatus({ - openAi: params.openAi, - batchId: params.batchId, - })); + (await retryAsync( + async () => + await fetchOpenAiBatchStatus({ + openAi: params.openAi, + batchId: params.batchId, + }), + { + attempts: 3, + minDelayMs: 500, + maxDelayMs: 2000, + jitter: 0.2, + shouldRetry: (err) => { + // Retry on network errors and timeout errors + const message = err instanceof Error ? err.message : String(err); + return message.includes("timeout") || message.includes("fetch failed"); + }, + }, + )); const state = status.status ?? "unknown"; if (state === "completed") { if (!status.output_file_id) {