From ca26e17273d99ce104223f9be3b91d0630a260fb Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 25 Jan 2026 11:47:55 +0000 Subject: [PATCH] fix: avoid node invoke deadlock in macOS gateway (#1752) (thanks @ngutman) --- CHANGELOG.md | 1 + .../Sources/ClawdbotKit/GatewayChannel.swift | 25 +-- .../ClawdbotKit/GatewayNodeSession.swift | 4 +- .../GatewayChannelTests.swift | 212 ++++++++++++++++++ 4 files changed, 216 insertions(+), 26 deletions(-) create mode 100644 apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayChannelTests.swift diff --git a/CHANGELOG.md b/CHANGELOG.md index 64f296fd8..f9ee6b072 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.clawd.bot - Diagnostics: add diagnostic flags for targeted debug logs (config + env override). https://docs.clawd.bot/diagnostics/flags ### Fixes +- macOS: rearm gateway receive loop before push handling to avoid node invoke stalls. (#1752) Thanks @ngutman. - Gateway: include inline config env vars in service install environments. (#1735) Thanks @Seredeep. - BlueBubbles: route phone-number targets to DMs, avoid leaking routing IDs, and auto-create missing DMs (Private API required). (#1751) Thanks @tyler6204. https://docs.clawd.bot/channels/bluebubbles - BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing. diff --git a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayChannel.swift b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayChannel.swift index 819014cda..4dbcedcb9 100644 --- a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayChannel.swift +++ b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayChannel.swift @@ -427,8 +427,8 @@ public actor GatewayChannelActor { Task { await self.handleReceiveFailure(err) } case let .success(msg): Task { - await self.handle(msg) await self.listen() + await self.handle(msg) } } } @@ -619,29 +619,6 @@ public actor GatewayChannelActor { return Data() // Should not happen, but tolerate empty payloads. } - public func send(method: String, params: [String: AnyCodable]?) async throws { - try await self.connectOrThrow(context: "gateway connect") - let payload = try self.encodeRequest(method: method, params: params, kind: "send") - guard let task = self.task else { - throw NSError( - domain: "Gateway", - code: 5, - userInfo: [NSLocalizedDescriptionKey: "gateway socket unavailable"]) - } - do { - try await task.send(.data(payload.data)) - } catch { - let wrapped = self.wrap(error, context: "gateway send \(method)") - self.connected = false - self.task?.cancel(with: .goingAway, reason: nil) - Task { [weak self] in - guard let self else { return } - await self.scheduleReconnect() - } - throw wrapped - } - } - // Wrap low-level URLSession/WebSocket errors with context so UI can surface them. private func wrap(_ error: Error, context: String) -> Error { if let urlError = error as? URLError { diff --git a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift index 122231f02..a2ac2ad6d 100644 --- a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift +++ b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/GatewayNodeSession.swift @@ -143,7 +143,7 @@ public actor GatewayNodeSession { "payloadJSON": AnyCodable(payloadJSON ?? NSNull()), ] do { - try await channel.send(method: "node.event", params: params) + _ = try await channel.request(method: "node.event", params: params, timeoutMs: 8000) } catch { self.logger.error("node event failed: \(error.localizedDescription, privacy: .public)") } @@ -224,7 +224,7 @@ public actor GatewayNodeSession { ]) } do { - try await channel.send(method: "node.invoke.result", params: params) + _ = try await channel.request(method: "node.invoke.result", params: params, timeoutMs: 15000) } catch { self.logger.error("node invoke result failed: \(error.localizedDescription, privacy: .public)") } diff --git a/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayChannelTests.swift b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayChannelTests.swift new file mode 100644 index 000000000..c890664a7 --- /dev/null +++ b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/GatewayChannelTests.swift @@ -0,0 +1,212 @@ +import Foundation +import Testing +@testable import ClawdbotKit +import ClawdbotProtocol + +private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable { + private let lock = NSLock() + private var queue: [URLSessionWebSocketTask.Message] = [] + private var pendingHandler: (@Sendable (Result) -> Void)? + private var pendingContinuation: CheckedContinuation? + private let encoder = JSONEncoder() + private let decoder = JSONDecoder() + + var state: URLSessionTask.State = .running + + func resume() {} + + func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + state = .canceling + } + + func send(_ message: URLSessionWebSocketTask.Message) async throws { + guard case let .data(data) = message else { return } + guard let frame = try? decoder.decode(RequestFrame.self, from: data) else { return } + switch frame.method { + case "connect": + enqueueResponse(id: frame.id, payload: helloOkPayload()) + default: + enqueueResponse(id: frame.id, payload: ["ok": true]) + } + } + + func receive() async throws -> URLSessionWebSocketTask.Message { + try await withCheckedThrowingContinuation { cont in + lock.lock() + if !queue.isEmpty { + let msg = queue.removeFirst() + lock.unlock() + cont.resume(returning: msg) + return + } + pendingContinuation = cont + lock.unlock() + } + } + + func receive( + completionHandler: @escaping @Sendable (Result) -> Void) + { + lock.lock() + if !queue.isEmpty { + let msg = queue.removeFirst() + lock.unlock() + completionHandler(.success(msg)) + return + } + pendingHandler = completionHandler + lock.unlock() + } + + func enqueue(_ message: URLSessionWebSocketTask.Message) { + lock.lock() + if let handler = pendingHandler { + pendingHandler = nil + lock.unlock() + handler(.success(message)) + return + } + if let continuation = pendingContinuation { + pendingContinuation = nil + lock.unlock() + continuation.resume(returning: message) + return + } + queue.append(message) + lock.unlock() + } + + private func enqueueResponse(id: String, payload: [String: Any]) { + let response = ResponseFrame( + type: "res", + id: id, + ok: true, + payload: ClawdbotProtocol.AnyCodable(payload), + error: nil) + guard let data = try? encoder.encode(response) else { return } + enqueue(.data(data)) + } + + private func helloOkPayload() -> [String: Any] { + [ + "type": "hello.ok", + "protocol": 1, + "server": [:], + "features": [:], + "snapshot": [ + "presence": [], + "health": [:], + "stateVersion": [ + "presence": 0, + "health": 0, + ], + "uptimeMs": 0, + ], + "policy": [ + "tickIntervalMs": 1000, + ], + ] + } +} + +private final class FakeWebSocketSession: WebSocketSessioning { + let task: FakeWebSocketTask + + init(task: FakeWebSocketTask) { + self.task = task + } + + func makeWebSocketTask(url: URL) -> WebSocketTaskBox { + WebSocketTaskBox(task: task) + } +} + +private actor AsyncSignal { + private var continuation: CheckedContinuation, Never>? + private var stored: Result? + + func finish(_ result: Result) { + if let continuation { + self.continuation = nil + continuation.resume(returning: result) + return + } + stored = result + } + + func wait() async throws { + let result = await withCheckedContinuation { cont in + if let stored { + self.stored = nil + cont.resume(returning: stored) + return + } + continuation = cont + } + switch result { + case .success: + return + case let .failure(error): + throw error + } + } +} + +private enum TestError: Error { + case timeout +} + +struct GatewayChannelTests { + @Test + func listenRearmsBeforePushHandler() async throws { + let task = FakeWebSocketTask() + let session = FakeWebSocketSession(task: task) + let signal = AsyncSignal() + let url = URL(string: "ws://example.invalid")! + final class ChannelBox { var channel: GatewayChannelActor? } + let box = ChannelBox() + + let channel = GatewayChannelActor( + url: url, + token: nil, + session: WebSocketSessionBox(session: session), + pushHandler: { push in + guard case let .event(evt) = push, evt.event == "test.event" else { return } + guard let channel = box.channel else { return } + let params: [String: ClawdbotKit.AnyCodable] = [ + "event": ClawdbotKit.AnyCodable("test"), + "payloadJSON": ClawdbotKit.AnyCodable(NSNull()), + ] + do { + _ = try await channel.request(method: "node.event", params: params, timeoutMs: 50) + await signal.finish(.success(())) + } catch { + await signal.finish(.failure(error)) + } + }) + box.channel = channel + + let challenge = EventFrame( + type: "event", + event: "connect.challenge", + payload: ClawdbotProtocol.AnyCodable(["nonce": "test-nonce"]), + seq: nil, + stateversion: nil) + let encoder = JSONEncoder() + task.enqueue(.data(try encoder.encode(challenge))) + + try await channel.connect() + + let event = EventFrame( + type: "event", + event: "test.event", + payload: ClawdbotProtocol.AnyCodable([:]), + seq: nil, + stateversion: nil) + task.enqueue(.data(try encoder.encode(event))) + + try await AsyncTimeout.withTimeout(seconds: 1, onTimeout: { TestError.timeout }) { + try await signal.wait() + } + } +}