fix: prevent sub-agent announces from bypassing queued user messages
- Fix 2: Check followup queue depth before direct announce delivery - Fix 3: Drain followup queue in runReplyAgent finally block (prevents stuck messages on error paths) Co-authored-by: Zach Canepa <zcanepa19@gmail.com>
This commit is contained in:
parent
4583f88626
commit
5a5c969ee5
@ -9,7 +9,7 @@ import {
|
|||||||
resolveStorePath,
|
resolveStorePath,
|
||||||
} from "../config/sessions.js";
|
} from "../config/sessions.js";
|
||||||
import { normalizeMainKey } from "../routing/session-key.js";
|
import { normalizeMainKey } from "../routing/session-key.js";
|
||||||
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
|
import { resolveQueueSettings, getFollowupQueueDepth } from "../auto-reply/reply/queue.js";
|
||||||
import { callGateway } from "../gateway/call.js";
|
import { callGateway } from "../gateway/call.js";
|
||||||
import { defaultRuntime } from "../runtime.js";
|
import { defaultRuntime } from "../runtime.js";
|
||||||
import {
|
import {
|
||||||
@ -413,6 +413,38 @@ export async function runSubagentAnnounceFlow(params: {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fix 2: If followup queue has pending user messages, enqueue the announce
|
||||||
|
// instead of sending directly. This prevents the announce from jumping
|
||||||
|
// ahead of queued user messages in the per-session lane.
|
||||||
|
{
|
||||||
|
const cfg = loadConfig();
|
||||||
|
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
|
||||||
|
const followupDepth = getFollowupQueueDepth(canonicalKey);
|
||||||
|
if (followupDepth > 0) {
|
||||||
|
const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
|
||||||
|
const queueSettings = resolveQueueSettings({
|
||||||
|
cfg,
|
||||||
|
channel: entry?.channel ?? entry?.lastChannel,
|
||||||
|
sessionEntry: entry,
|
||||||
|
});
|
||||||
|
const origin = resolveAnnounceOrigin(entry, requesterOrigin);
|
||||||
|
enqueueAnnounce({
|
||||||
|
key: canonicalKey,
|
||||||
|
item: {
|
||||||
|
prompt: triggerMessage,
|
||||||
|
summaryLine: taskLabel,
|
||||||
|
enqueuedAt: Date.now(),
|
||||||
|
sessionKey: canonicalKey,
|
||||||
|
origin,
|
||||||
|
},
|
||||||
|
settings: queueSettings,
|
||||||
|
send: sendAnnounce,
|
||||||
|
});
|
||||||
|
didAnnounce = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Send to main agent - it will respond in its own voice
|
// Send to main agent - it will respond in its own voice
|
||||||
let directOrigin = requesterOrigin;
|
let directOrigin = requesterOrigin;
|
||||||
if (!directOrigin) {
|
if (!directOrigin) {
|
||||||
|
|||||||
@ -34,7 +34,12 @@ import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.j
|
|||||||
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
|
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
|
||||||
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
|
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
|
||||||
import { createFollowupRunner } from "./followup-runner.js";
|
import { createFollowupRunner } from "./followup-runner.js";
|
||||||
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
|
import {
|
||||||
|
enqueueFollowupRun,
|
||||||
|
scheduleFollowupDrain,
|
||||||
|
type FollowupRun,
|
||||||
|
type QueueSettings,
|
||||||
|
} from "./queue.js";
|
||||||
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
|
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
|
||||||
import { persistSessionUsageUpdate } from "./session-usage.js";
|
import { persistSessionUsageUpdate } from "./session-usage.js";
|
||||||
import { incrementCompactionCount } from "./session-updates.js";
|
import { incrementCompactionCount } from "./session-updates.js";
|
||||||
@ -510,5 +515,9 @@ export async function runReplyAgent(params: {
|
|||||||
} finally {
|
} finally {
|
||||||
blockReplyPipeline?.stop();
|
blockReplyPipeline?.stop();
|
||||||
typing.markRunComplete();
|
typing.markRunComplete();
|
||||||
|
// Fix 3: Ensure queued followup messages are drained even on error paths.
|
||||||
|
// Without this, messages enqueued in FOLLOWUP_QUEUES can get stuck if an
|
||||||
|
// exception is thrown before any finalizeWithFollowup() call is reached.
|
||||||
|
scheduleFollowupDrain(queueKey, runFollowupTurn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user