This commit is contained in:
Tan Fuhua 2026-01-30 16:28:26 +08:00 committed by GitHub
commit d8bfae3ee5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 146 additions and 3 deletions

View File

@ -131,6 +131,77 @@ describe("monitorIMessageProvider", () => {
);
});
it("skips self-chat messages that are not marked from me", async () => {
const run = monitorIMessageProvider();
await waitForSubscribe();
notificationHandler?.({
method: "message",
params: {
message: {
id: 5,
chat_id: 42,
sender: "+15550004444",
is_from_me: false,
text: "echo",
is_group: false,
participants: ["+15550004444"],
},
},
});
await flush();
closeResolve?.();
await run;
expect(replyMock).not.toHaveBeenCalled();
expect(sendMock).not.toHaveBeenCalled();
});
it("drops inbound echoes that match recently sent outbound ids", async () => {
sendMock.mockResolvedValueOnce({ messageId: "MSG-1" });
replyMock.mockResolvedValueOnce({ text: "ok" });
const run = monitorIMessageProvider();
await waitForSubscribe();
notificationHandler?.({
method: "message",
params: {
message: {
id: 10,
chat_id: 7,
sender: "+15550004444",
is_from_me: false,
text: "hey",
is_group: false,
},
},
});
await flush();
notificationHandler?.({
method: "message",
params: {
message: {
id: "MSG-1",
chat_id: 7,
sender: "+15550004444",
is_from_me: false,
text: "echoed",
is_group: false,
},
},
});
await flush();
closeResolve?.();
await run;
expect(replyMock).toHaveBeenCalledTimes(1);
expect(sendMock).toHaveBeenCalledTimes(1);
});
it("does not trigger unhandledRejection when aborting during shutdown", async () => {
requestMock.mockImplementation((method: string) => {
if (method === "watch.subscribe") return Promise.resolve({ subscription: 1 });

View File

@ -15,8 +15,9 @@ export async function deliverReplies(params: {
runtime: RuntimeEnv;
maxBytes: number;
textLimit: number;
onSent?: (messageId: string) => void;
}) {
const { replies, target, client, runtime, maxBytes, textLimit, accountId } = params;
const { replies, target, client, runtime, maxBytes, textLimit, accountId, onSent } = params;
const cfg = loadConfig();
const tableMode = resolveMarkdownTableMode({
cfg,
@ -31,23 +32,29 @@ export async function deliverReplies(params: {
if (!text && mediaList.length === 0) continue;
if (mediaList.length === 0) {
for (const chunk of chunkTextWithMode(text, textLimit, chunkMode)) {
await sendMessageIMessage(target, chunk, {
const result = await sendMessageIMessage(target, chunk, {
maxBytes,
client,
accountId,
});
if (result.messageId && result.messageId !== "ok" && result.messageId !== "unknown") {
onSent?.(result.messageId);
}
}
} else {
let first = true;
for (const url of mediaList) {
const caption = first ? text : "";
first = false;
await sendMessageIMessage(target, caption, {
const result = await sendMessageIMessage(target, caption, {
mediaUrl: url,
maxBytes,
client,
accountId,
});
if (result.messageId && result.messageId !== "ok" && result.messageId !== "unknown") {
onSent?.(result.messageId);
}
}
}
runtime.log?.(`imessage: delivered reply to ${target}`);

View File

@ -97,6 +97,26 @@ function normalizeReplyField(value: unknown): string | undefined {
return undefined;
}
function normalizeMessageId(value: unknown): string | undefined {
if (typeof value === "string") {
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
if (typeof value === "number") return String(value);
return undefined;
}
function isSelfChatMessage(message: IMessagePayload, senderNormalized: string): boolean {
if (Boolean(message.is_group)) return false;
const participants =
message.participants?.map((entry) => (entry ? normalizeIMessageHandle(entry) : "")).filter(Boolean) ??
[];
if (participants.length === 0) return false;
const unique = new Set(participants);
if (unique.size !== 1) return false;
return unique.has(senderNormalized);
}
function describeReplyContext(message: IMessagePayload): IMessageReplyContext | null {
const body = normalizeReplyField(message.reply_to_text);
if (!body) return null;
@ -134,6 +154,9 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024;
const cliPath = opts.cliPath ?? imessageCfg.cliPath ?? "imsg";
const dbPath = opts.dbPath ?? imessageCfg.dbPath;
const OUTBOUND_CACHE_TTL_MS = 5 * 60 * 1000;
const OUTBOUND_CACHE_MAX = 200;
const recentOutboundIds = new Map<string, number>();
// Resolve remoteHost: explicit config, or auto-detect from SSH wrapper script
let remoteHost = imessageCfg.remoteHost;
@ -185,12 +208,53 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
},
});
function pruneRecentOutboundIds(now = Date.now()) {
for (const [key, timestamp] of recentOutboundIds) {
if (now - timestamp > OUTBOUND_CACHE_TTL_MS) {
recentOutboundIds.delete(key);
}
}
if (recentOutboundIds.size > OUTBOUND_CACHE_MAX) {
const entries = Array.from(recentOutboundIds.entries()).sort((a, b) => a[1] - b[1]);
for (let i = 0; i < entries.length - OUTBOUND_CACHE_MAX; i += 1) {
recentOutboundIds.delete(entries[i]?.[0] ?? "");
}
}
}
function trackOutboundId(messageId: string | undefined) {
if (!messageId) return;
const trimmed = messageId.trim();
if (!trimmed) return;
recentOutboundIds.set(trimmed, Date.now());
pruneRecentOutboundIds();
}
async function handleMessageNow(message: IMessagePayload) {
const senderRaw = message.sender ?? "";
const sender = senderRaw.trim();
if (!sender) return;
const senderNormalized = normalizeIMessageHandle(sender);
if (message.is_from_me) return;
const messageId = normalizeMessageId(message.id);
if (messageId && recentOutboundIds.has(messageId)) {
logInboundDrop({
log: logVerbose,
channel: "imessage",
reason: "outbound echo",
target: sender,
});
return;
}
if (isSelfChatMessage(message, senderNormalized)) {
logInboundDrop({
log: logVerbose,
channel: "imessage",
reason: "self chat",
target: sender,
});
return;
}
const chatId = message.chat_id ?? undefined;
const chatGuid = message.chat_guid ?? undefined;
@ -543,6 +607,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
runtime,
maxBytes: mediaMaxBytes,
textLimit,
onSent: trackOutboundId,
});
},
onError: (err, info) => {