Merge a96bc734f0 into 09be5d45d5
This commit is contained in:
commit
cea4430c13
@ -1,4 +1,5 @@
|
||||
import { type QueueDropPolicy, type QueueMode } from "../auto-reply/reply/queue.js";
|
||||
import { FOLLOWUP_QUEUES } from "../auto-reply/reply/queue/state.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import {
|
||||
type DeliveryContext,
|
||||
@ -13,6 +14,27 @@ import {
|
||||
waitForQueueDebounce,
|
||||
} from "../utils/queue-helpers.js";
|
||||
|
||||
/**
|
||||
* Fix 2v2: Wait for the followup queue (user messages) to finish draining
|
||||
* before processing announce items. This ensures user messages are always
|
||||
* handled before sub-agent announcements in the per-session lane.
|
||||
*
|
||||
* Uses polling (100ms interval) with a 30s timeout to avoid deadlocks.
|
||||
*/
|
||||
async function waitForFollowupQueueIdle(key: string, timeoutMs = 30_000): Promise<void> {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() < deadline) {
|
||||
const fq = FOLLOWUP_QUEUES.get(key);
|
||||
if (!fq || (fq.items.length === 0 && !fq.draining && fq.droppedCount === 0)) {
|
||||
return;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
}
|
||||
defaultRuntime.error?.(
|
||||
`announce drain: followup queue yield timeout for ${key} (${timeoutMs}ms)`,
|
||||
);
|
||||
}
|
||||
|
||||
export type AnnounceQueueItem = {
|
||||
prompt: string;
|
||||
summaryLine?: string;
|
||||
@ -89,6 +111,8 @@ function scheduleAnnounceDrain(key: string) {
|
||||
let forceIndividualCollect = false;
|
||||
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
||||
await waitForQueueDebounce(queue);
|
||||
// Fix 2v2: Yield to followup queue — user messages always go first.
|
||||
await waitForFollowupQueueIdle(key);
|
||||
if (queue.mode === "collect") {
|
||||
if (forceIndividualCollect) {
|
||||
const next = queue.items.shift();
|
||||
|
||||
@ -413,32 +413,36 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Send to main agent - it will respond in its own voice
|
||||
let directOrigin = requesterOrigin;
|
||||
if (!directOrigin) {
|
||||
// Fix 2v2: Always enqueue announces — never send directly to the gateway.
|
||||
// This eliminates race conditions where a direct callGateway("agent")
|
||||
// bypasses queue ordering and gets processed before user messages that
|
||||
// are still in the dispatch pipeline. The announce queue's debounce
|
||||
// (default 1000ms) provides a timing buffer, and the drain's followup-
|
||||
// queue yield check ensures user messages are always processed first.
|
||||
{
|
||||
const cfg = loadConfig();
|
||||
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
|
||||
const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
|
||||
directOrigin = deliveryContextFromSession(entry);
|
||||
const queueSettings = resolveQueueSettings({
|
||||
cfg,
|
||||
channel: entry?.channel ?? entry?.lastChannel,
|
||||
sessionEntry: entry,
|
||||
});
|
||||
const origin = resolveAnnounceOrigin(entry, requesterOrigin);
|
||||
enqueueAnnounce({
|
||||
key: canonicalKey,
|
||||
item: {
|
||||
prompt: triggerMessage,
|
||||
summaryLine: taskLabel,
|
||||
enqueuedAt: Date.now(),
|
||||
sessionKey: canonicalKey,
|
||||
origin,
|
||||
},
|
||||
settings: queueSettings,
|
||||
send: sendAnnounce,
|
||||
});
|
||||
didAnnounce = true;
|
||||
}
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: params.requesterSessionKey,
|
||||
message: triggerMessage,
|
||||
deliver: true,
|
||||
channel: directOrigin?.channel,
|
||||
accountId: directOrigin?.accountId,
|
||||
to: directOrigin?.to,
|
||||
threadId:
|
||||
directOrigin?.threadId != null && directOrigin.threadId !== ""
|
||||
? String(directOrigin.threadId)
|
||||
: undefined,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
},
|
||||
expectFinal: true,
|
||||
timeoutMs: 60_000,
|
||||
});
|
||||
|
||||
didAnnounce = true;
|
||||
} catch (err) {
|
||||
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
|
||||
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.
|
||||
|
||||
@ -34,7 +34,12 @@ import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.j
|
||||
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
|
||||
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
|
||||
import { createFollowupRunner } from "./followup-runner.js";
|
||||
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
|
||||
import {
|
||||
enqueueFollowupRun,
|
||||
scheduleFollowupDrain,
|
||||
type FollowupRun,
|
||||
type QueueSettings,
|
||||
} from "./queue.js";
|
||||
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
|
||||
import { persistSessionUsageUpdate } from "./session-usage.js";
|
||||
import { incrementCompactionCount } from "./session-updates.js";
|
||||
@ -510,5 +515,9 @@ export async function runReplyAgent(params: {
|
||||
} finally {
|
||||
blockReplyPipeline?.stop();
|
||||
typing.markRunComplete();
|
||||
// Fix 3: Ensure queued followup messages are drained even on error paths.
|
||||
// Without this, messages enqueued in FOLLOWUP_QUEUES can get stuck if an
|
||||
// exception is thrown before any finalizeWithFollowup() call is reached.
|
||||
scheduleFollowupDrain(queueKey, runFollowupTurn);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user