Compare commits
3 Commits
main
...
fix/node-i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ca26e17273 | ||
|
|
34ab1d245c | ||
|
|
a11b98f801 |
@ -26,6 +26,7 @@ Docs: https://docs.clawd.bot
|
|||||||
- Diagnostics: add diagnostic flags for targeted debug logs (config + env override). https://docs.clawd.bot/diagnostics/flags
|
- Diagnostics: add diagnostic flags for targeted debug logs (config + env override). https://docs.clawd.bot/diagnostics/flags
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
- macOS: rearm gateway receive loop before push handling to avoid node invoke stalls. (#1752) Thanks @ngutman.
|
||||||
- Gateway: include inline config env vars in service install environments. (#1735) Thanks @Seredeep.
|
- Gateway: include inline config env vars in service install environments. (#1735) Thanks @Seredeep.
|
||||||
- BlueBubbles: route phone-number targets to DMs, avoid leaking routing IDs, and auto-create missing DMs (Private API required). (#1751) Thanks @tyler6204. https://docs.clawd.bot/channels/bluebubbles
|
- BlueBubbles: route phone-number targets to DMs, avoid leaking routing IDs, and auto-create missing DMs (Private API required). (#1751) Thanks @tyler6204. https://docs.clawd.bot/channels/bluebubbles
|
||||||
- BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing.
|
- BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing.
|
||||||
|
|||||||
@ -427,8 +427,8 @@ public actor GatewayChannelActor {
|
|||||||
Task { await self.handleReceiveFailure(err) }
|
Task { await self.handleReceiveFailure(err) }
|
||||||
case let .success(msg):
|
case let .success(msg):
|
||||||
Task {
|
Task {
|
||||||
await self.handle(msg)
|
|
||||||
await self.listen()
|
await self.listen()
|
||||||
|
await self.handle(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -574,46 +574,22 @@ public actor GatewayChannelActor {
|
|||||||
params: [String: AnyCodable]?,
|
params: [String: AnyCodable]?,
|
||||||
timeoutMs: Double? = nil) async throws -> Data
|
timeoutMs: Double? = nil) async throws -> Data
|
||||||
{
|
{
|
||||||
do {
|
try await self.connectOrThrow(context: "gateway connect")
|
||||||
try await self.connect()
|
|
||||||
} catch {
|
|
||||||
throw self.wrap(error, context: "gateway connect")
|
|
||||||
}
|
|
||||||
let id = UUID().uuidString
|
|
||||||
let effectiveTimeout = timeoutMs ?? self.defaultRequestTimeoutMs
|
let effectiveTimeout = timeoutMs ?? self.defaultRequestTimeoutMs
|
||||||
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
|
let payload = try self.encodeRequest(method: method, params: params, kind: "request")
|
||||||
let paramsObject: ProtoAnyCodable? = params.map { entries in
|
|
||||||
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
|
|
||||||
dict[entry.key] = ProtoAnyCodable(entry.value.value)
|
|
||||||
}
|
|
||||||
return ProtoAnyCodable(dict)
|
|
||||||
}
|
|
||||||
let frame = RequestFrame(
|
|
||||||
type: "req",
|
|
||||||
id: id,
|
|
||||||
method: method,
|
|
||||||
params: paramsObject)
|
|
||||||
let data: Data
|
|
||||||
do {
|
|
||||||
data = try self.encoder.encode(frame)
|
|
||||||
} catch {
|
|
||||||
self.logger.error(
|
|
||||||
"gateway request encode failed \(method, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
|
|
||||||
throw error
|
|
||||||
}
|
|
||||||
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<GatewayFrame, Error>) in
|
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<GatewayFrame, Error>) in
|
||||||
self.pending[id] = cont
|
self.pending[payload.id] = cont
|
||||||
Task { [weak self] in
|
Task { [weak self] in
|
||||||
guard let self else { return }
|
guard let self else { return }
|
||||||
try? await Task.sleep(nanoseconds: UInt64(effectiveTimeout * 1_000_000))
|
try? await Task.sleep(nanoseconds: UInt64(effectiveTimeout * 1_000_000))
|
||||||
await self.timeoutRequest(id: id, timeoutMs: effectiveTimeout)
|
await self.timeoutRequest(id: payload.id, timeoutMs: effectiveTimeout)
|
||||||
}
|
}
|
||||||
Task {
|
Task {
|
||||||
do {
|
do {
|
||||||
try await self.task?.send(.data(data))
|
try await self.task?.send(.data(payload.data))
|
||||||
} catch {
|
} catch {
|
||||||
let wrapped = self.wrap(error, context: "gateway send \(method)")
|
let wrapped = self.wrap(error, context: "gateway send \(method)")
|
||||||
let waiter = self.pending.removeValue(forKey: id)
|
let waiter = self.pending.removeValue(forKey: payload.id)
|
||||||
// Treat send failures as a broken socket: mark disconnected and trigger reconnect.
|
// Treat send failures as a broken socket: mark disconnected and trigger reconnect.
|
||||||
self.connected = false
|
self.connected = false
|
||||||
self.task?.cancel(with: .goingAway, reason: nil)
|
self.task?.cancel(with: .goingAway, reason: nil)
|
||||||
@ -657,6 +633,42 @@ public actor GatewayChannelActor {
|
|||||||
return NSError(domain: ns.domain, code: ns.code, userInfo: [NSLocalizedDescriptionKey: "\(context): \(desc)"])
|
return NSError(domain: ns.domain, code: ns.code, userInfo: [NSLocalizedDescriptionKey: "\(context): \(desc)"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func connectOrThrow(context: String) async throws {
|
||||||
|
do {
|
||||||
|
try await self.connect()
|
||||||
|
} catch {
|
||||||
|
throw self.wrap(error, context: context)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func encodeRequest(
|
||||||
|
method: String,
|
||||||
|
params: [String: AnyCodable]?,
|
||||||
|
kind: String) throws -> (id: String, data: Data)
|
||||||
|
{
|
||||||
|
let id = UUID().uuidString
|
||||||
|
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
|
||||||
|
let paramsObject: ProtoAnyCodable? = params.map { entries in
|
||||||
|
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
|
||||||
|
dict[entry.key] = ProtoAnyCodable(entry.value.value)
|
||||||
|
}
|
||||||
|
return ProtoAnyCodable(dict)
|
||||||
|
}
|
||||||
|
let frame = RequestFrame(
|
||||||
|
type: "req",
|
||||||
|
id: id,
|
||||||
|
method: method,
|
||||||
|
params: paramsObject)
|
||||||
|
do {
|
||||||
|
let data = try self.encoder.encode(frame)
|
||||||
|
return (id: id, data: data)
|
||||||
|
} catch {
|
||||||
|
self.logger.error(
|
||||||
|
"gateway \(kind) encode failed \(method, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private func failPending(_ error: Error) async {
|
private func failPending(_ error: Error) async {
|
||||||
let waiters = self.pending
|
let waiters = self.pending
|
||||||
self.pending.removeAll()
|
self.pending.removeAll()
|
||||||
|
|||||||
@ -0,0 +1,212 @@
|
|||||||
|
import Foundation
|
||||||
|
import Testing
|
||||||
|
@testable import ClawdbotKit
|
||||||
|
import ClawdbotProtocol
|
||||||
|
|
||||||
|
private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable {
|
||||||
|
private let lock = NSLock()
|
||||||
|
private var queue: [URLSessionWebSocketTask.Message] = []
|
||||||
|
private var pendingHandler: (@Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)?
|
||||||
|
private var pendingContinuation: CheckedContinuation<URLSessionWebSocketTask.Message, Error>?
|
||||||
|
private let encoder = JSONEncoder()
|
||||||
|
private let decoder = JSONDecoder()
|
||||||
|
|
||||||
|
var state: URLSessionTask.State = .running
|
||||||
|
|
||||||
|
func resume() {}
|
||||||
|
|
||||||
|
func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
|
||||||
|
state = .canceling
|
||||||
|
}
|
||||||
|
|
||||||
|
func send(_ message: URLSessionWebSocketTask.Message) async throws {
|
||||||
|
guard case let .data(data) = message else { return }
|
||||||
|
guard let frame = try? decoder.decode(RequestFrame.self, from: data) else { return }
|
||||||
|
switch frame.method {
|
||||||
|
case "connect":
|
||||||
|
enqueueResponse(id: frame.id, payload: helloOkPayload())
|
||||||
|
default:
|
||||||
|
enqueueResponse(id: frame.id, payload: ["ok": true])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func receive() async throws -> URLSessionWebSocketTask.Message {
|
||||||
|
try await withCheckedThrowingContinuation { cont in
|
||||||
|
lock.lock()
|
||||||
|
if !queue.isEmpty {
|
||||||
|
let msg = queue.removeFirst()
|
||||||
|
lock.unlock()
|
||||||
|
cont.resume(returning: msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pendingContinuation = cont
|
||||||
|
lock.unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func receive(
|
||||||
|
completionHandler: @escaping @Sendable (Result<URLSessionWebSocketTask.Message, Error>) -> Void)
|
||||||
|
{
|
||||||
|
lock.lock()
|
||||||
|
if !queue.isEmpty {
|
||||||
|
let msg = queue.removeFirst()
|
||||||
|
lock.unlock()
|
||||||
|
completionHandler(.success(msg))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pendingHandler = completionHandler
|
||||||
|
lock.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func enqueue(_ message: URLSessionWebSocketTask.Message) {
|
||||||
|
lock.lock()
|
||||||
|
if let handler = pendingHandler {
|
||||||
|
pendingHandler = nil
|
||||||
|
lock.unlock()
|
||||||
|
handler(.success(message))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if let continuation = pendingContinuation {
|
||||||
|
pendingContinuation = nil
|
||||||
|
lock.unlock()
|
||||||
|
continuation.resume(returning: message)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
queue.append(message)
|
||||||
|
lock.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
private func enqueueResponse(id: String, payload: [String: Any]) {
|
||||||
|
let response = ResponseFrame(
|
||||||
|
type: "res",
|
||||||
|
id: id,
|
||||||
|
ok: true,
|
||||||
|
payload: ClawdbotProtocol.AnyCodable(payload),
|
||||||
|
error: nil)
|
||||||
|
guard let data = try? encoder.encode(response) else { return }
|
||||||
|
enqueue(.data(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
private func helloOkPayload() -> [String: Any] {
|
||||||
|
[
|
||||||
|
"type": "hello.ok",
|
||||||
|
"protocol": 1,
|
||||||
|
"server": [:],
|
||||||
|
"features": [:],
|
||||||
|
"snapshot": [
|
||||||
|
"presence": [],
|
||||||
|
"health": [:],
|
||||||
|
"stateVersion": [
|
||||||
|
"presence": 0,
|
||||||
|
"health": 0,
|
||||||
|
],
|
||||||
|
"uptimeMs": 0,
|
||||||
|
],
|
||||||
|
"policy": [
|
||||||
|
"tickIntervalMs": 1000,
|
||||||
|
],
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class FakeWebSocketSession: WebSocketSessioning {
|
||||||
|
let task: FakeWebSocketTask
|
||||||
|
|
||||||
|
init(task: FakeWebSocketTask) {
|
||||||
|
self.task = task
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeWebSocketTask(url: URL) -> WebSocketTaskBox {
|
||||||
|
WebSocketTaskBox(task: task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private actor AsyncSignal {
|
||||||
|
private var continuation: CheckedContinuation<Result<Void, Error>, Never>?
|
||||||
|
private var stored: Result<Void, Error>?
|
||||||
|
|
||||||
|
func finish(_ result: Result<Void, Error>) {
|
||||||
|
if let continuation {
|
||||||
|
self.continuation = nil
|
||||||
|
continuation.resume(returning: result)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
stored = result
|
||||||
|
}
|
||||||
|
|
||||||
|
func wait() async throws {
|
||||||
|
let result = await withCheckedContinuation { cont in
|
||||||
|
if let stored {
|
||||||
|
self.stored = nil
|
||||||
|
cont.resume(returning: stored)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continuation = cont
|
||||||
|
}
|
||||||
|
switch result {
|
||||||
|
case .success:
|
||||||
|
return
|
||||||
|
case let .failure(error):
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum TestError: Error {
|
||||||
|
case timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
struct GatewayChannelTests {
|
||||||
|
@Test
|
||||||
|
func listenRearmsBeforePushHandler() async throws {
|
||||||
|
let task = FakeWebSocketTask()
|
||||||
|
let session = FakeWebSocketSession(task: task)
|
||||||
|
let signal = AsyncSignal()
|
||||||
|
let url = URL(string: "ws://example.invalid")!
|
||||||
|
final class ChannelBox { var channel: GatewayChannelActor? }
|
||||||
|
let box = ChannelBox()
|
||||||
|
|
||||||
|
let channel = GatewayChannelActor(
|
||||||
|
url: url,
|
||||||
|
token: nil,
|
||||||
|
session: WebSocketSessionBox(session: session),
|
||||||
|
pushHandler: { push in
|
||||||
|
guard case let .event(evt) = push, evt.event == "test.event" else { return }
|
||||||
|
guard let channel = box.channel else { return }
|
||||||
|
let params: [String: ClawdbotKit.AnyCodable] = [
|
||||||
|
"event": ClawdbotKit.AnyCodable("test"),
|
||||||
|
"payloadJSON": ClawdbotKit.AnyCodable(NSNull()),
|
||||||
|
]
|
||||||
|
do {
|
||||||
|
_ = try await channel.request(method: "node.event", params: params, timeoutMs: 50)
|
||||||
|
await signal.finish(.success(()))
|
||||||
|
} catch {
|
||||||
|
await signal.finish(.failure(error))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
box.channel = channel
|
||||||
|
|
||||||
|
let challenge = EventFrame(
|
||||||
|
type: "event",
|
||||||
|
event: "connect.challenge",
|
||||||
|
payload: ClawdbotProtocol.AnyCodable(["nonce": "test-nonce"]),
|
||||||
|
seq: nil,
|
||||||
|
stateversion: nil)
|
||||||
|
let encoder = JSONEncoder()
|
||||||
|
task.enqueue(.data(try encoder.encode(challenge)))
|
||||||
|
|
||||||
|
try await channel.connect()
|
||||||
|
|
||||||
|
let event = EventFrame(
|
||||||
|
type: "event",
|
||||||
|
event: "test.event",
|
||||||
|
payload: ClawdbotProtocol.AnyCodable([:]),
|
||||||
|
seq: nil,
|
||||||
|
stateversion: nil)
|
||||||
|
task.enqueue(.data(try encoder.encode(event)))
|
||||||
|
|
||||||
|
try await AsyncTimeout.withTimeout(seconds: 1, onTimeout: { TestError.timeout }) {
|
||||||
|
try await signal.wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user