fix(imessage): prevent self-chat and outbound echo loops (#2585)
This commit is contained in:
parent
f4004054ab
commit
db854540aa
@ -131,6 +131,77 @@ describe("monitorIMessageProvider", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("skips self-chat messages that are not marked from me", async () => {
|
||||
const run = monitorIMessageProvider();
|
||||
await waitForSubscribe();
|
||||
|
||||
notificationHandler?.({
|
||||
method: "message",
|
||||
params: {
|
||||
message: {
|
||||
id: 5,
|
||||
chat_id: 42,
|
||||
sender: "+15550004444",
|
||||
is_from_me: false,
|
||||
text: "echo",
|
||||
is_group: false,
|
||||
participants: ["+15550004444"],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await flush();
|
||||
closeResolve?.();
|
||||
await run;
|
||||
|
||||
expect(replyMock).not.toHaveBeenCalled();
|
||||
expect(sendMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("drops inbound echoes that match recently sent outbound ids", async () => {
|
||||
sendMock.mockResolvedValueOnce({ messageId: "MSG-1" });
|
||||
replyMock.mockResolvedValueOnce({ text: "ok" });
|
||||
const run = monitorIMessageProvider();
|
||||
await waitForSubscribe();
|
||||
|
||||
notificationHandler?.({
|
||||
method: "message",
|
||||
params: {
|
||||
message: {
|
||||
id: 10,
|
||||
chat_id: 7,
|
||||
sender: "+15550004444",
|
||||
is_from_me: false,
|
||||
text: "hey",
|
||||
is_group: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await flush();
|
||||
|
||||
notificationHandler?.({
|
||||
method: "message",
|
||||
params: {
|
||||
message: {
|
||||
id: "MSG-1",
|
||||
chat_id: 7,
|
||||
sender: "+15550004444",
|
||||
is_from_me: false,
|
||||
text: "echoed",
|
||||
is_group: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await flush();
|
||||
closeResolve?.();
|
||||
await run;
|
||||
|
||||
expect(replyMock).toHaveBeenCalledTimes(1);
|
||||
expect(sendMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not trigger unhandledRejection when aborting during shutdown", async () => {
|
||||
requestMock.mockImplementation((method: string) => {
|
||||
if (method === "watch.subscribe") return Promise.resolve({ subscription: 1 });
|
||||
|
||||
@ -15,8 +15,9 @@ export async function deliverReplies(params: {
|
||||
runtime: RuntimeEnv;
|
||||
maxBytes: number;
|
||||
textLimit: number;
|
||||
onSent?: (messageId: string) => void;
|
||||
}) {
|
||||
const { replies, target, client, runtime, maxBytes, textLimit, accountId } = params;
|
||||
const { replies, target, client, runtime, maxBytes, textLimit, accountId, onSent } = params;
|
||||
const cfg = loadConfig();
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
cfg,
|
||||
@ -31,23 +32,29 @@ export async function deliverReplies(params: {
|
||||
if (!text && mediaList.length === 0) continue;
|
||||
if (mediaList.length === 0) {
|
||||
for (const chunk of chunkTextWithMode(text, textLimit, chunkMode)) {
|
||||
await sendMessageIMessage(target, chunk, {
|
||||
const result = await sendMessageIMessage(target, chunk, {
|
||||
maxBytes,
|
||||
client,
|
||||
accountId,
|
||||
});
|
||||
if (result.messageId && result.messageId !== "ok" && result.messageId !== "unknown") {
|
||||
onSent?.(result.messageId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let first = true;
|
||||
for (const url of mediaList) {
|
||||
const caption = first ? text : "";
|
||||
first = false;
|
||||
await sendMessageIMessage(target, caption, {
|
||||
const result = await sendMessageIMessage(target, caption, {
|
||||
mediaUrl: url,
|
||||
maxBytes,
|
||||
client,
|
||||
accountId,
|
||||
});
|
||||
if (result.messageId && result.messageId !== "ok" && result.messageId !== "unknown") {
|
||||
onSent?.(result.messageId);
|
||||
}
|
||||
}
|
||||
}
|
||||
runtime.log?.(`imessage: delivered reply to ${target}`);
|
||||
|
||||
@ -97,6 +97,26 @@ function normalizeReplyField(value: unknown): string | undefined {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function normalizeMessageId(value: unknown): string | undefined {
|
||||
if (typeof value === "string") {
|
||||
const trimmed = value.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
if (typeof value === "number") return String(value);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function isSelfChatMessage(message: IMessagePayload, senderNormalized: string): boolean {
|
||||
if (Boolean(message.is_group)) return false;
|
||||
const participants =
|
||||
message.participants?.map((entry) => (entry ? normalizeIMessageHandle(entry) : "")).filter(Boolean) ??
|
||||
[];
|
||||
if (participants.length === 0) return false;
|
||||
const unique = new Set(participants);
|
||||
if (unique.size !== 1) return false;
|
||||
return unique.has(senderNormalized);
|
||||
}
|
||||
|
||||
function describeReplyContext(message: IMessagePayload): IMessageReplyContext | null {
|
||||
const body = normalizeReplyField(message.reply_to_text);
|
||||
if (!body) return null;
|
||||
@ -134,6 +154,9 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024;
|
||||
const cliPath = opts.cliPath ?? imessageCfg.cliPath ?? "imsg";
|
||||
const dbPath = opts.dbPath ?? imessageCfg.dbPath;
|
||||
const OUTBOUND_CACHE_TTL_MS = 5 * 60 * 1000;
|
||||
const OUTBOUND_CACHE_MAX = 200;
|
||||
const recentOutboundIds = new Map<string, number>();
|
||||
|
||||
// Resolve remoteHost: explicit config, or auto-detect from SSH wrapper script
|
||||
let remoteHost = imessageCfg.remoteHost;
|
||||
@ -185,12 +208,53 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
},
|
||||
});
|
||||
|
||||
function pruneRecentOutboundIds(now = Date.now()) {
|
||||
for (const [key, timestamp] of recentOutboundIds) {
|
||||
if (now - timestamp > OUTBOUND_CACHE_TTL_MS) {
|
||||
recentOutboundIds.delete(key);
|
||||
}
|
||||
}
|
||||
if (recentOutboundIds.size > OUTBOUND_CACHE_MAX) {
|
||||
const entries = Array.from(recentOutboundIds.entries()).sort((a, b) => a[1] - b[1]);
|
||||
for (let i = 0; i < entries.length - OUTBOUND_CACHE_MAX; i += 1) {
|
||||
recentOutboundIds.delete(entries[i]?.[0] ?? "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function trackOutboundId(messageId: string | undefined) {
|
||||
if (!messageId) return;
|
||||
const trimmed = messageId.trim();
|
||||
if (!trimmed) return;
|
||||
recentOutboundIds.set(trimmed, Date.now());
|
||||
pruneRecentOutboundIds();
|
||||
}
|
||||
|
||||
async function handleMessageNow(message: IMessagePayload) {
|
||||
const senderRaw = message.sender ?? "";
|
||||
const sender = senderRaw.trim();
|
||||
if (!sender) return;
|
||||
const senderNormalized = normalizeIMessageHandle(sender);
|
||||
if (message.is_from_me) return;
|
||||
const messageId = normalizeMessageId(message.id);
|
||||
if (messageId && recentOutboundIds.has(messageId)) {
|
||||
logInboundDrop({
|
||||
log: logVerbose,
|
||||
channel: "imessage",
|
||||
reason: "outbound echo",
|
||||
target: sender,
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (isSelfChatMessage(message, senderNormalized)) {
|
||||
logInboundDrop({
|
||||
log: logVerbose,
|
||||
channel: "imessage",
|
||||
reason: "self chat",
|
||||
target: sender,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const chatId = message.chat_id ?? undefined;
|
||||
const chatGuid = message.chat_guid ?? undefined;
|
||||
@ -543,6 +607,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
runtime,
|
||||
maxBytes: mediaMaxBytes,
|
||||
textLimit,
|
||||
onSent: trackOutboundId,
|
||||
});
|
||||
},
|
||||
onError: (err, info) => {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user