Tlon plugin: firehose subscriptions + @all/nickname mentions

- Switch from per-channel subscriptions to firehose (/v2 channels, /v3 chat)
- Parse sect field for @all mentions
- Add @all as trigger for bot responses (like direct mention)
- Fetch bot nickname from contacts on startup (/contacts/v1/self.json)
- Subscribe to contacts updates (/v1/news) for live nickname changes
- Improve rich text parsing (inline-code, bold, italic, strike, blockquote)
This commit is contained in:
Hunter Miller 2026-01-29 19:04:43 -06:00
parent 9f67cceeb3
commit b44d129c56
2 changed files with 266 additions and 192 deletions

View File

@ -93,6 +93,21 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
const processedTracker = createProcessedMessageTracker(2000);
let groupChannels: string[] = [];
let botNickname: string | null = null;
// Fetch bot's nickname from contacts
try {
const selfProfile = await api.scry("/contacts/v1/self.json");
if (selfProfile && typeof selfProfile === "object") {
const profile = selfProfile as { nickname?: { value?: string } };
botNickname = profile.nickname?.value || null;
if (botNickname) {
runtime.log?.(`[tlon] Bot nickname: ${botNickname}`);
}
}
} catch (error: any) {
runtime.log?.(`[tlon] Could not fetch nickname: ${error?.message ?? String(error)}`);
}
if (account.autoDiscoverChannels !== false) {
try {
@ -118,117 +133,20 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
runtime.log?.("[tlon] No group channels to monitor (DMs only)");
}
const handleIncomingDM = async (update: any) => {
try {
const memo = update?.response?.add?.memo;
if (!memo) return;
const messageId = update.id as string | undefined;
if (!processedTracker.mark(messageId)) return;
const senderShip = normalizeShip(memo.author ?? "");
if (!senderShip || senderShip === botShipName) return;
const messageText = extractMessageText(memo.content);
if (!messageText) return;
if (!isDmAllowed(senderShip, account.dmAllowlist)) {
runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`);
return;
}
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
isGroup: false,
timestamp: memo.sent || Date.now(),
});
} catch (error: any) {
runtime.error?.(`[tlon] Error handling DM: ${error?.message ?? String(error)}`);
}
};
const handleIncomingGroupMessage = (channelNest: string) => async (update: any) => {
try {
const parsed = parseChannelNest(channelNest);
if (!parsed) return;
const essay = update?.response?.post?.["r-post"]?.set?.essay;
const memo = update?.response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.memo;
if (!essay && !memo) return;
const content = memo || essay;
const isThreadReply = Boolean(memo);
const messageId = isThreadReply
? update?.response?.post?.["r-post"]?.reply?.id
: update?.response?.post?.id;
if (!processedTracker.mark(messageId)) return;
const senderShip = normalizeShip(content.author ?? "");
if (!senderShip || senderShip === botShipName) return;
const messageText = extractMessageText(content.content);
if (!messageText) return;
cacheMessage(channelNest, {
author: senderShip,
content: messageText,
timestamp: content.sent || Date.now(),
id: messageId,
});
const mentioned = isBotMentioned(messageText, botShipName);
if (!mentioned) return;
const { mode, allowedShips } = resolveChannelAuthorization(cfg, channelNest);
if (mode === "restricted") {
if (allowedShips.length === 0) {
runtime.log?.(`[tlon] Access denied: ${senderShip} in ${channelNest} (no allowlist)`);
return;
}
const normalizedAllowed = allowedShips.map(normalizeShip);
if (!normalizedAllowed.includes(senderShip)) {
runtime.log?.(
`[tlon] Access denied: ${senderShip} in ${channelNest} (allowed: ${allowedShips.join(", ")})`,
);
return;
}
}
const seal = isThreadReply
? update?.response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal
: update?.response?.post?.["r-post"]?.set?.seal;
const parentId = seal?.["parent-id"] || seal?.parent || null;
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
isGroup: true,
groupChannel: channelNest,
groupName: `${parsed.hostShip}/${parsed.channelName}`,
timestamp: content.sent || Date.now(),
parentId,
});
} catch (error: any) {
runtime.error?.(`[tlon] Error handling group message: ${error?.message ?? String(error)}`);
}
};
const processMessage = async (params: {
messageId: string;
senderShip: string;
messageText: string;
isGroup: boolean;
groupChannel?: string;
groupName?: string;
channelNest?: string;
hostShip?: string;
channelName?: string;
timestamp: number;
parentId?: string | null;
isThreadReply?: boolean;
}) => {
const { messageId, senderShip, isGroup, groupChannel, groupName, timestamp, parentId } = params;
const { messageId, senderShip, isGroup, channelNest, hostShip, channelName, timestamp, parentId, isThreadReply } = params;
const groupChannel = channelNest; // For compatibility
let messageText = params.messageText;
if (isGroup && groupChannel && isSummarizationRequest(messageText)) {
@ -295,7 +213,7 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
},
});
const fromLabel = isGroup ? `${senderShip} in ${groupName}` : senderShip;
const fromLabel = isGroup ? `${senderShip} in ${channelNest}` : senderShip;
const body = core.channel.reply.formatAgentEnvelope({
channel: "Tlon",
from: fromLabel,
@ -370,110 +288,223 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
});
};
const subscribedChannels = new Set<string>();
const subscribedDMs = new Set<string>();
async function subscribeToChannel(channelNest: string) {
if (subscribedChannels.has(channelNest)) return;
const parsed = parseChannelNest(channelNest);
if (!parsed) {
runtime.error?.(`[tlon] Invalid channel format: ${channelNest}`);
return;
}
// Track which channels we're interested in for filtering firehose events
const watchedChannels = new Set<string>(groupChannels);
const watchedDMs = new Set<string>();
// Firehose handler for all channel messages (/v2)
const handleChannelsFirehose = async (event: any) => {
try {
await api!.subscribe({
app: "channels",
path: `/${channelNest}`,
event: handleIncomingGroupMessage(channelNest),
err: (error) => {
runtime.error?.(`[tlon] Group subscription error for ${channelNest}: ${String(error)}`);
},
quit: () => {
runtime.log?.(`[tlon] Group subscription ended for ${channelNest}`);
subscribedChannels.delete(channelNest);
},
const nest = event?.nest;
if (!nest) return;
// Only process channels we're watching
if (!watchedChannels.has(nest)) return;
const response = event?.response;
if (!response) return;
// Handle post responses (new posts and replies)
const essay = response?.post?.["r-post"]?.set?.essay;
const memo = response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.memo;
if (!essay && !memo) return;
const content = memo || essay;
const isThreadReply = Boolean(memo);
const messageId = isThreadReply
? response?.post?.["r-post"]?.reply?.id
: response?.post?.id;
if (!processedTracker.mark(messageId)) return;
const senderShip = normalizeShip(content.author ?? "");
if (!senderShip || senderShip === botShipName) return;
const messageText = extractMessageText(content.content);
if (!messageText) return;
cacheMessage(nest, {
author: senderShip,
content: messageText,
timestamp: content.sent || Date.now(),
id: messageId,
});
subscribedChannels.add(channelNest);
runtime.log?.(`[tlon] Subscribed to group channel: ${channelNest}`);
} catch (error: any) {
runtime.error?.(`[tlon] Failed to subscribe to ${channelNest}: ${error?.message ?? String(error)}`);
}
}
async function subscribeToDM(dmShip: string) {
if (subscribedDMs.has(dmShip)) return;
try {
await api!.subscribe({
app: "chat",
path: `/dm/${dmShip}`,
event: handleIncomingDM,
err: (error) => {
runtime.error?.(`[tlon] DM subscription error for ${dmShip}: ${String(error)}`);
},
quit: () => {
runtime.log?.(`[tlon] DM subscription ended for ${dmShip}`);
subscribedDMs.delete(dmShip);
},
});
subscribedDMs.add(dmShip);
runtime.log?.(`[tlon] Subscribed to DM with ${dmShip}`);
} catch (error: any) {
runtime.error?.(`[tlon] Failed to subscribe to DM with ${dmShip}: ${error?.message ?? String(error)}`);
}
}
const mentioned = isBotMentioned(messageText, botShipName, botNickname ?? undefined);
if (!mentioned) return;
async function refreshChannelSubscriptions() {
try {
const dmShips = await api!.scry("/chat/dm.json");
if (Array.isArray(dmShips)) {
for (const dmShip of dmShips) {
await subscribeToDM(dmShip);
const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest);
if (mode === "restricted") {
if (allowedShips.length === 0) {
runtime.log?.(`[tlon] Access denied: ${senderShip} in ${nest} (no allowlist)`);
return;
}
const normalizedAllowed = allowedShips.map(normalizeShip);
if (!normalizedAllowed.includes(senderShip)) {
runtime.log?.(`[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`);
return;
}
}
if (account.autoDiscoverChannels !== false) {
const discoveredChannels = await fetchAllChannels(api!, runtime);
for (const channelNest of discoveredChannels) {
await subscribeToChannel(channelNest);
}
}
const seal = isThreadReply
? response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal
: response?.post?.["r-post"]?.set?.seal;
const parentId = seal?.["parent-id"] || seal?.parent || null;
const parsed = parseChannelNest(nest);
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
isGroup: true,
channelNest: nest,
hostShip: parsed?.hostShip,
channelName: parsed?.channelName,
timestamp: content.sent || Date.now(),
parentId,
isThreadReply,
});
} catch (error: any) {
runtime.error?.(`[tlon] Channel refresh failed: ${error?.message ?? String(error)}`);
runtime.error?.(`[tlon] Error handling channel firehose event: ${error?.message ?? String(error)}`);
}
}
};
// Firehose handler for all DM messages (/v3)
const handleChatFirehose = async (event: any) => {
try {
// Skip non-message events (arrays are DM invite lists, etc.)
if (Array.isArray(event)) return;
if (!("whom" in event) || !("response" in event)) return;
const whom = event.whom; // DM partner ship or club ID
const messageId = event.id;
const response = event.response;
// Handle add events (new messages)
const essay = response?.add?.essay;
if (!essay) return;
if (!processedTracker.mark(messageId)) return;
const senderShip = normalizeShip(essay.author ?? "");
if (!senderShip || senderShip === botShipName) return;
const messageText = extractMessageText(essay.content);
if (!messageText) return;
// For DMs, check allowlist
if (!isDmAllowed(senderShip, account.dmAllowlist)) {
runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`);
return;
}
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
isGroup: false,
timestamp: essay.sent || Date.now(),
});
} catch (error: any) {
runtime.error?.(`[tlon] Error handling chat firehose event: ${error?.message ?? String(error)}`);
}
};
try {
runtime.log?.("[tlon] Subscribing to updates...");
runtime.log?.("[tlon] Subscribing to firehose updates...");
let dmShips: string[] = [];
try {
const dmList = await api!.scry("/chat/dm.json");
if (Array.isArray(dmList)) {
dmShips = dmList;
runtime.log?.(`[tlon] Found ${dmShips.length} DM conversation(s)`);
// Subscribe to channels firehose (/v2)
await api!.subscribe({
app: "channels",
path: "/v2",
event: handleChannelsFirehose,
err: (error) => {
runtime.error?.(`[tlon] Channels firehose error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Channels firehose subscription ended");
},
});
runtime.log?.("[tlon] Subscribed to channels firehose (/v2)");
// Subscribe to chat/DM firehose (/v3)
await api!.subscribe({
app: "chat",
path: "/v3",
event: handleChatFirehose,
err: (error) => {
runtime.error?.(`[tlon] Chat firehose error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Chat firehose subscription ended");
},
});
runtime.log?.("[tlon] Subscribed to chat firehose (/v3)");
// Subscribe to contacts updates to track nickname changes
await api!.subscribe({
app: "contacts",
path: "/v1/news",
event: (event: any) => {
try {
// Look for self profile updates
if (event?.self) {
const selfUpdate = event.self;
if (selfUpdate?.contact?.nickname?.value !== undefined) {
const newNickname = selfUpdate.contact.nickname.value || null;
if (newNickname !== botNickname) {
botNickname = newNickname;
runtime.log?.(`[tlon] Nickname updated: ${botNickname}`);
}
}
}
} catch (error: any) {
runtime.error?.(`[tlon] Error handling contacts event: ${error?.message ?? String(error)}`);
}
},
err: (error) => {
runtime.error?.(`[tlon] Contacts subscription error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Contacts subscription ended");
},
});
runtime.log?.("[tlon] Subscribed to contacts updates (/v1/news)");
// Discover channels to watch
if (account.autoDiscoverChannels !== false) {
const discoveredChannels = await fetchAllChannels(api!, runtime);
for (const channelNest of discoveredChannels) {
watchedChannels.add(channelNest);
}
} catch (error: any) {
runtime.error?.(`[tlon] Failed to fetch DM list: ${error?.message ?? String(error)}`);
runtime.log?.(`[tlon] Watching ${watchedChannels.size} channel(s)`);
}
for (const dmShip of dmShips) {
await subscribeToDM(dmShip);
}
for (const channelNest of groupChannels) {
await subscribeToChannel(channelNest);
// Log watched channels
for (const channelNest of watchedChannels) {
runtime.log?.(`[tlon] Watching channel: ${channelNest}`);
}
runtime.log?.("[tlon] All subscriptions registered, connecting to SSE stream...");
await api!.connect();
runtime.log?.("[tlon] Connected! All subscriptions active");
runtime.log?.("[tlon] Connected! Firehose subscriptions active");
const pollInterval = setInterval(() => {
// Periodically refresh channel discovery
const pollInterval = setInterval(async () => {
if (!opts.abortSignal?.aborted) {
refreshChannelSubscriptions().catch((error) => {
try {
if (account.autoDiscoverChannels !== false) {
const discoveredChannels = await fetchAllChannels(api!, runtime);
for (const channelNest of discoveredChannels) {
if (!watchedChannels.has(channelNest)) {
watchedChannels.add(channelNest);
runtime.log?.(`[tlon] Now watching new channel: ${channelNest}`);
}
}
}
} catch (error: any) {
runtime.error?.(`[tlon] Channel refresh error: ${error?.message ?? String(error)}`);
});
}
}
}, 2 * 60 * 1000);

View File

@ -22,12 +22,30 @@ export function formatModelName(modelString?: string | null): string {
.join(" ");
}
export function isBotMentioned(messageText: string, botShipName: string): boolean {
export function isBotMentioned(
messageText: string,
botShipName: string,
nickname?: string
): boolean {
if (!messageText || !botShipName) return false;
// Check for @all mention
if (/@all\b/i.test(messageText)) return true;
// Check for ship mention
const normalizedBotShip = normalizeShip(botShipName);
const escapedShip = normalizedBotShip.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const mentionPattern = new RegExp(`(^|\\s)${escapedShip}(?=\\s|$)`, "i");
return mentionPattern.test(messageText);
if (mentionPattern.test(messageText)) return true;
// Check for nickname mention (case-insensitive, word boundary)
if (nickname) {
const escapedNickname = nickname.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const nicknamePattern = new RegExp(`(^|\\s)${escapedNickname}(?=\\s|$|[,!?.])`, "i");
if (nicknamePattern.test(messageText)) return true;
}
return false;
}
export function isDmAllowed(senderShip: string, allowlist: string[] | undefined): boolean {
@ -38,6 +56,24 @@ export function isDmAllowed(senderShip: string, allowlist: string[] | undefined)
.some((ship) => ship === normalizedSender);
}
// Helper to recursively extract text from inline content
function extractInlineText(items: any[]): string {
return items.map((item: any) => {
if (typeof item === "string") return item;
if (item && typeof item === "object") {
if (item.ship) return item.ship;
if ("sect" in item) return `@${item.sect || "all"}`;
if (item["inline-code"]) return `\`${item["inline-code"]}\``;
if (item.code) return `\`${item.code}\``;
if (item.link && item.link.href) return item.link.content || item.link.href;
if (item.bold && Array.isArray(item.bold)) return `**${extractInlineText(item.bold)}**`;
if (item.italics && Array.isArray(item.italics)) return `*${extractInlineText(item.italics)}*`;
if (item.strike && Array.isArray(item.strike)) return `~~${extractInlineText(item.strike)}~~`;
}
return "";
}).join("");
}
export function extractMessageText(content: unknown): string {
if (!content || !Array.isArray(content)) return "";
@ -50,19 +86,26 @@ export function extractMessageText(content: unknown): string {
if (typeof item === "string") return item;
if (item && typeof item === "object") {
if (item.ship) return item.ship;
// Handle sect (role mentions like @all)
if ("sect" in item) return `@${item.sect || "all"}`;
if (item.break !== undefined) return "\n";
if (item.link && item.link.href) return item.link.href;
// Handle inline code
// Handle inline code (Tlon uses "inline-code" key)
if (item["inline-code"]) return `\`${item["inline-code"]}\``;
if (item.code) return `\`${item.code}\``;
// Handle bold/italic/strike
// Handle bold/italic/strike - recursively extract text
if (item.bold && Array.isArray(item.bold)) {
return item.bold.map((b: any) => typeof b === "string" ? b : "").join("");
return `**${extractInlineText(item.bold)}**`;
}
if (item.italics && Array.isArray(item.italics)) {
return item.italics.map((i: any) => typeof i === "string" ? i : "").join("");
return `*${extractInlineText(item.italics)}*`;
}
if (item.strike && Array.isArray(item.strike)) {
return item.strike.map((s: any) => typeof s === "string" ? s : "").join("");
return `~~${extractInlineText(item.strike)}~~`;
}
// Handle blockquote inline
if (item.blockquote && Array.isArray(item.blockquote)) {
return `> ${extractInlineText(item.blockquote)}`;
}
}
return "";