diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index d7510b3f5..228cc9fc8 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -1,4 +1,5 @@ import { type QueueDropPolicy, type QueueMode } from "../auto-reply/reply/queue.js"; +import { FOLLOWUP_QUEUES } from "../auto-reply/reply/queue/state.js"; import { defaultRuntime } from "../runtime.js"; import { type DeliveryContext, @@ -13,6 +14,27 @@ import { waitForQueueDebounce, } from "../utils/queue-helpers.js"; +/** + * Fix 2v2: Wait for the followup queue (user messages) to finish draining + * before processing announce items. This ensures user messages are always + * handled before sub-agent announcements in the per-session lane. + * + * Uses polling (100ms interval) with a 30s timeout to avoid deadlocks. + */ +async function waitForFollowupQueueIdle(key: string, timeoutMs = 30_000): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const fq = FOLLOWUP_QUEUES.get(key); + if (!fq || (fq.items.length === 0 && !fq.draining && fq.droppedCount === 0)) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + defaultRuntime.error?.( + `announce drain: followup queue yield timeout for ${key} (${timeoutMs}ms)`, + ); +} + export type AnnounceQueueItem = { prompt: string; summaryLine?: string; @@ -89,6 +111,8 @@ function scheduleAnnounceDrain(key: string) { let forceIndividualCollect = false; while (queue.items.length > 0 || queue.droppedCount > 0) { await waitForQueueDebounce(queue); + // Fix 2v2: Yield to followup queue — user messages always go first. + await waitForFollowupQueueIdle(key); if (queue.mode === "collect") { if (forceIndividualCollect) { const next = queue.items.shift(); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 444726efc..13fd26411 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -413,32 +413,36 @@ export async function runSubagentAnnounceFlow(params: { return true; } - // Send to main agent - it will respond in its own voice - let directOrigin = requesterOrigin; - if (!directOrigin) { + // Fix 2v2: Always enqueue announces — never send directly to the gateway. + // This eliminates race conditions where a direct callGateway("agent") + // bypasses queue ordering and gets processed before user messages that + // are still in the dispatch pipeline. The announce queue's debounce + // (default 1000ms) provides a timing buffer, and the drain's followup- + // queue yield check ensures user messages are always processed first. + { + const cfg = loadConfig(); + const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); const { entry } = loadRequesterSessionEntry(params.requesterSessionKey); - directOrigin = deliveryContextFromSession(entry); + const queueSettings = resolveQueueSettings({ + cfg, + channel: entry?.channel ?? entry?.lastChannel, + sessionEntry: entry, + }); + const origin = resolveAnnounceOrigin(entry, requesterOrigin); + enqueueAnnounce({ + key: canonicalKey, + item: { + prompt: triggerMessage, + summaryLine: taskLabel, + enqueuedAt: Date.now(), + sessionKey: canonicalKey, + origin, + }, + settings: queueSettings, + send: sendAnnounce, + }); + didAnnounce = true; } - await callGateway({ - method: "agent", - params: { - sessionKey: params.requesterSessionKey, - message: triggerMessage, - deliver: true, - channel: directOrigin?.channel, - accountId: directOrigin?.accountId, - to: directOrigin?.to, - threadId: - directOrigin?.threadId != null && directOrigin.threadId !== "" - ? String(directOrigin.threadId) - : undefined, - idempotencyKey: crypto.randomUUID(), - }, - expectFinal: true, - timeoutMs: 60_000, - }); - - didAnnounce = true; } catch (err) { defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`); // Best-effort follow-ups; ignore failures to avoid breaking the caller response. diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 227e6f17e..f03fd4027 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -34,7 +34,12 @@ import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.j import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js"; import { resolveBlockStreamingCoalescing } from "./block-streaming.js"; import { createFollowupRunner } from "./followup-runner.js"; -import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; +import { + enqueueFollowupRun, + scheduleFollowupDrain, + type FollowupRun, + type QueueSettings, +} from "./queue.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; import { persistSessionUsageUpdate } from "./session-usage.js"; import { incrementCompactionCount } from "./session-updates.js"; @@ -510,5 +515,9 @@ export async function runReplyAgent(params: { } finally { blockReplyPipeline?.stop(); typing.markRunComplete(); + // Fix 3: Ensure queued followup messages are drained even on error paths. + // Without this, messages enqueued in FOLLOWUP_QUEUES can get stuck if an + // exception is thrown before any finalizeWithFollowup() call is reached. + scheduleFollowupDrain(queueKey, runFollowupTurn); } }