diff --git a/extensions/tlon/src/urbit/sse-client.ts b/extensions/tlon/src/urbit/sse-client.ts index 19878e679..007351c2c 100644 --- a/extensions/tlon/src/urbit/sse-client.ts +++ b/extensions/tlon/src/urbit/sse-client.ts @@ -42,6 +42,11 @@ export class UrbitSSEClient { maxReconnectDelay: number; isConnected = false; logger: UrbitSseLogger; + + // Event ack tracking - must ack every ~50 events to keep channel healthy + private lastHeardEventId = -1; + private lastAcknowledgedEventId = -1; + private readonly ackThreshold = 20; constructor(url: string, cookie: string, options: UrbitSseOptions = {}) { this.url = url; @@ -214,15 +219,31 @@ export class UrbitSSEClient { processEvent(eventData: string) { const lines = eventData.split("\n"); let data: string | null = null; + let eventId: number | null = null; for (const line of lines) { if (line.startsWith("data: ")) { data = line.substring(6); } + if (line.startsWith("id: ")) { + eventId = parseInt(line.substring(4), 10); + } } if (!data) return; + // Track event ID and send ack if needed + if (eventId !== null && !isNaN(eventId)) { + if (eventId > this.lastHeardEventId) { + this.lastHeardEventId = eventId; + if (eventId - this.lastAcknowledgedEventId > this.ackThreshold) { + this.ack(eventId).catch((err) => { + this.logger.error?.(`Failed to ack event ${eventId}: ${String(err)}`); + }); + } + } + } + try { const parsed = JSON.parse(data) as { id?: number; json?: unknown; response?: string }; @@ -248,6 +269,30 @@ export class UrbitSSEClient { this.logger.error?.(`Error parsing SSE event: ${String(error)}`); } } + + private async ack(eventId: number): Promise { + this.lastAcknowledgedEventId = eventId; + + const ackData = { + action: "ack", + "event-id": eventId, + }; + + const response = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify([ackData]), + }); + + if (!response.ok && response.status !== 204) { + throw new Error(`Ack failed: ${response.status}`); + } + + this.logger.log?.(`[SSE] Acked event ${eventId}`); + } async poke(params: { app: string; mark: string; json: unknown }) { const pokeId = Date.now();