From 5a5c969ee54f83364a34db89c1e00d0d40422a7b Mon Sep 17 00:00:00 2001 From: 1alyx Date: Fri, 30 Jan 2026 00:34:17 -0500 Subject: [PATCH 1/2] fix: prevent sub-agent announces from bypassing queued user messages - Fix 2: Check followup queue depth before direct announce delivery - Fix 3: Drain followup queue in runReplyAgent finally block (prevents stuck messages on error paths) Co-authored-by: Zach Canepa --- src/agents/subagent-announce.ts | 34 +++++++++++++++++++++++++++- src/auto-reply/reply/agent-runner.ts | 11 ++++++++- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 444726efc..3a0426097 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -9,7 +9,7 @@ import { resolveStorePath, } from "../config/sessions.js"; import { normalizeMainKey } from "../routing/session-key.js"; -import { resolveQueueSettings } from "../auto-reply/reply/queue.js"; +import { resolveQueueSettings, getFollowupQueueDepth } from "../auto-reply/reply/queue.js"; import { callGateway } from "../gateway/call.js"; import { defaultRuntime } from "../runtime.js"; import { @@ -413,6 +413,38 @@ export async function runSubagentAnnounceFlow(params: { return true; } + // Fix 2: If followup queue has pending user messages, enqueue the announce + // instead of sending directly. This prevents the announce from jumping + // ahead of queued user messages in the per-session lane. + { + const cfg = loadConfig(); + const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); + const followupDepth = getFollowupQueueDepth(canonicalKey); + if (followupDepth > 0) { + const { entry } = loadRequesterSessionEntry(params.requesterSessionKey); + const queueSettings = resolveQueueSettings({ + cfg, + channel: entry?.channel ?? entry?.lastChannel, + sessionEntry: entry, + }); + const origin = resolveAnnounceOrigin(entry, requesterOrigin); + enqueueAnnounce({ + key: canonicalKey, + item: { + prompt: triggerMessage, + summaryLine: taskLabel, + enqueuedAt: Date.now(), + sessionKey: canonicalKey, + origin, + }, + settings: queueSettings, + send: sendAnnounce, + }); + didAnnounce = true; + return true; + } + } + // Send to main agent - it will respond in its own voice let directOrigin = requesterOrigin; if (!directOrigin) { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 227e6f17e..f03fd4027 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -34,7 +34,12 @@ import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.j import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js"; import { resolveBlockStreamingCoalescing } from "./block-streaming.js"; import { createFollowupRunner } from "./followup-runner.js"; -import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; +import { + enqueueFollowupRun, + scheduleFollowupDrain, + type FollowupRun, + type QueueSettings, +} from "./queue.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; import { persistSessionUsageUpdate } from "./session-usage.js"; import { incrementCompactionCount } from "./session-updates.js"; @@ -510,5 +515,9 @@ export async function runReplyAgent(params: { } finally { blockReplyPipeline?.stop(); typing.markRunComplete(); + // Fix 3: Ensure queued followup messages are drained even on error paths. + // Without this, messages enqueued in FOLLOWUP_QUEUES can get stuck if an + // exception is thrown before any finalizeWithFollowup() call is reached. + scheduleFollowupDrain(queueKey, runFollowupTurn); } } From a96bc734f05f8ea94abd9e2545d91271b99669a4 Mon Sep 17 00:00:00 2001 From: 1alyx Date: Fri, 30 Jan 2026 02:47:25 -0500 Subject: [PATCH 2/2] fix: eliminate direct callGateway bypass for sub-agent announces Fix v2: All announces now go through the announce queue unconditionally, removing the direct callGateway("agent") path that could race against user messages still in the dispatch pipeline. Changes: - subagent-announce.ts: Remove getFollowupQueueDepth conditional; always enqueue announces via enqueueAnnounce() instead of direct gateway call - subagent-announce-queue.ts: Add waitForFollowupQueueIdle() that polls FOLLOWUP_QUEUES before draining announces, ensuring user messages are always processed first. 30s timeout prevents deadlocks. The announce queue's 1000ms debounce buffer covers in-flight user messages (~50-300ms dispatch pipeline), and the followup queue yield covers competing drain loop races. --- src/agents/subagent-announce-queue.ts | 24 ++++++++ src/agents/subagent-announce.ts | 80 +++++++++------------------ 2 files changed, 50 insertions(+), 54 deletions(-) diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index d7510b3f5..228cc9fc8 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -1,4 +1,5 @@ import { type QueueDropPolicy, type QueueMode } from "../auto-reply/reply/queue.js"; +import { FOLLOWUP_QUEUES } from "../auto-reply/reply/queue/state.js"; import { defaultRuntime } from "../runtime.js"; import { type DeliveryContext, @@ -13,6 +14,27 @@ import { waitForQueueDebounce, } from "../utils/queue-helpers.js"; +/** + * Fix 2v2: Wait for the followup queue (user messages) to finish draining + * before processing announce items. This ensures user messages are always + * handled before sub-agent announcements in the per-session lane. + * + * Uses polling (100ms interval) with a 30s timeout to avoid deadlocks. + */ +async function waitForFollowupQueueIdle(key: string, timeoutMs = 30_000): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const fq = FOLLOWUP_QUEUES.get(key); + if (!fq || (fq.items.length === 0 && !fq.draining && fq.droppedCount === 0)) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + defaultRuntime.error?.( + `announce drain: followup queue yield timeout for ${key} (${timeoutMs}ms)`, + ); +} + export type AnnounceQueueItem = { prompt: string; summaryLine?: string; @@ -89,6 +111,8 @@ function scheduleAnnounceDrain(key: string) { let forceIndividualCollect = false; while (queue.items.length > 0 || queue.droppedCount > 0) { await waitForQueueDebounce(queue); + // Fix 2v2: Yield to followup queue — user messages always go first. + await waitForFollowupQueueIdle(key); if (queue.mode === "collect") { if (forceIndividualCollect) { const next = queue.items.shift(); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 3a0426097..13fd26411 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -9,7 +9,7 @@ import { resolveStorePath, } from "../config/sessions.js"; import { normalizeMainKey } from "../routing/session-key.js"; -import { resolveQueueSettings, getFollowupQueueDepth } from "../auto-reply/reply/queue.js"; +import { resolveQueueSettings } from "../auto-reply/reply/queue.js"; import { callGateway } from "../gateway/call.js"; import { defaultRuntime } from "../runtime.js"; import { @@ -413,64 +413,36 @@ export async function runSubagentAnnounceFlow(params: { return true; } - // Fix 2: If followup queue has pending user messages, enqueue the announce - // instead of sending directly. This prevents the announce from jumping - // ahead of queued user messages in the per-session lane. + // Fix 2v2: Always enqueue announces — never send directly to the gateway. + // This eliminates race conditions where a direct callGateway("agent") + // bypasses queue ordering and gets processed before user messages that + // are still in the dispatch pipeline. The announce queue's debounce + // (default 1000ms) provides a timing buffer, and the drain's followup- + // queue yield check ensures user messages are always processed first. { const cfg = loadConfig(); const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); - const followupDepth = getFollowupQueueDepth(canonicalKey); - if (followupDepth > 0) { - const { entry } = loadRequesterSessionEntry(params.requesterSessionKey); - const queueSettings = resolveQueueSettings({ - cfg, - channel: entry?.channel ?? entry?.lastChannel, - sessionEntry: entry, - }); - const origin = resolveAnnounceOrigin(entry, requesterOrigin); - enqueueAnnounce({ - key: canonicalKey, - item: { - prompt: triggerMessage, - summaryLine: taskLabel, - enqueuedAt: Date.now(), - sessionKey: canonicalKey, - origin, - }, - settings: queueSettings, - send: sendAnnounce, - }); - didAnnounce = true; - return true; - } - } - - // Send to main agent - it will respond in its own voice - let directOrigin = requesterOrigin; - if (!directOrigin) { const { entry } = loadRequesterSessionEntry(params.requesterSessionKey); - directOrigin = deliveryContextFromSession(entry); + const queueSettings = resolveQueueSettings({ + cfg, + channel: entry?.channel ?? entry?.lastChannel, + sessionEntry: entry, + }); + const origin = resolveAnnounceOrigin(entry, requesterOrigin); + enqueueAnnounce({ + key: canonicalKey, + item: { + prompt: triggerMessage, + summaryLine: taskLabel, + enqueuedAt: Date.now(), + sessionKey: canonicalKey, + origin, + }, + settings: queueSettings, + send: sendAnnounce, + }); + didAnnounce = true; } - await callGateway({ - method: "agent", - params: { - sessionKey: params.requesterSessionKey, - message: triggerMessage, - deliver: true, - channel: directOrigin?.channel, - accountId: directOrigin?.accountId, - to: directOrigin?.to, - threadId: - directOrigin?.threadId != null && directOrigin.threadId !== "" - ? String(directOrigin.threadId) - : undefined, - idempotencyKey: crypto.randomUUID(), - }, - expectFinal: true, - timeoutMs: 60_000, - }); - - didAnnounce = true; } catch (err) { defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`); // Best-effort follow-ups; ignore failures to avoid breaking the caller response.