Add SSE event ack support to prevent channel backup
- Track lastHeardEventId and lastAcknowledgedEventId - Send ack every 20 events (threshold recommended by Urbit) - Parse event ID from SSE 'id:' lines - This should fix dropped messages issue
This commit is contained in:
parent
a41d83c8f2
commit
40e8f37022
@ -42,6 +42,11 @@ export class UrbitSSEClient {
|
||||
maxReconnectDelay: number;
|
||||
isConnected = false;
|
||||
logger: UrbitSseLogger;
|
||||
|
||||
// Event ack tracking - must ack every ~50 events to keep channel healthy
|
||||
private lastHeardEventId = -1;
|
||||
private lastAcknowledgedEventId = -1;
|
||||
private readonly ackThreshold = 20;
|
||||
|
||||
constructor(url: string, cookie: string, options: UrbitSseOptions = {}) {
|
||||
this.url = url;
|
||||
@ -214,15 +219,31 @@ export class UrbitSSEClient {
|
||||
processEvent(eventData: string) {
|
||||
const lines = eventData.split("\n");
|
||||
let data: string | null = null;
|
||||
let eventId: number | null = null;
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.startsWith("data: ")) {
|
||||
data = line.substring(6);
|
||||
}
|
||||
if (line.startsWith("id: ")) {
|
||||
eventId = parseInt(line.substring(4), 10);
|
||||
}
|
||||
}
|
||||
|
||||
if (!data) return;
|
||||
|
||||
// Track event ID and send ack if needed
|
||||
if (eventId !== null && !isNaN(eventId)) {
|
||||
if (eventId > this.lastHeardEventId) {
|
||||
this.lastHeardEventId = eventId;
|
||||
if (eventId - this.lastAcknowledgedEventId > this.ackThreshold) {
|
||||
this.ack(eventId).catch((err) => {
|
||||
this.logger.error?.(`Failed to ack event ${eventId}: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = JSON.parse(data) as { id?: number; json?: unknown; response?: string };
|
||||
|
||||
@ -248,6 +269,30 @@ export class UrbitSSEClient {
|
||||
this.logger.error?.(`Error parsing SSE event: ${String(error)}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async ack(eventId: number): Promise<void> {
|
||||
this.lastAcknowledgedEventId = eventId;
|
||||
|
||||
const ackData = {
|
||||
action: "ack",
|
||||
"event-id": eventId,
|
||||
};
|
||||
|
||||
const response = await fetch(this.channelUrl, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Cookie: this.cookie,
|
||||
},
|
||||
body: JSON.stringify([ackData]),
|
||||
});
|
||||
|
||||
if (!response.ok && response.status !== 204) {
|
||||
throw new Error(`Ack failed: ${response.status}`);
|
||||
}
|
||||
|
||||
this.logger.log?.(`[SSE] Acked event ${eventId}`);
|
||||
}
|
||||
|
||||
async poke(params: { app: string; mark: string; json: unknown }) {
|
||||
const pokeId = Date.now();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user