Compare commits

...

3 Commits

Author SHA1 Message Date
Peter Steinberger
ca26e17273 fix: avoid node invoke deadlock in macOS gateway (#1752) (thanks @ngutman) 2026-01-25 11:47:55 +00:00
Nimrod Gutman
34ab1d245c refactor(gateway): share request encoding 2026-01-25 11:35:00 +00:00
Nimrod Gutman
a11b98f801 fix(node): avoid invoke result deadlock 2026-01-25 11:35:00 +00:00
3 changed files with 256 additions and 31 deletions

View File

@ -26,6 +26,7 @@ Docs: https://docs.clawd.bot
- Diagnostics: add diagnostic flags for targeted debug logs (config + env override). https://docs.clawd.bot/diagnostics/flags
### Fixes
- macOS: rearm gateway receive loop before push handling to avoid node invoke stalls. (#1752) Thanks @ngutman.
- Gateway: include inline config env vars in service install environments. (#1735) Thanks @Seredeep.
- BlueBubbles: route phone-number targets to DMs, avoid leaking routing IDs, and auto-create missing DMs (Private API required). (#1751) Thanks @tyler6204. https://docs.clawd.bot/channels/bluebubbles
- BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing.

View File

@ -427,8 +427,8 @@ public actor GatewayChannelActor {
Task { await self.handleReceiveFailure(err) }
case let .success(msg):
Task {
await self.handle(msg)
await self.listen()
await self.handle(msg)
}
}
}
@ -574,46 +574,22 @@ public actor GatewayChannelActor {
params: [String: AnyCodable]?,
timeoutMs: Double? = nil) async throws -> Data
{
do {
try await self.connect()
} catch {
throw self.wrap(error, context: "gateway connect")
}
let id = UUID().uuidString
try await self.connectOrThrow(context: "gateway connect")
let effectiveTimeout = timeoutMs ?? self.defaultRequestTimeoutMs
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
let paramsObject: ProtoAnyCodable? = params.map { entries in
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
dict[entry.key] = ProtoAnyCodable(entry.value.value)
}
return ProtoAnyCodable(dict)
}
let frame = RequestFrame(
type: "req",
id: id,
method: method,
params: paramsObject)
let data: Data
do {
data = try self.encoder.encode(frame)
} catch {
self.logger.error(
"gateway request encode failed \(method, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
throw error
}
let payload = try self.encodeRequest(method: method, params: params, kind: "request")
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<GatewayFrame, Error>) in
self.pending[id] = cont
self.pending[payload.id] = cont
Task { [weak self] in
guard let self else { return }
try? await Task.sleep(nanoseconds: UInt64(effectiveTimeout * 1_000_000))
await self.timeoutRequest(id: id, timeoutMs: effectiveTimeout)
await self.timeoutRequest(id: payload.id, timeoutMs: effectiveTimeout)
}
Task {
do {
try await self.task?.send(.data(data))
try await self.task?.send(.data(payload.data))
} catch {
let wrapped = self.wrap(error, context: "gateway send \(method)")
let waiter = self.pending.removeValue(forKey: id)
let waiter = self.pending.removeValue(forKey: payload.id)
// Treat send failures as a broken socket: mark disconnected and trigger reconnect.
self.connected = false
self.task?.cancel(with: .goingAway, reason: nil)
@ -657,6 +633,42 @@ public actor GatewayChannelActor {
return NSError(domain: ns.domain, code: ns.code, userInfo: [NSLocalizedDescriptionKey: "\(context): \(desc)"])
}
private func connectOrThrow(context: String) async throws {
do {
try await self.connect()
} catch {
throw self.wrap(error, context: context)
}
}
private func encodeRequest(
method: String,
params: [String: AnyCodable]?,
kind: String) throws -> (id: String, data: Data)
{
let id = UUID().uuidString
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
let paramsObject: ProtoAnyCodable? = params.map { entries in
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
dict[entry.key] = ProtoAnyCodable(entry.value.value)
}
return ProtoAnyCodable(dict)
}
let frame = RequestFrame(
type: "req",
id: id,
method: method,
params: paramsObject)
do {
let data = try self.encoder.encode(frame)
return (id: id, data: data)
} catch {
self.logger.error(
"gateway \(kind) encode failed \(method, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
throw error
}
}
private func failPending(_ error: Error) async {
let waiters = self.pending
self.pending.removeAll()

View File

@ -0,0 +1,212 @@
import Foundation
import Testing
@testable import ClawdbotKit
import ClawdbotProtocol
private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable {
private let lock = NSLock()
private var queue: [URLSessionWebSocketTask.Message] = []
private var pendingHandler: (@Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)?
private var pendingContinuation: CheckedContinuation<URLSessionWebSocketTask.Message, Error>?
private let encoder = JSONEncoder()
private let decoder = JSONDecoder()
var state: URLSessionTask.State = .running
func resume() {}
func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
state = .canceling
}
func send(_ message: URLSessionWebSocketTask.Message) async throws {
guard case let .data(data) = message else { return }
guard let frame = try? decoder.decode(RequestFrame.self, from: data) else { return }
switch frame.method {
case "connect":
enqueueResponse(id: frame.id, payload: helloOkPayload())
default:
enqueueResponse(id: frame.id, payload: ["ok": true])
}
}
func receive() async throws -> URLSessionWebSocketTask.Message {
try await withCheckedThrowingContinuation { cont in
lock.lock()
if !queue.isEmpty {
let msg = queue.removeFirst()
lock.unlock()
cont.resume(returning: msg)
return
}
pendingContinuation = cont
lock.unlock()
}
}
func receive(
completionHandler: @escaping @Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)
{
lock.lock()
if !queue.isEmpty {
let msg = queue.removeFirst()
lock.unlock()
completionHandler(.success(msg))
return
}
pendingHandler = completionHandler
lock.unlock()
}
func enqueue(_ message: URLSessionWebSocketTask.Message) {
lock.lock()
if let handler = pendingHandler {
pendingHandler = nil
lock.unlock()
handler(.success(message))
return
}
if let continuation = pendingContinuation {
pendingContinuation = nil
lock.unlock()
continuation.resume(returning: message)
return
}
queue.append(message)
lock.unlock()
}
private func enqueueResponse(id: String, payload: [String: Any]) {
let response = ResponseFrame(
type: "res",
id: id,
ok: true,
payload: ClawdbotProtocol.AnyCodable(payload),
error: nil)
guard let data = try? encoder.encode(response) else { return }
enqueue(.data(data))
}
private func helloOkPayload() -> [String: Any] {
[
"type": "hello.ok",
"protocol": 1,
"server": [:],
"features": [:],
"snapshot": [
"presence": [],
"health": [:],
"stateVersion": [
"presence": 0,
"health": 0,
],
"uptimeMs": 0,
],
"policy": [
"tickIntervalMs": 1000,
],
]
}
}
private final class FakeWebSocketSession: WebSocketSessioning {
let task: FakeWebSocketTask
init(task: FakeWebSocketTask) {
self.task = task
}
func makeWebSocketTask(url: URL) -> WebSocketTaskBox {
WebSocketTaskBox(task: task)
}
}
private actor AsyncSignal {
private var continuation: CheckedContinuation<Result<Void, Error>, Never>?
private var stored: Result<Void, Error>?
func finish(_ result: Result<Void, Error>) {
if let continuation {
self.continuation = nil
continuation.resume(returning: result)
return
}
stored = result
}
func wait() async throws {
let result = await withCheckedContinuation { cont in
if let stored {
self.stored = nil
cont.resume(returning: stored)
return
}
continuation = cont
}
switch result {
case .success:
return
case let .failure(error):
throw error
}
}
}
private enum TestError: Error {
case timeout
}
struct GatewayChannelTests {
@Test
func listenRearmsBeforePushHandler() async throws {
let task = FakeWebSocketTask()
let session = FakeWebSocketSession(task: task)
let signal = AsyncSignal()
let url = URL(string: "ws://example.invalid")!
final class ChannelBox { var channel: GatewayChannelActor? }
let box = ChannelBox()
let channel = GatewayChannelActor(
url: url,
token: nil,
session: WebSocketSessionBox(session: session),
pushHandler: { push in
guard case let .event(evt) = push, evt.event == "test.event" else { return }
guard let channel = box.channel else { return }
let params: [String: ClawdbotKit.AnyCodable] = [
"event": ClawdbotKit.AnyCodable("test"),
"payloadJSON": ClawdbotKit.AnyCodable(NSNull()),
]
do {
_ = try await channel.request(method: "node.event", params: params, timeoutMs: 50)
await signal.finish(.success(()))
} catch {
await signal.finish(.failure(error))
}
})
box.channel = channel
let challenge = EventFrame(
type: "event",
event: "connect.challenge",
payload: ClawdbotProtocol.AnyCodable(["nonce": "test-nonce"]),
seq: nil,
stateversion: nil)
let encoder = JSONEncoder()
task.enqueue(.data(try encoder.encode(challenge)))
try await channel.connect()
let event = EventFrame(
type: "event",
event: "test.event",
payload: ClawdbotProtocol.AnyCodable([:]),
seq: nil,
stateversion: nil)
task.enqueue(.data(try encoder.encode(event)))
try await AsyncTimeout.withTimeout(seconds: 1, onTimeout: { TestError.timeout }) {
try await signal.wait()
}
}
}