This commit is contained in:
oogway 2026-01-30 17:05:54 +05:30 committed by GitHub
commit 534ca33b77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 247 additions and 14 deletions

View File

@ -0,0 +1,184 @@
import { describe, expect, it } from "vitest";
import { hasCrossChannelItems } from "../../../utils/queue-helpers.js";
/**
* Tests for PR #4413: thread_ts null check fix.
*
* The queue drain logic uses `hasCrossChannelItems` with a key resolver that
* includes threadId in the routing key. Previously it used
* `typeof threadId === "number"` which silently ignored string thread IDs
* (e.g. Slack's `thread_ts = "1706000000.000100"`). The fix uses
* `threadId != null` to handle both number and string thread IDs.
*/
interface FakeQueueItem {
originatingChannel?: string;
originatingTo?: string;
originatingAccountId?: string;
originatingThreadId?: string | number;
}
/** Mirrors the resolveKey callback from drain.ts (post-fix version). */
function resolveKey(item: FakeQueueItem) {
const channel = item.originatingChannel;
const to = item.originatingTo;
const accountId = item.originatingAccountId;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && threadId == null) {
return {};
}
// isRoutableChannel simplified: just check truthy for test purposes
if (!channel || !to) {
return { cross: true };
}
const threadKey = threadId != null ? String(threadId) : "";
return {
key: [channel, to, accountId || "", threadKey].join("|"),
};
}
/** The OLD (buggy) resolveKey that used typeof === "number". */
function resolveKeyBuggy(item: FakeQueueItem) {
const channel = item.originatingChannel;
const to = item.originatingTo;
const accountId = item.originatingAccountId;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && typeof threadId !== "number") {
return {};
}
if (!channel || !to) {
return { cross: true };
}
const threadKey = typeof threadId === "number" ? String(threadId) : "";
return {
key: [channel, to, accountId || "", threadKey].join("|"),
};
}
describe("thread_ts null check (PR #4413)", () => {
describe("resolveKey with string thread IDs (Slack thread_ts)", () => {
const slackItem: FakeQueueItem = {
originatingChannel: "slack",
originatingTo: "C123",
originatingAccountId: "T456",
originatingThreadId: "1706000000.000100",
};
it("includes string threadId in key (fixed)", () => {
const result = resolveKey(slackItem);
expect(result.key).toBe("slack|C123|T456|1706000000.000100");
});
it("old code drops string threadId from key (bug)", () => {
const result = resolveKeyBuggy(slackItem);
// Bug: string threadId is ignored, threadKey becomes ""
expect(result.key).toBe("slack|C123|T456|");
});
it("distinguishes messages in different Slack threads", () => {
const items: FakeQueueItem[] = [
{ ...slackItem, originatingThreadId: "1706000000.000100" },
{ ...slackItem, originatingThreadId: "1706000000.000200" },
];
expect(hasCrossChannelItems(items, resolveKey)).toBe(true);
});
it("old code collapses different Slack threads into same key (bug)", () => {
const items: FakeQueueItem[] = [
{ ...slackItem, originatingThreadId: "1706000000.000100" },
{ ...slackItem, originatingThreadId: "1706000000.000200" },
];
// Bug: both get key "slack|C123|T456|" so they look same-channel
expect(hasCrossChannelItems(items, resolveKeyBuggy)).toBe(false);
});
});
describe("resolveKey with numeric thread IDs", () => {
it("includes numeric threadId in key", () => {
const result = resolveKey({
originatingChannel: "discord",
originatingTo: "guild123",
originatingThreadId: 42,
});
expect(result.key).toBe("discord|guild123||42");
});
it("old code also handles numeric threadId (was already working)", () => {
const result = resolveKeyBuggy({
originatingChannel: "discord",
originatingTo: "guild123",
originatingThreadId: 42,
});
expect(result.key).toBe("discord|guild123||42");
});
});
describe("resolveKey with null/undefined threadId", () => {
it("returns empty object when all fields are empty", () => {
expect(resolveKey({})).toEqual({});
});
it("uses empty string for threadKey when threadId is undefined", () => {
const result = resolveKey({
originatingChannel: "slack",
originatingTo: "C123",
});
expect(result.key).toBe("slack|C123||");
});
it("uses empty string for threadKey when threadId is null-ish", () => {
const result = resolveKey({
originatingChannel: "slack",
originatingTo: "C123",
originatingThreadId: undefined,
});
expect(result.key).toBe("slack|C123||");
});
});
describe("finding originatingThreadId in items (second fix site)", () => {
/**
* The drain also does:
* items.find(i => i.originatingThreadId != null)?.originatingThreadId
* Previously: items.find(i => typeof i.originatingThreadId === "number")
*/
const findThreadId = (items: FakeQueueItem[]) =>
items.find((i) => i.originatingThreadId != null)?.originatingThreadId;
const findThreadIdBuggy = (items: FakeQueueItem[]) =>
items.find((i) => typeof i.originatingThreadId === "number")?.originatingThreadId;
it("finds string thread ID (fixed)", () => {
const items: FakeQueueItem[] = [
{ originatingChannel: "slack" },
{ originatingChannel: "slack", originatingThreadId: "1706000000.000100" },
];
expect(findThreadId(items)).toBe("1706000000.000100");
});
it("old code misses string thread ID (bug)", () => {
const items: FakeQueueItem[] = [
{ originatingChannel: "slack" },
{ originatingChannel: "slack", originatingThreadId: "1706000000.000100" },
];
expect(findThreadIdBuggy(items)).toBeUndefined();
});
it("finds numeric thread ID", () => {
const items: FakeQueueItem[] = [
{ originatingChannel: "discord" },
{ originatingChannel: "discord", originatingThreadId: 99 },
];
expect(findThreadId(items)).toBe(99);
expect(findThreadIdBuggy(items)).toBe(99);
});
it("returns undefined when no items have threadId", () => {
const items: FakeQueueItem[] = [
{ originatingChannel: "slack" },
{ originatingChannel: "slack" },
];
expect(findThreadId(items)).toBeUndefined();
});
});
});

View File

@ -40,13 +40,13 @@ export function scheduleFollowupDrain(
const to = item.originatingTo;
const accountId = item.originatingAccountId;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && typeof threadId !== "number") {
if (!channel && !to && !accountId && threadId == null) {
return {};
}
if (!isRoutableChannel(channel) || !to) {
return { cross: true };
}
const threadKey = typeof threadId === "number" ? String(threadId) : "";
const threadKey = threadId != null ? String(threadId) : "";
return {
key: [channel, to, accountId || "", threadKey].join("|"),
};
@ -61,7 +61,10 @@ export function scheduleFollowupDrain(
}
const items = queue.items.splice(0, queue.items.length);
const summary = buildQueueSummaryPrompt({ state: queue, noun: "message" });
const summary = buildQueueSummaryPrompt({
state: queue,
noun: "message",
});
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) break;
@ -72,7 +75,7 @@ export function scheduleFollowupDrain(
(i) => i.originatingAccountId,
)?.originatingAccountId;
const originatingThreadId = items.find(
(i) => typeof i.originatingThreadId === "number",
(i) => i.originatingThreadId != null,
)?.originatingThreadId;
const prompt = buildCollectPrompt({
@ -93,7 +96,10 @@ export function scheduleFollowupDrain(
continue;
}
const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "message" });
const summaryPrompt = buildQueueSummaryPrompt({
state: queue,
noun: "message",
});
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) break;

View File

@ -141,7 +141,9 @@ export function printCronList(jobs: CronJob[], runtime = defaultRuntime) {
const now = Date.now();
for (const job of jobs) {
const idLabel = pad(job.id, CRON_ID_PAD);
// Gateway may return either 'id' or 'jobId' depending on API version
const id = (job as CronJob & { jobId?: string }).jobId ?? job.id ?? "";
const idLabel = pad(id, CRON_ID_PAD);
const nameLabel = pad(truncate(job.name, CRON_NAME_PAD), CRON_NAME_PAD);
const scheduleLabel = pad(
truncate(formatSchedule(job.schedule), CRON_SCHEDULE_PAD),

View File

@ -34,11 +34,36 @@ export type OpenAiBatchOutputLine = {
export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings";
const OPENAI_BATCH_COMPLETION_WINDOW = "24h";
const OPENAI_BATCH_MAX_REQUESTS = 50000;
const OPENAI_FETCH_TIMEOUT_MS = 30000; // 30 second timeout for individual fetch calls
function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string {
return openAi.baseUrl?.replace(/\/$/, "") ?? "";
}
async function fetchWithTimeout(
url: string,
options: RequestInit & { timeoutMs?: number },
): Promise<Response> {
const timeoutMs = options.timeoutMs ?? OPENAI_FETCH_TIMEOUT_MS;
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, {
...options,
signal: controller.signal,
});
clearTimeout(timeoutId);
return response;
} catch (err) {
clearTimeout(timeoutId);
if (err instanceof Error && err.name === "AbortError") {
throw new Error(`fetch timeout after ${timeoutMs}ms: ${url}`);
}
throw err;
}
}
function getOpenAiHeaders(
openAi: OpenAiEmbeddingClient,
params: { json: boolean },
@ -79,10 +104,11 @@ async function submitOpenAiBatch(params: {
`memory-embeddings.${hashText(String(Date.now()))}.jsonl`,
);
const fileRes = await fetch(`${baseUrl}/files`, {
const fileRes = await fetchWithTimeout(`${baseUrl}/files`, {
method: "POST",
headers: getOpenAiHeaders(params.openAi, { json: false }),
body: form,
timeoutMs: 60000, // 60s for file upload
});
if (!fileRes.ok) {
const text = await fileRes.text();
@ -95,7 +121,7 @@ async function submitOpenAiBatch(params: {
const batchRes = await retryAsync(
async () => {
const res = await fetch(`${baseUrl}/batches`, {
const res = await fetchWithTimeout(`${baseUrl}/batches`, {
method: "POST",
headers: getOpenAiHeaders(params.openAi, { json: true }),
body: JSON.stringify({
@ -137,7 +163,7 @@ async function fetchOpenAiBatchStatus(params: {
batchId: string;
}): Promise<OpenAiBatchStatus> {
const baseUrl = getOpenAiBaseUrl(params.openAi);
const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
const res = await fetchWithTimeout(`${baseUrl}/batches/${params.batchId}`, {
headers: getOpenAiHeaders(params.openAi, { json: true }),
});
if (!res.ok) {
@ -152,8 +178,9 @@ async function fetchOpenAiFileContent(params: {
fileId: string;
}): Promise<string> {
const baseUrl = getOpenAiBaseUrl(params.openAi);
const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, {
const res = await fetchWithTimeout(`${baseUrl}/files/${params.fileId}/content`, {
headers: getOpenAiHeaders(params.openAi, { json: true }),
timeoutMs: 60000, // 60s for file download
});
if (!res.ok) {
const text = await res.text();
@ -208,10 +235,24 @@ async function waitForOpenAiBatch(params: {
while (true) {
const status =
current ??
(await fetchOpenAiBatchStatus({
openAi: params.openAi,
batchId: params.batchId,
}));
(await retryAsync(
async () =>
await fetchOpenAiBatchStatus({
openAi: params.openAi,
batchId: params.batchId,
}),
{
attempts: 3,
minDelayMs: 500,
maxDelayMs: 2000,
jitter: 0.2,
shouldRetry: (err) => {
// Retry on network errors and timeout errors
const message = err instanceof Error ? err.message : String(err);
return message.includes("timeout") || message.includes("fetch failed");
},
},
));
const state = status.status ?? "unknown";
if (state === "completed") {
if (!status.output_file_id) {