From 82efb719ab530dda29116a7acd504a4f12a0f2fb Mon Sep 17 00:00:00 2001 From: sid1943 Date: Tue, 27 Jan 2026 19:31:33 -0500 Subject: [PATCH] fix: preserve pending tasks when subagent completes Fixes #3031 When a subagent completes and announces its result back to the main agent, the main agent's pending tasks were being lost due to a race condition between the followup queue drain and the announce flow. Changes: - Add persistence layer for followup queues (similar to subagent runs) - Implement 30-second grace period before deleting empty queues - Persist queue state after enqueue/dequeue operations - Track emptyAt timestamp to prevent premature deletion - Add periodic cleanup for expired empty queues This ensures that when a subagent completes and triggers a new main agent invocation, any pending tasks in the followup queue are preserved and restored from disk. Co-Authored-By: Claude Sonnet 4.5 --- src/auto-reply/reply/queue/drain.ts | 45 ++++++++++++++++-- src/auto-reply/reply/queue/enqueue.ts | 10 +++- src/auto-reply/reply/queue/state.store.ts | 57 +++++++++++++++++++++++ src/auto-reply/reply/queue/state.ts | 37 +++++++++++++++ 4 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 src/auto-reply/reply/queue/state.store.ts diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 01361a1ec..cccb8d14d 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -6,9 +6,31 @@ import { waitForQueueDebounce, } from "../../../utils/queue-helpers.js"; import { isRoutableChannel } from "../route-reply.js"; -import { FOLLOWUP_QUEUES } from "./state.js"; +import { FOLLOWUP_QUEUES, persistFollowupQueues } from "./state.js"; import type { FollowupRun } from "./types.js"; +// Grace period before deleting empty queues (to handle subagent announce race conditions) +const EMPTY_QUEUE_GRACE_PERIOD_MS = 30_000; + +// Periodic cleanup of expired empty queues +let cleanupTimer: NodeJS.Timeout | null = null; + +function scheduleQueueCleanup() { + if (cleanupTimer) return; + cleanupTimer = setInterval(() => { + const now = Date.now(); + for (const [key, queue] of FOLLOWUP_QUEUES.entries()) { + if (queue.items.length === 0 && queue.droppedCount === 0 && queue.emptyAt) { + const emptyDuration = now - queue.emptyAt; + if (emptyDuration >= EMPTY_QUEUE_GRACE_PERIOD_MS) { + FOLLOWUP_QUEUES.delete(key); + } + } + } + persistFollowupQueues(); + }, EMPTY_QUEUE_GRACE_PERIOD_MS); +} + export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, @@ -16,6 +38,7 @@ export function scheduleFollowupDrain( const queue = FOLLOWUP_QUEUES.get(key); if (!queue || queue.draining) return; queue.draining = true; + scheduleQueueCleanup(); void (async () => { try { let forceIndividualCollect = false; @@ -113,11 +136,27 @@ export function scheduleFollowupDrain( defaultRuntime.error?.(`followup queue drain failed for ${key}: ${String(err)}`); } finally { queue.draining = false; - if (queue.items.length === 0 && queue.droppedCount === 0) { - FOLLOWUP_QUEUES.delete(key); + const isEmpty = queue.items.length === 0 && queue.droppedCount === 0; + + if (isEmpty) { + // Mark when queue became empty + if (!queue.emptyAt) { + queue.emptyAt = Date.now(); + } + + // Only delete if it's been empty for the grace period + // This prevents race conditions with subagent announces + const emptyDuration = Date.now() - queue.emptyAt; + if (emptyDuration >= EMPTY_QUEUE_GRACE_PERIOD_MS) { + FOLLOWUP_QUEUES.delete(key); + } } else { + // Queue has items, clear emptyAt and continue draining + queue.emptyAt = undefined; scheduleFollowupDrain(key, runFollowup); } + + persistFollowupQueues(); } })(); } diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 7b242adbc..498b75fb6 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,5 +1,5 @@ import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; -import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js"; +import { FOLLOWUP_QUEUES, getFollowupQueue, persistFollowupQueues } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; function isRunAlreadyQueued( @@ -39,14 +39,20 @@ export function enqueueFollowupRun( queue.lastEnqueuedAt = Date.now(); queue.lastRun = run.run; + // Clear emptyAt since we're adding an item + queue.emptyAt = undefined; const shouldEnqueue = applyQueueDropPolicy({ queue, summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(), }); - if (!shouldEnqueue) return false; + if (!shouldEnqueue) { + persistFollowupQueues(); + return false; + } queue.items.push(run); + persistFollowupQueues(); return true; } diff --git a/src/auto-reply/reply/queue/state.store.ts b/src/auto-reply/reply/queue/state.store.ts new file mode 100644 index 000000000..c627af01f --- /dev/null +++ b/src/auto-reply/reply/queue/state.store.ts @@ -0,0 +1,57 @@ +import path from "node:path"; + +import { STATE_DIR } from "../../../config/paths.js"; +import { loadJsonFile, saveJsonFile } from "../../../infra/json-file.js"; +import type { FollowupQueueState } from "./state.js"; + +export type PersistedFollowupQueueVersion = 1; + +type PersistedFollowupQueueRegistry = { + version: 1; + queues: Record; +}; + +type PersistedFollowupQueueState = FollowupQueueState; + +const REGISTRY_VERSION = 1 as const; + +export function resolveFollowupQueueRegistryPath(): string { + return path.join(STATE_DIR, "followup-queues", "queues.json"); +} + +export function loadFollowupQueuesFromDisk(): Map { + const pathname = resolveFollowupQueueRegistryPath(); + const raw = loadJsonFile(pathname); + if (!raw || typeof raw !== "object") return new Map(); + const record = raw as Partial; + if (record.version !== 1) return new Map(); + const queuesRaw = record.queues; + if (!queuesRaw || typeof queuesRaw !== "object") return new Map(); + const out = new Map(); + for (const [key, entry] of Object.entries(queuesRaw)) { + if (!entry || typeof entry !== "object") continue; + // Reset draining state on restore - will be restarted if items exist + const restored: FollowupQueueState = { + ...entry, + draining: false, + }; + out.set(key, restored); + } + return out; +} + +export function saveFollowupQueuesToDisk(queues: Map) { + const pathname = resolveFollowupQueueRegistryPath(); + const serialized: Record = {}; + for (const [key, entry] of queues.entries()) { + // Only persist queues that have items or have been used recently + if (entry.items.length > 0 || entry.droppedCount > 0 || entry.lastEnqueuedAt > 0) { + serialized[key] = entry; + } + } + const out: PersistedFollowupQueueRegistry = { + version: REGISTRY_VERSION, + queues: serialized, + }; + saveJsonFile(pathname, out); +} diff --git a/src/auto-reply/reply/queue/state.ts b/src/auto-reply/reply/queue/state.ts index 0357ff8c8..082f00167 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -1,4 +1,5 @@ import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js"; +import { loadFollowupQueuesFromDisk, saveFollowupQueuesToDisk } from "./state.store.js"; export type FollowupQueueState = { items: FollowupRun[]; @@ -11,6 +12,8 @@ export type FollowupQueueState = { droppedCount: number; summaryLines: string[]; lastRun?: FollowupRun["run"]; + /** Timestamp when queue became empty (used for grace period before deletion) */ + emptyAt?: number; }; export const DEFAULT_QUEUE_DEBOUNCE_MS = 1000; @@ -19,7 +22,39 @@ export const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize"; export const FOLLOWUP_QUEUES = new Map(); +// Track if we've restored from disk +let restoreAttempted = false; + +export function persistFollowupQueues() { + try { + saveFollowupQueuesToDisk(FOLLOWUP_QUEUES); + } catch { + // ignore persistence failures + } +} + +function restoreFollowupQueuesOnce() { + if (restoreAttempted) return; + restoreAttempted = true; + try { + const restored = loadFollowupQueuesFromDisk(); + if (restored.size === 0) return; + for (const [key, entry] of restored.entries()) { + if (!key || !entry) continue; + // Keep any newer in-memory entries + if (!FOLLOWUP_QUEUES.has(key)) { + FOLLOWUP_QUEUES.set(key, entry); + } + } + } catch { + // ignore restore failures + } +} + export function getFollowupQueue(key: string, settings: QueueSettings): FollowupQueueState { + // Restore queues from disk on first access + restoreFollowupQueuesOnce(); + const existing = FOLLOWUP_QUEUES.get(key); if (existing) { existing.mode = settings.mode; @@ -53,6 +88,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup summaryLines: [], }; FOLLOWUP_QUEUES.set(key, created); + persistFollowupQueues(); return created; } @@ -68,5 +104,6 @@ export function clearFollowupQueue(key: string): number { queue.lastRun = undefined; queue.lastEnqueuedAt = 0; FOLLOWUP_QUEUES.delete(cleaned); + persistFollowupQueues(); return cleared; }