This commit is contained in:
Nimrod Gutman 2026-01-29 21:53:31 -05:00 committed by GitHub
commit 06900d5a08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 32 additions and 5 deletions

View File

@ -367,7 +367,8 @@ actor GatewayConnection {
session: self.sessionBox, session: self.sessionBox,
pushHandler: { [weak self] push in pushHandler: { [weak self] push in
await self?.handle(push: push) await self?.handle(push: push)
}) },
loggerCategory: "gateway.control")
self.configuredURL = url self.configuredURL = url
self.configuredToken = token self.configuredToken = token
self.configuredPassword = password self.configuredPassword = password

View File

@ -109,7 +109,7 @@ private enum ConnectChallengeError: Error {
} }
public actor GatewayChannelActor { public actor GatewayChannelActor {
private let logger = Logger(subsystem: "bot.molt", category: "gateway") private let logger: Logger
private var task: WebSocketTaskBox? private var task: WebSocketTaskBox?
private var pending: [String: CheckedContinuation<GatewayFrame, Error>] = [:] private var pending: [String: CheckedContinuation<GatewayFrame, Error>] = [:]
private var connected = false private var connected = false
@ -143,8 +143,10 @@ public actor GatewayChannelActor {
session: WebSocketSessionBox? = nil, session: WebSocketSessionBox? = nil,
pushHandler: (@Sendable (GatewayPush) async -> Void)? = nil, pushHandler: (@Sendable (GatewayPush) async -> Void)? = nil,
connectOptions: GatewayConnectOptions? = nil, connectOptions: GatewayConnectOptions? = nil,
loggerCategory: String = "gateway",
disconnectHandler: (@Sendable (String) async -> Void)? = nil) disconnectHandler: (@Sendable (String) async -> Void)? = nil)
{ {
self.logger = Logger(subsystem: "bot.molt", category: loggerCategory)
self.url = url self.url = url
self.token = token self.token = token
self.password = password self.password = password
@ -211,7 +213,22 @@ public actor GatewayChannelActor {
} }
public func connect() async throws { public func connect() async throws {
if self.connected, self.task?.state == .running { return } if self.connected, self.task?.state == .running {
let staleStatus = self.staleConnectionStatus()
if staleStatus.isStale {
if let deltaMs = staleStatus.deltaMs {
self.logger.error(
"gateway ws stale; reconnecting deltaMs=\(Int(deltaMs)) thresholdMs=\(Int(staleStatus.thresholdMs))")
} else {
self.logger.error(
"gateway ws stale; reconnecting lastTick=missing thresholdMs=\(Int(staleStatus.thresholdMs))")
}
self.connected = false
self.task?.cancel(with: .goingAway, reason: nil)
} else {
return
}
}
if self.isConnecting { if self.isConnecting {
try await withCheckedThrowingContinuation { cont in try await withCheckedThrowingContinuation { cont in
self.connectWaiters.append(cont) self.connectWaiters.append(cont)
@ -328,8 +345,8 @@ public actor GatewayChannelActor {
} else if let password = self.password { } else if let password = self.password {
params["auth"] = ProtoAnyCodable(["password": ProtoAnyCodable(password)]) params["auth"] = ProtoAnyCodable(["password": ProtoAnyCodable(password)])
} }
let signedAtMs = Int(Date().timeIntervalSince1970 * 1000)
let connectNonce = try await self.waitForConnectChallenge() let connectNonce = try await self.waitForConnectChallenge()
let signedAtMs = Int(Date().timeIntervalSince1970 * 1000)
let scopesValue = scopes.joined(separator: ",") let scopesValue = scopes.joined(separator: ",")
var payloadParts = [ var payloadParts = [
connectNonce == nil ? "v1" : "v2", connectNonce == nil ? "v1" : "v2",
@ -554,6 +571,15 @@ public actor GatewayChannelActor {
} }
} }
private func staleConnectionStatus() -> (isStale: Bool, deltaMs: Double?, thresholdMs: Double) {
let thresholdMs = self.tickIntervalMs * 2
guard let lastTick else {
return (true, nil, thresholdMs)
}
let deltaMs = Date().timeIntervalSince(lastTick) * 1000
return (deltaMs > thresholdMs, deltaMs, thresholdMs)
}
private func scheduleReconnect() async { private func scheduleReconnect() async {
guard self.shouldReconnect else { return } guard self.shouldReconnect else { return }
let delay = self.backoffMs / 1000 let delay = self.backoffMs / 1000

View File

@ -90,6 +90,7 @@ public actor GatewayNodeSession {
await self?.handlePush(push) await self?.handlePush(push)
}, },
connectOptions: connectOptions, connectOptions: connectOptions,
loggerCategory: "gateway.node",
disconnectHandler: { [weak self] reason in disconnectHandler: { [weak self] reason in
await self?.onDisconnected?(reason) await self?.onDisconnected?(reason)
}) })
@ -107,7 +108,6 @@ public actor GatewayNodeSession {
do { do {
try await channel.connect() try await channel.connect()
await onConnected()
} catch { } catch {
await onDisconnected(error.localizedDescription) await onDisconnected(error.localizedDescription)
throw error throw error