## Problem When messages arrived while the agent was busy processing a previous message, the same message could be enqueued multiple times into the followup queue. This happened because Discord's event system can emit the same message multiple times (e.g., during reconnects or due to slow listener processing), and the followup queue had no deduplication logic. This caused the bot to respond to the same user message 2-4+ times. ## Solution Add simple exact-match deduplication in `enqueueFollowupRun()`: if a prompt is already in the queue, skip adding it again. Extracted into a small `isPromptAlreadyQueued()` helper for clarity. ## Testing - Added test cases for deduplication (same prompt rejected, different accepted) - Manually verified on Discord: single response per message even when multiple events fire during slow agent processing
615 lines
18 KiB
TypeScript
615 lines
18 KiB
TypeScript
import type { SkillSnapshot } from "../../agents/skills.js";
|
|
import { parseDurationMs } from "../../cli/parse-duration.js";
|
|
import type { ClawdbotConfig } from "../../config/config.js";
|
|
import type { SessionEntry } from "../../config/sessions.js";
|
|
import { defaultRuntime } from "../../runtime.js";
|
|
import type { OriginatingChannelType } from "../templating.js";
|
|
import type {
|
|
ElevatedLevel,
|
|
ReasoningLevel,
|
|
ThinkLevel,
|
|
VerboseLevel,
|
|
} from "./directives.js";
|
|
import { isRoutableChannel } from "./route-reply.js";
|
|
export type QueueMode =
|
|
| "steer"
|
|
| "followup"
|
|
| "collect"
|
|
| "steer-backlog"
|
|
| "interrupt"
|
|
| "queue";
|
|
export type QueueDropPolicy = "old" | "new" | "summarize";
|
|
export type QueueSettings = {
|
|
mode: QueueMode;
|
|
debounceMs?: number;
|
|
cap?: number;
|
|
dropPolicy?: QueueDropPolicy;
|
|
};
|
|
export type FollowupRun = {
|
|
prompt: string;
|
|
summaryLine?: string;
|
|
enqueuedAt: number;
|
|
/**
|
|
* Originating channel for reply routing.
|
|
* When set, replies should be routed back to this provider
|
|
* instead of using the session's lastChannel.
|
|
*/
|
|
originatingChannel?: OriginatingChannelType;
|
|
/**
|
|
* Originating destination for reply routing.
|
|
* The chat/channel/user ID where the reply should be sent.
|
|
*/
|
|
originatingTo?: string;
|
|
/** Provider account id (multi-account). */
|
|
originatingAccountId?: string;
|
|
/** Telegram forum topic thread id. */
|
|
originatingThreadId?: number;
|
|
run: {
|
|
agentId: string;
|
|
agentDir: string;
|
|
sessionId: string;
|
|
sessionKey?: string;
|
|
messageProvider?: string;
|
|
agentAccountId?: string;
|
|
sessionFile: string;
|
|
workspaceDir: string;
|
|
config: ClawdbotConfig;
|
|
skillsSnapshot?: SkillSnapshot;
|
|
provider: string;
|
|
model: string;
|
|
authProfileId?: string;
|
|
thinkLevel?: ThinkLevel;
|
|
verboseLevel?: VerboseLevel;
|
|
reasoningLevel?: ReasoningLevel;
|
|
elevatedLevel?: ElevatedLevel;
|
|
bashElevated?: {
|
|
enabled: boolean;
|
|
allowed: boolean;
|
|
defaultLevel: ElevatedLevel;
|
|
};
|
|
timeoutMs: number;
|
|
blockReplyBreak: "text_end" | "message_end";
|
|
ownerNumbers?: string[];
|
|
extraSystemPrompt?: string;
|
|
enforceFinalTag?: boolean;
|
|
};
|
|
};
|
|
type FollowupQueueState = {
|
|
items: FollowupRun[];
|
|
draining: boolean;
|
|
lastEnqueuedAt: number;
|
|
mode: QueueMode;
|
|
debounceMs: number;
|
|
cap: number;
|
|
dropPolicy: QueueDropPolicy;
|
|
droppedCount: number;
|
|
summaryLines: string[];
|
|
lastRun?: FollowupRun["run"];
|
|
};
|
|
const DEFAULT_QUEUE_DEBOUNCE_MS = 1000;
|
|
const DEFAULT_QUEUE_CAP = 20;
|
|
const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
|
|
const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
|
|
function normalizeQueueMode(raw?: string): QueueMode | undefined {
|
|
if (!raw) return undefined;
|
|
const cleaned = raw.trim().toLowerCase();
|
|
if (cleaned === "queue" || cleaned === "queued") return "steer";
|
|
if (
|
|
cleaned === "interrupt" ||
|
|
cleaned === "interrupts" ||
|
|
cleaned === "abort"
|
|
)
|
|
return "interrupt";
|
|
if (cleaned === "steer" || cleaned === "steering") return "steer";
|
|
if (
|
|
cleaned === "followup" ||
|
|
cleaned === "follow-ups" ||
|
|
cleaned === "followups"
|
|
)
|
|
return "followup";
|
|
if (cleaned === "collect" || cleaned === "coalesce") return "collect";
|
|
if (
|
|
cleaned === "steer+backlog" ||
|
|
cleaned === "steer-backlog" ||
|
|
cleaned === "steer_backlog"
|
|
)
|
|
return "steer-backlog";
|
|
return undefined;
|
|
}
|
|
function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined {
|
|
if (!raw) return undefined;
|
|
const cleaned = raw.trim().toLowerCase();
|
|
if (cleaned === "old" || cleaned === "oldest") return "old";
|
|
if (cleaned === "new" || cleaned === "newest") return "new";
|
|
if (cleaned === "summarize" || cleaned === "summary") return "summarize";
|
|
return undefined;
|
|
}
|
|
function parseQueueDebounce(raw?: string): number | undefined {
|
|
if (!raw) return undefined;
|
|
try {
|
|
const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" });
|
|
if (!parsed || parsed < 0) return undefined;
|
|
return Math.round(parsed);
|
|
} catch {
|
|
return undefined;
|
|
}
|
|
}
|
|
function parseQueueCap(raw?: string): number | undefined {
|
|
if (!raw) return undefined;
|
|
const num = Number(raw);
|
|
if (!Number.isFinite(num)) return undefined;
|
|
const cap = Math.floor(num);
|
|
if (cap < 1) return undefined;
|
|
return cap;
|
|
}
|
|
function parseQueueDirectiveArgs(raw: string): {
|
|
consumed: number;
|
|
queueMode?: QueueMode;
|
|
queueReset: boolean;
|
|
rawMode?: string;
|
|
debounceMs?: number;
|
|
cap?: number;
|
|
dropPolicy?: QueueDropPolicy;
|
|
rawDebounce?: string;
|
|
rawCap?: string;
|
|
rawDrop?: string;
|
|
hasOptions: boolean;
|
|
} {
|
|
let i = 0;
|
|
const len = raw.length;
|
|
while (i < len && /\s/.test(raw[i])) i += 1;
|
|
if (raw[i] === ":") {
|
|
i += 1;
|
|
while (i < len && /\s/.test(raw[i])) i += 1;
|
|
}
|
|
let consumed = i;
|
|
let queueMode: QueueMode | undefined;
|
|
let queueReset = false;
|
|
let rawMode: string | undefined;
|
|
let debounceMs: number | undefined;
|
|
let cap: number | undefined;
|
|
let dropPolicy: QueueDropPolicy | undefined;
|
|
let rawDebounce: string | undefined;
|
|
let rawCap: string | undefined;
|
|
let rawDrop: string | undefined;
|
|
let hasOptions = false;
|
|
const takeToken = (): string | null => {
|
|
if (i >= len) return null;
|
|
const start = i;
|
|
while (i < len && !/\s/.test(raw[i])) i += 1;
|
|
if (start === i) return null;
|
|
const token = raw.slice(start, i);
|
|
while (i < len && /\s/.test(raw[i])) i += 1;
|
|
return token;
|
|
};
|
|
while (i < len) {
|
|
const token = takeToken();
|
|
if (!token) break;
|
|
const lowered = token.trim().toLowerCase();
|
|
if (lowered === "default" || lowered === "reset" || lowered === "clear") {
|
|
queueReset = true;
|
|
consumed = i;
|
|
break;
|
|
}
|
|
if (lowered.startsWith("debounce:") || lowered.startsWith("debounce=")) {
|
|
rawDebounce = token.split(/[:=]/)[1] ?? "";
|
|
debounceMs = parseQueueDebounce(rawDebounce);
|
|
hasOptions = true;
|
|
consumed = i;
|
|
continue;
|
|
}
|
|
if (lowered.startsWith("cap:") || lowered.startsWith("cap=")) {
|
|
rawCap = token.split(/[:=]/)[1] ?? "";
|
|
cap = parseQueueCap(rawCap);
|
|
hasOptions = true;
|
|
consumed = i;
|
|
continue;
|
|
}
|
|
if (lowered.startsWith("drop:") || lowered.startsWith("drop=")) {
|
|
rawDrop = token.split(/[:=]/)[1] ?? "";
|
|
dropPolicy = normalizeQueueDropPolicy(rawDrop);
|
|
hasOptions = true;
|
|
consumed = i;
|
|
continue;
|
|
}
|
|
const mode = normalizeQueueMode(token);
|
|
if (mode) {
|
|
queueMode = mode;
|
|
rawMode = token;
|
|
consumed = i;
|
|
continue;
|
|
}
|
|
// Stop at first unrecognized token.
|
|
break;
|
|
}
|
|
return {
|
|
consumed,
|
|
queueMode,
|
|
queueReset,
|
|
rawMode,
|
|
debounceMs,
|
|
cap,
|
|
dropPolicy,
|
|
rawDebounce,
|
|
rawCap,
|
|
rawDrop,
|
|
hasOptions,
|
|
};
|
|
}
|
|
export function extractQueueDirective(body?: string): {
|
|
cleaned: string;
|
|
queueMode?: QueueMode;
|
|
queueReset: boolean;
|
|
rawMode?: string;
|
|
hasDirective: boolean;
|
|
debounceMs?: number;
|
|
cap?: number;
|
|
dropPolicy?: QueueDropPolicy;
|
|
rawDebounce?: string;
|
|
rawCap?: string;
|
|
rawDrop?: string;
|
|
hasOptions: boolean;
|
|
} {
|
|
if (!body)
|
|
return {
|
|
cleaned: "",
|
|
hasDirective: false,
|
|
queueReset: false,
|
|
hasOptions: false,
|
|
};
|
|
const re = /(?:^|\s)\/queue(?=$|\s|:)/i;
|
|
const match = re.exec(body);
|
|
if (!match) {
|
|
return {
|
|
cleaned: body.trim(),
|
|
hasDirective: false,
|
|
queueReset: false,
|
|
hasOptions: false,
|
|
};
|
|
}
|
|
const start = match.index + match[0].indexOf("/queue");
|
|
const argsStart = start + "/queue".length;
|
|
const args = body.slice(argsStart);
|
|
const parsed = parseQueueDirectiveArgs(args);
|
|
const cleanedRaw = `${body.slice(0, start)} ${body.slice(
|
|
argsStart + parsed.consumed,
|
|
)}`;
|
|
const cleaned = cleanedRaw.replace(/\s+/g, " ").trim();
|
|
return {
|
|
cleaned,
|
|
queueMode: parsed.queueMode,
|
|
queueReset: parsed.queueReset,
|
|
rawMode: parsed.rawMode,
|
|
debounceMs: parsed.debounceMs,
|
|
cap: parsed.cap,
|
|
dropPolicy: parsed.dropPolicy,
|
|
rawDebounce: parsed.rawDebounce,
|
|
rawCap: parsed.rawCap,
|
|
rawDrop: parsed.rawDrop,
|
|
hasDirective: true,
|
|
hasOptions: parsed.hasOptions,
|
|
};
|
|
}
|
|
function elideText(text: string, limit = 140): string {
|
|
if (text.length <= limit) return text;
|
|
return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`;
|
|
}
|
|
function buildQueueSummaryLine(run: FollowupRun): string {
|
|
const base = run.summaryLine?.trim() || run.prompt.trim();
|
|
const cleaned = base.replace(/\s+/g, " ").trim();
|
|
return elideText(cleaned, 160);
|
|
}
|
|
function getFollowupQueue(
|
|
key: string,
|
|
settings: QueueSettings,
|
|
): FollowupQueueState {
|
|
const existing = FOLLOWUP_QUEUES.get(key);
|
|
if (existing) {
|
|
existing.mode = settings.mode;
|
|
existing.debounceMs =
|
|
typeof settings.debounceMs === "number"
|
|
? Math.max(0, settings.debounceMs)
|
|
: existing.debounceMs;
|
|
existing.cap =
|
|
typeof settings.cap === "number" && settings.cap > 0
|
|
? Math.floor(settings.cap)
|
|
: existing.cap;
|
|
existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy;
|
|
return existing;
|
|
}
|
|
const created: FollowupQueueState = {
|
|
items: [],
|
|
draining: false,
|
|
lastEnqueuedAt: 0,
|
|
mode: settings.mode,
|
|
debounceMs:
|
|
typeof settings.debounceMs === "number"
|
|
? Math.max(0, settings.debounceMs)
|
|
: DEFAULT_QUEUE_DEBOUNCE_MS,
|
|
cap:
|
|
typeof settings.cap === "number" && settings.cap > 0
|
|
? Math.floor(settings.cap)
|
|
: DEFAULT_QUEUE_CAP,
|
|
dropPolicy: settings.dropPolicy ?? DEFAULT_QUEUE_DROP,
|
|
droppedCount: 0,
|
|
summaryLines: [],
|
|
};
|
|
FOLLOWUP_QUEUES.set(key, created);
|
|
return created;
|
|
}
|
|
/**
|
|
* Check if a prompt is already queued using exact match.
|
|
*/
|
|
function isPromptAlreadyQueued(
|
|
prompt: string,
|
|
queue: FollowupQueueState,
|
|
): boolean {
|
|
return queue.items.some((item) => item.prompt === prompt);
|
|
}
|
|
|
|
export function enqueueFollowupRun(
|
|
key: string,
|
|
run: FollowupRun,
|
|
settings: QueueSettings,
|
|
): boolean {
|
|
const queue = getFollowupQueue(key, settings);
|
|
queue.lastEnqueuedAt = Date.now();
|
|
queue.lastRun = run.run;
|
|
|
|
// Deduplicate: skip if the same prompt is already queued.
|
|
if (isPromptAlreadyQueued(run.prompt, queue)) {
|
|
return false;
|
|
}
|
|
|
|
const cap = queue.cap;
|
|
if (cap > 0 && queue.items.length >= cap) {
|
|
if (queue.dropPolicy === "new") {
|
|
return false;
|
|
}
|
|
const dropCount = queue.items.length - cap + 1;
|
|
const dropped = queue.items.splice(0, dropCount);
|
|
if (queue.dropPolicy === "summarize") {
|
|
for (const item of dropped) {
|
|
queue.droppedCount += 1;
|
|
queue.summaryLines.push(buildQueueSummaryLine(item));
|
|
}
|
|
while (queue.summaryLines.length > cap) queue.summaryLines.shift();
|
|
}
|
|
}
|
|
queue.items.push(run);
|
|
return true;
|
|
}
|
|
async function waitForQueueDebounce(queue: FollowupQueueState): Promise<void> {
|
|
const debounceMs = Math.max(0, queue.debounceMs);
|
|
if (debounceMs <= 0) return;
|
|
while (true) {
|
|
const since = Date.now() - queue.lastEnqueuedAt;
|
|
if (since >= debounceMs) return;
|
|
await new Promise((resolve) => setTimeout(resolve, debounceMs - since));
|
|
}
|
|
}
|
|
function buildSummaryPrompt(queue: FollowupQueueState): string | undefined {
|
|
if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) {
|
|
return undefined;
|
|
}
|
|
const lines = [
|
|
`[Queue overflow] Dropped ${queue.droppedCount} message${queue.droppedCount === 1 ? "" : "s"} due to cap.`,
|
|
];
|
|
if (queue.summaryLines.length > 0) {
|
|
lines.push("Summary:");
|
|
for (const line of queue.summaryLines) {
|
|
lines.push(`- ${line}`);
|
|
}
|
|
}
|
|
queue.droppedCount = 0;
|
|
queue.summaryLines = [];
|
|
return lines.join("\n");
|
|
}
|
|
function buildCollectPrompt(items: FollowupRun[], summary?: string): string {
|
|
const blocks: string[] = ["[Queued messages while agent was busy]"];
|
|
if (summary) {
|
|
blocks.push(summary);
|
|
}
|
|
items.forEach((item, idx) => {
|
|
blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim());
|
|
});
|
|
return blocks.join("\n\n");
|
|
}
|
|
|
|
/**
|
|
* Checks if queued items have different routable originating channels.
|
|
*
|
|
* Returns true if messages come from different providers (e.g., Slack + Telegram),
|
|
* meaning they cannot be safely collected into one prompt without losing routing.
|
|
* Also returns true for a mix of routable and non-routable channels.
|
|
*/
|
|
function hasCrossProviderItems(items: FollowupRun[]): boolean {
|
|
const keys = new Set<string>();
|
|
let hasUnkeyed = false;
|
|
|
|
for (const item of items) {
|
|
const channel = item.originatingChannel;
|
|
const to = item.originatingTo;
|
|
const accountId = item.originatingAccountId;
|
|
const threadId = item.originatingThreadId;
|
|
if (!channel && !to && !accountId && typeof threadId !== "number") {
|
|
hasUnkeyed = true;
|
|
continue;
|
|
}
|
|
if (!isRoutableChannel(channel) || !to) {
|
|
return true;
|
|
}
|
|
keys.add(
|
|
[
|
|
channel,
|
|
to,
|
|
accountId || "",
|
|
typeof threadId === "number" ? String(threadId) : "",
|
|
].join("|"),
|
|
);
|
|
}
|
|
|
|
if (keys.size === 0) return false;
|
|
if (hasUnkeyed) return true;
|
|
return keys.size > 1;
|
|
}
|
|
export function scheduleFollowupDrain(
|
|
key: string,
|
|
runFollowup: (run: FollowupRun) => Promise<void>,
|
|
): void {
|
|
const queue = FOLLOWUP_QUEUES.get(key);
|
|
if (!queue || queue.draining) return;
|
|
queue.draining = true;
|
|
void (async () => {
|
|
try {
|
|
let forceIndividualCollect = false;
|
|
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
|
await waitForQueueDebounce(queue);
|
|
if (queue.mode === "collect") {
|
|
// Once the batch is mixed, never collect again within this drain.
|
|
// Prevents “collect after shift” collapsing different targets.
|
|
//
|
|
// Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts`
|
|
if (forceIndividualCollect) {
|
|
const next = queue.items.shift();
|
|
if (!next) break;
|
|
await runFollowup(next);
|
|
continue;
|
|
}
|
|
|
|
// Check if messages span multiple providers.
|
|
// If so, process individually to preserve per-message routing.
|
|
const isCrossProvider = hasCrossProviderItems(queue.items);
|
|
|
|
if (isCrossProvider) {
|
|
forceIndividualCollect = true;
|
|
// Process one at a time to preserve per-message routing info.
|
|
const next = queue.items.shift();
|
|
if (!next) break;
|
|
await runFollowup(next);
|
|
continue;
|
|
}
|
|
|
|
// Same-provider messages can be safely collected.
|
|
const items = queue.items.splice(0, queue.items.length);
|
|
const summary = buildSummaryPrompt(queue);
|
|
const run = items.at(-1)?.run ?? queue.lastRun;
|
|
if (!run) break;
|
|
|
|
// Preserve originating channel from items when collecting same-provider.
|
|
const originatingChannel = items.find(
|
|
(i) => i.originatingChannel,
|
|
)?.originatingChannel;
|
|
const originatingTo = items.find(
|
|
(i) => i.originatingTo,
|
|
)?.originatingTo;
|
|
const originatingAccountId = items.find(
|
|
(i) => i.originatingAccountId,
|
|
)?.originatingAccountId;
|
|
const originatingThreadId = items.find(
|
|
(i) => typeof i.originatingThreadId === "number",
|
|
)?.originatingThreadId;
|
|
|
|
const prompt = buildCollectPrompt(items, summary);
|
|
await runFollowup({
|
|
prompt,
|
|
run,
|
|
enqueuedAt: Date.now(),
|
|
originatingChannel,
|
|
originatingTo,
|
|
originatingAccountId,
|
|
originatingThreadId,
|
|
});
|
|
continue;
|
|
}
|
|
const summaryPrompt = buildSummaryPrompt(queue);
|
|
if (summaryPrompt) {
|
|
const run = queue.lastRun;
|
|
if (!run) break;
|
|
await runFollowup({
|
|
prompt: summaryPrompt,
|
|
run,
|
|
enqueuedAt: Date.now(),
|
|
});
|
|
continue;
|
|
}
|
|
const next = queue.items.shift();
|
|
if (!next) break;
|
|
await runFollowup(next);
|
|
}
|
|
} catch (err) {
|
|
defaultRuntime.error?.(
|
|
`followup queue drain failed for ${key}: ${String(err)}`,
|
|
);
|
|
} finally {
|
|
queue.draining = false;
|
|
if (queue.items.length === 0 && queue.droppedCount === 0) {
|
|
FOLLOWUP_QUEUES.delete(key);
|
|
} else {
|
|
scheduleFollowupDrain(key, runFollowup);
|
|
}
|
|
}
|
|
})();
|
|
}
|
|
function defaultQueueModeForProvider(provider?: string): QueueMode {
|
|
const normalized = provider?.trim().toLowerCase();
|
|
if (normalized === "discord") return "collect";
|
|
if (normalized === "webchat") return "collect";
|
|
if (normalized === "whatsapp") return "collect";
|
|
if (normalized === "telegram") return "collect";
|
|
if (normalized === "imessage") return "collect";
|
|
if (normalized === "signal") return "collect";
|
|
return "collect";
|
|
}
|
|
export function resolveQueueSettings(params: {
|
|
cfg: ClawdbotConfig;
|
|
provider?: string;
|
|
sessionEntry?: SessionEntry;
|
|
inlineMode?: QueueMode;
|
|
inlineOptions?: Partial<QueueSettings>;
|
|
}): QueueSettings {
|
|
const providerKey = params.provider?.trim().toLowerCase();
|
|
const queueCfg = params.cfg.messages?.queue;
|
|
const providerModeRaw =
|
|
providerKey && queueCfg?.byProvider
|
|
? (queueCfg.byProvider as Record<string, string | undefined>)[providerKey]
|
|
: undefined;
|
|
const resolvedMode =
|
|
params.inlineMode ??
|
|
normalizeQueueMode(params.sessionEntry?.queueMode) ??
|
|
normalizeQueueMode(providerModeRaw) ??
|
|
normalizeQueueMode(queueCfg?.mode) ??
|
|
defaultQueueModeForProvider(providerKey);
|
|
const debounceRaw =
|
|
params.inlineOptions?.debounceMs ??
|
|
params.sessionEntry?.queueDebounceMs ??
|
|
queueCfg?.debounceMs ??
|
|
DEFAULT_QUEUE_DEBOUNCE_MS;
|
|
const capRaw =
|
|
params.inlineOptions?.cap ??
|
|
params.sessionEntry?.queueCap ??
|
|
queueCfg?.cap ??
|
|
DEFAULT_QUEUE_CAP;
|
|
const dropRaw =
|
|
params.inlineOptions?.dropPolicy ??
|
|
params.sessionEntry?.queueDrop ??
|
|
normalizeQueueDropPolicy(queueCfg?.drop) ??
|
|
DEFAULT_QUEUE_DROP;
|
|
return {
|
|
mode: resolvedMode,
|
|
debounceMs:
|
|
typeof debounceRaw === "number" ? Math.max(0, debounceRaw) : undefined,
|
|
cap:
|
|
typeof capRaw === "number" ? Math.max(1, Math.floor(capRaw)) : undefined,
|
|
dropPolicy: dropRaw,
|
|
};
|
|
}
|
|
|
|
export function getFollowupQueueDepth(key: string): number {
|
|
const cleaned = key.trim();
|
|
if (!cleaned) return 0;
|
|
const queue = FOLLOWUP_QUEUES.get(cleaned);
|
|
if (!queue) return 0;
|
|
return queue.items.length;
|
|
}
|