fix: eliminate direct callGateway bypass for sub-agent announces
Fix v2: All announces now go through the announce queue unconditionally,
removing the direct callGateway("agent") path that could race against
user messages still in the dispatch pipeline.
Changes:
- subagent-announce.ts: Remove getFollowupQueueDepth conditional; always
enqueue announces via enqueueAnnounce() instead of direct gateway call
- subagent-announce-queue.ts: Add waitForFollowupQueueIdle() that polls
FOLLOWUP_QUEUES before draining announces, ensuring user messages are
always processed first. 30s timeout prevents deadlocks.
The announce queue's 1000ms debounce buffer covers in-flight user messages
(~50-300ms dispatch pipeline), and the followup queue yield covers
competing drain loop races.
This commit is contained in:
parent
5a5c969ee5
commit
a96bc734f0
@ -1,4 +1,5 @@
|
|||||||
import { type QueueDropPolicy, type QueueMode } from "../auto-reply/reply/queue.js";
|
import { type QueueDropPolicy, type QueueMode } from "../auto-reply/reply/queue.js";
|
||||||
|
import { FOLLOWUP_QUEUES } from "../auto-reply/reply/queue/state.js";
|
||||||
import { defaultRuntime } from "../runtime.js";
|
import { defaultRuntime } from "../runtime.js";
|
||||||
import {
|
import {
|
||||||
type DeliveryContext,
|
type DeliveryContext,
|
||||||
@ -13,6 +14,27 @@ import {
|
|||||||
waitForQueueDebounce,
|
waitForQueueDebounce,
|
||||||
} from "../utils/queue-helpers.js";
|
} from "../utils/queue-helpers.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fix 2v2: Wait for the followup queue (user messages) to finish draining
|
||||||
|
* before processing announce items. This ensures user messages are always
|
||||||
|
* handled before sub-agent announcements in the per-session lane.
|
||||||
|
*
|
||||||
|
* Uses polling (100ms interval) with a 30s timeout to avoid deadlocks.
|
||||||
|
*/
|
||||||
|
async function waitForFollowupQueueIdle(key: string, timeoutMs = 30_000): Promise<void> {
|
||||||
|
const deadline = Date.now() + timeoutMs;
|
||||||
|
while (Date.now() < deadline) {
|
||||||
|
const fq = FOLLOWUP_QUEUES.get(key);
|
||||||
|
if (!fq || (fq.items.length === 0 && !fq.draining && fq.droppedCount === 0)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
}
|
||||||
|
defaultRuntime.error?.(
|
||||||
|
`announce drain: followup queue yield timeout for ${key} (${timeoutMs}ms)`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
export type AnnounceQueueItem = {
|
export type AnnounceQueueItem = {
|
||||||
prompt: string;
|
prompt: string;
|
||||||
summaryLine?: string;
|
summaryLine?: string;
|
||||||
@ -89,6 +111,8 @@ function scheduleAnnounceDrain(key: string) {
|
|||||||
let forceIndividualCollect = false;
|
let forceIndividualCollect = false;
|
||||||
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
||||||
await waitForQueueDebounce(queue);
|
await waitForQueueDebounce(queue);
|
||||||
|
// Fix 2v2: Yield to followup queue — user messages always go first.
|
||||||
|
await waitForFollowupQueueIdle(key);
|
||||||
if (queue.mode === "collect") {
|
if (queue.mode === "collect") {
|
||||||
if (forceIndividualCollect) {
|
if (forceIndividualCollect) {
|
||||||
const next = queue.items.shift();
|
const next = queue.items.shift();
|
||||||
|
|||||||
@ -9,7 +9,7 @@ import {
|
|||||||
resolveStorePath,
|
resolveStorePath,
|
||||||
} from "../config/sessions.js";
|
} from "../config/sessions.js";
|
||||||
import { normalizeMainKey } from "../routing/session-key.js";
|
import { normalizeMainKey } from "../routing/session-key.js";
|
||||||
import { resolveQueueSettings, getFollowupQueueDepth } from "../auto-reply/reply/queue.js";
|
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
|
||||||
import { callGateway } from "../gateway/call.js";
|
import { callGateway } from "../gateway/call.js";
|
||||||
import { defaultRuntime } from "../runtime.js";
|
import { defaultRuntime } from "../runtime.js";
|
||||||
import {
|
import {
|
||||||
@ -413,64 +413,36 @@ export async function runSubagentAnnounceFlow(params: {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fix 2: If followup queue has pending user messages, enqueue the announce
|
// Fix 2v2: Always enqueue announces — never send directly to the gateway.
|
||||||
// instead of sending directly. This prevents the announce from jumping
|
// This eliminates race conditions where a direct callGateway("agent")
|
||||||
// ahead of queued user messages in the per-session lane.
|
// bypasses queue ordering and gets processed before user messages that
|
||||||
|
// are still in the dispatch pipeline. The announce queue's debounce
|
||||||
|
// (default 1000ms) provides a timing buffer, and the drain's followup-
|
||||||
|
// queue yield check ensures user messages are always processed first.
|
||||||
{
|
{
|
||||||
const cfg = loadConfig();
|
const cfg = loadConfig();
|
||||||
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
|
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
|
||||||
const followupDepth = getFollowupQueueDepth(canonicalKey);
|
|
||||||
if (followupDepth > 0) {
|
|
||||||
const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
|
|
||||||
const queueSettings = resolveQueueSettings({
|
|
||||||
cfg,
|
|
||||||
channel: entry?.channel ?? entry?.lastChannel,
|
|
||||||
sessionEntry: entry,
|
|
||||||
});
|
|
||||||
const origin = resolveAnnounceOrigin(entry, requesterOrigin);
|
|
||||||
enqueueAnnounce({
|
|
||||||
key: canonicalKey,
|
|
||||||
item: {
|
|
||||||
prompt: triggerMessage,
|
|
||||||
summaryLine: taskLabel,
|
|
||||||
enqueuedAt: Date.now(),
|
|
||||||
sessionKey: canonicalKey,
|
|
||||||
origin,
|
|
||||||
},
|
|
||||||
settings: queueSettings,
|
|
||||||
send: sendAnnounce,
|
|
||||||
});
|
|
||||||
didAnnounce = true;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send to main agent - it will respond in its own voice
|
|
||||||
let directOrigin = requesterOrigin;
|
|
||||||
if (!directOrigin) {
|
|
||||||
const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
|
const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
|
||||||
directOrigin = deliveryContextFromSession(entry);
|
const queueSettings = resolveQueueSettings({
|
||||||
|
cfg,
|
||||||
|
channel: entry?.channel ?? entry?.lastChannel,
|
||||||
|
sessionEntry: entry,
|
||||||
|
});
|
||||||
|
const origin = resolveAnnounceOrigin(entry, requesterOrigin);
|
||||||
|
enqueueAnnounce({
|
||||||
|
key: canonicalKey,
|
||||||
|
item: {
|
||||||
|
prompt: triggerMessage,
|
||||||
|
summaryLine: taskLabel,
|
||||||
|
enqueuedAt: Date.now(),
|
||||||
|
sessionKey: canonicalKey,
|
||||||
|
origin,
|
||||||
|
},
|
||||||
|
settings: queueSettings,
|
||||||
|
send: sendAnnounce,
|
||||||
|
});
|
||||||
|
didAnnounce = true;
|
||||||
}
|
}
|
||||||
await callGateway({
|
|
||||||
method: "agent",
|
|
||||||
params: {
|
|
||||||
sessionKey: params.requesterSessionKey,
|
|
||||||
message: triggerMessage,
|
|
||||||
deliver: true,
|
|
||||||
channel: directOrigin?.channel,
|
|
||||||
accountId: directOrigin?.accountId,
|
|
||||||
to: directOrigin?.to,
|
|
||||||
threadId:
|
|
||||||
directOrigin?.threadId != null && directOrigin.threadId !== ""
|
|
||||||
? String(directOrigin.threadId)
|
|
||||||
: undefined,
|
|
||||||
idempotencyKey: crypto.randomUUID(),
|
|
||||||
},
|
|
||||||
expectFinal: true,
|
|
||||||
timeoutMs: 60_000,
|
|
||||||
});
|
|
||||||
|
|
||||||
didAnnounce = true;
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
|
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
|
||||||
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.
|
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user