fix: preserve pending tasks when subagent completes

Fixes #3031

When a subagent completes and announces its result back to the main
agent, the main agent's pending tasks were being lost due to a race
condition between the followup queue drain and the announce flow.

Changes:
- Add persistence layer for followup queues (similar to subagent runs)
- Implement 30-second grace period before deleting empty queues
- Persist queue state after enqueue/dequeue operations
- Track emptyAt timestamp to prevent premature deletion
- Add periodic cleanup for expired empty queues

This ensures that when a subagent completes and triggers a new main
agent invocation, any pending tasks in the followup queue are
preserved and restored from disk.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
sid1943 2026-01-27 19:31:33 -05:00
parent 0b1c8db0ca
commit 82efb719ab
4 changed files with 144 additions and 5 deletions

View File

@ -6,9 +6,31 @@ import {
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js";
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import { FOLLOWUP_QUEUES, persistFollowupQueues } from "./state.js";
import type { FollowupRun } from "./types.js";
// Grace period before deleting empty queues (to handle subagent announce race conditions)
const EMPTY_QUEUE_GRACE_PERIOD_MS = 30_000;
// Periodic cleanup of expired empty queues
let cleanupTimer: NodeJS.Timeout | null = null;
function scheduleQueueCleanup() {
if (cleanupTimer) return;
cleanupTimer = setInterval(() => {
const now = Date.now();
for (const [key, queue] of FOLLOWUP_QUEUES.entries()) {
if (queue.items.length === 0 && queue.droppedCount === 0 && queue.emptyAt) {
const emptyDuration = now - queue.emptyAt;
if (emptyDuration >= EMPTY_QUEUE_GRACE_PERIOD_MS) {
FOLLOWUP_QUEUES.delete(key);
}
}
}
persistFollowupQueues();
}, EMPTY_QUEUE_GRACE_PERIOD_MS);
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
@ -16,6 +38,7 @@ export function scheduleFollowupDrain(
const queue = FOLLOWUP_QUEUES.get(key);
if (!queue || queue.draining) return;
queue.draining = true;
scheduleQueueCleanup();
void (async () => {
try {
let forceIndividualCollect = false;
@ -113,11 +136,27 @@ export function scheduleFollowupDrain(
defaultRuntime.error?.(`followup queue drain failed for ${key}: ${String(err)}`);
} finally {
queue.draining = false;
if (queue.items.length === 0 && queue.droppedCount === 0) {
FOLLOWUP_QUEUES.delete(key);
const isEmpty = queue.items.length === 0 && queue.droppedCount === 0;
if (isEmpty) {
// Mark when queue became empty
if (!queue.emptyAt) {
queue.emptyAt = Date.now();
}
// Only delete if it's been empty for the grace period
// This prevents race conditions with subagent announces
const emptyDuration = Date.now() - queue.emptyAt;
if (emptyDuration >= EMPTY_QUEUE_GRACE_PERIOD_MS) {
FOLLOWUP_QUEUES.delete(key);
}
} else {
// Queue has items, clear emptyAt and continue draining
queue.emptyAt = undefined;
scheduleFollowupDrain(key, runFollowup);
}
persistFollowupQueues();
}
})();
}

View File

@ -1,5 +1,5 @@
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js";
import { FOLLOWUP_QUEUES, getFollowupQueue, persistFollowupQueues } from "./state.js";
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
function isRunAlreadyQueued(
@ -39,14 +39,20 @@ export function enqueueFollowupRun(
queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run;
// Clear emptyAt since we're adding an item
queue.emptyAt = undefined;
const shouldEnqueue = applyQueueDropPolicy({
queue,
summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(),
});
if (!shouldEnqueue) return false;
if (!shouldEnqueue) {
persistFollowupQueues();
return false;
}
queue.items.push(run);
persistFollowupQueues();
return true;
}

View File

@ -0,0 +1,57 @@
import path from "node:path";
import { STATE_DIR } from "../../../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../../../infra/json-file.js";
import type { FollowupQueueState } from "./state.js";
export type PersistedFollowupQueueVersion = 1;
type PersistedFollowupQueueRegistry = {
version: 1;
queues: Record<string, PersistedFollowupQueueState>;
};
type PersistedFollowupQueueState = FollowupQueueState;
const REGISTRY_VERSION = 1 as const;
export function resolveFollowupQueueRegistryPath(): string {
return path.join(STATE_DIR, "followup-queues", "queues.json");
}
export function loadFollowupQueuesFromDisk(): Map<string, FollowupQueueState> {
const pathname = resolveFollowupQueueRegistryPath();
const raw = loadJsonFile(pathname);
if (!raw || typeof raw !== "object") return new Map();
const record = raw as Partial<PersistedFollowupQueueRegistry>;
if (record.version !== 1) return new Map();
const queuesRaw = record.queues;
if (!queuesRaw || typeof queuesRaw !== "object") return new Map();
const out = new Map<string, FollowupQueueState>();
for (const [key, entry] of Object.entries(queuesRaw)) {
if (!entry || typeof entry !== "object") continue;
// Reset draining state on restore - will be restarted if items exist
const restored: FollowupQueueState = {
...entry,
draining: false,
};
out.set(key, restored);
}
return out;
}
export function saveFollowupQueuesToDisk(queues: Map<string, FollowupQueueState>) {
const pathname = resolveFollowupQueueRegistryPath();
const serialized: Record<string, PersistedFollowupQueueState> = {};
for (const [key, entry] of queues.entries()) {
// Only persist queues that have items or have been used recently
if (entry.items.length > 0 || entry.droppedCount > 0 || entry.lastEnqueuedAt > 0) {
serialized[key] = entry;
}
}
const out: PersistedFollowupQueueRegistry = {
version: REGISTRY_VERSION,
queues: serialized,
};
saveJsonFile(pathname, out);
}

View File

@ -1,4 +1,5 @@
import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js";
import { loadFollowupQueuesFromDisk, saveFollowupQueuesToDisk } from "./state.store.js";
export type FollowupQueueState = {
items: FollowupRun[];
@ -11,6 +12,8 @@ export type FollowupQueueState = {
droppedCount: number;
summaryLines: string[];
lastRun?: FollowupRun["run"];
/** Timestamp when queue became empty (used for grace period before deletion) */
emptyAt?: number;
};
export const DEFAULT_QUEUE_DEBOUNCE_MS = 1000;
@ -19,7 +22,39 @@ export const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
export const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
// Track if we've restored from disk
let restoreAttempted = false;
export function persistFollowupQueues() {
try {
saveFollowupQueuesToDisk(FOLLOWUP_QUEUES);
} catch {
// ignore persistence failures
}
}
function restoreFollowupQueuesOnce() {
if (restoreAttempted) return;
restoreAttempted = true;
try {
const restored = loadFollowupQueuesFromDisk();
if (restored.size === 0) return;
for (const [key, entry] of restored.entries()) {
if (!key || !entry) continue;
// Keep any newer in-memory entries
if (!FOLLOWUP_QUEUES.has(key)) {
FOLLOWUP_QUEUES.set(key, entry);
}
}
} catch {
// ignore restore failures
}
}
export function getFollowupQueue(key: string, settings: QueueSettings): FollowupQueueState {
// Restore queues from disk on first access
restoreFollowupQueuesOnce();
const existing = FOLLOWUP_QUEUES.get(key);
if (existing) {
existing.mode = settings.mode;
@ -53,6 +88,7 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup
summaryLines: [],
};
FOLLOWUP_QUEUES.set(key, created);
persistFollowupQueues();
return created;
}
@ -68,5 +104,6 @@ export function clearFollowupQueue(key: string): number {
queue.lastRun = undefined;
queue.lastEnqueuedAt = 0;
FOLLOWUP_QUEUES.delete(cleaned);
persistFollowupQueues();
return cleared;
}