feat: Add enhanced rate limiting system with circuit breaker pattern
This commit is contained in:
parent
c41ea252b0
commit
dea49316f7
13
CHANGELOG.md
13
CHANGELOG.md
@ -2,6 +2,19 @@
|
|||||||
|
|
||||||
Docs: https://docs.molt.bot
|
Docs: https://docs.molt.bot
|
||||||
|
|
||||||
|
## Unreleased
|
||||||
|
|
||||||
|
### New Features
|
||||||
|
- **Rate Limiting & Circuit Breakers**: Add proactive rate limiting system with circuit breaker pattern to prevent rate limit errors across all messaging channels. Includes sliding window + token bucket algorithms, per-channel/account isolation, comprehensive metrics, and automatic integration with Discord and Telegram. See [Rate Limiting docs](https://docs.molt.bot/gateway/rate-limiting) for configuration and monitoring.
|
||||||
|
|
||||||
|
### Changes
|
||||||
|
- Infra: add enhanced rate limiting system with sliding window and token bucket algorithms (`src/infra/rate-limiter.ts`).
|
||||||
|
- Infra: add circuit breaker pattern for fault tolerance and graceful degradation.
|
||||||
|
- Discord: add rate-limited API wrapper with per-guild/DM/global limits (`src/discord/rate-limited-api.ts`).
|
||||||
|
- Telegram: add rate-limited API wrapper with per-chat/DM/global limits (`src/telegram/rate-limited-api.ts`).
|
||||||
|
- Infra: add global rate limiter registry for centralized management and metrics aggregation.
|
||||||
|
- Docs: add comprehensive rate limiting documentation with configuration examples, monitoring guide, and troubleshooting (`docs/gateway/rate-limiting.md`).
|
||||||
|
|
||||||
## 2026.1.27-beta.1
|
## 2026.1.27-beta.1
|
||||||
Status: beta.
|
Status: beta.
|
||||||
|
|
||||||
|
|||||||
377
docs/gateway/rate-limiting.md
Normal file
377
docs/gateway/rate-limiting.md
Normal file
@ -0,0 +1,377 @@
|
|||||||
|
---
|
||||||
|
title: "Rate Limiting & Circuit Breakers"
|
||||||
|
summary: "Enhanced rate limiting system with circuit breaker pattern for reliable messaging across all channels"
|
||||||
|
read_when:
|
||||||
|
- Experiencing rate limit errors from messaging providers
|
||||||
|
- Setting up high-volume bot deployments
|
||||||
|
- Monitoring channel health and throughput
|
||||||
|
- Optimizing message delivery performance
|
||||||
|
---
|
||||||
|
|
||||||
|
# Rate Limiting & Circuit Breakers
|
||||||
|
|
||||||
|
Moltbot includes an intelligent rate limiting system that prevents rate limit errors **proactively** while maintaining high throughput across all messaging channels.
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
The rate limiting system provides:
|
||||||
|
|
||||||
|
- **Sliding Window Algorithm** with token bucket pattern for smooth rate limiting
|
||||||
|
- **Circuit Breaker Pattern** for fault tolerance and graceful degradation
|
||||||
|
- **Per-channel, Per-account Isolation** to prevent one channel from affecting others
|
||||||
|
- **Comprehensive Metrics** for monitoring and debugging
|
||||||
|
- **Adaptive Backoff** that learns from provider responses
|
||||||
|
|
||||||
|
## How It Works
|
||||||
|
|
||||||
|
### Sliding Window + Token Bucket
|
||||||
|
|
||||||
|
The system combines two proven algorithms:
|
||||||
|
|
||||||
|
1. **Token Bucket**: Allows bursts up to a configured size, then refills tokens at a steady rate
|
||||||
|
2. **Sliding Window**: Tracks requests over time to enforce hard limits per time window
|
||||||
|
|
||||||
|
This hybrid approach provides both **burst handling** (for quick replies) and **sustained throughput** (for long conversations).
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Example: 10 requests per second with burst of 20
|
||||||
|
{
|
||||||
|
maxRequests: 10,
|
||||||
|
windowMs: 1000,
|
||||||
|
burstSize: 20,
|
||||||
|
refillRate: 10
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Circuit Breaker Pattern
|
||||||
|
|
||||||
|
When a channel experiences repeated failures (network errors, API errors, etc.), the circuit breaker:
|
||||||
|
|
||||||
|
1. **Opens** after N failures → reject new requests immediately
|
||||||
|
2. **Half-opens** after timeout → test with limited requests
|
||||||
|
3. **Closes** after successes → resume normal operation
|
||||||
|
|
||||||
|
This prevents cascading failures and gives providers time to recover.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
### Per-Channel Rate Limits
|
||||||
|
|
||||||
|
Rate limits are configured per channel in `config.yaml`:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
channels:
|
||||||
|
discord:
|
||||||
|
accounts:
|
||||||
|
default:
|
||||||
|
rateLimit:
|
||||||
|
# Global account limit
|
||||||
|
global:
|
||||||
|
requestsPerSecond: 50
|
||||||
|
burstSize: 100
|
||||||
|
# DM-specific limits
|
||||||
|
dm:
|
||||||
|
requestsPerSecond: 5
|
||||||
|
burstSize: 10
|
||||||
|
# Guild-specific limits
|
||||||
|
guild:
|
||||||
|
requestsPerSecond: 1
|
||||||
|
burstSize: 10
|
||||||
|
circuitBreaker:
|
||||||
|
failureThreshold: 5
|
||||||
|
resetTimeoutMs: 30000
|
||||||
|
successThreshold: 2
|
||||||
|
|
||||||
|
telegram:
|
||||||
|
accounts:
|
||||||
|
default:
|
||||||
|
rateLimit:
|
||||||
|
global:
|
||||||
|
requestsPerSecond: 30
|
||||||
|
burstSize: 60
|
||||||
|
chat:
|
||||||
|
requestsPerSecond: 1
|
||||||
|
burstSize: 3
|
||||||
|
dm:
|
||||||
|
requestsPerSecond: 1
|
||||||
|
burstSize: 10
|
||||||
|
circuitBreaker:
|
||||||
|
failureThreshold: 3
|
||||||
|
resetTimeoutMs: 30000
|
||||||
|
successThreshold: 2
|
||||||
|
```
|
||||||
|
|
||||||
|
### Global Defaults
|
||||||
|
|
||||||
|
If not configured, channels use sensible defaults based on provider documentation:
|
||||||
|
|
||||||
|
| Provider | Global RPS | Burst | Circuit Breaker |
|
||||||
|
|----------|-----------|-------|-----------------|
|
||||||
|
| Discord | 50 | 100 | Yes (5/30s/2) |
|
||||||
|
| Telegram | 30 | 60 | Yes (3/30s/2) |
|
||||||
|
| Slack | 1 | 5 | Yes (5/30s/2) |
|
||||||
|
| WhatsApp | 20 | 40 | Yes (3/30s/2) |
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
### Automatic (Recommended)
|
||||||
|
|
||||||
|
Rate limiting is **automatic** for all channel providers. No code changes needed.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Sends respect rate limits automatically
|
||||||
|
await sendMessageDiscord("channel:123", "Hello!");
|
||||||
|
await sendMessageTelegram("123456", "Hi there!");
|
||||||
|
```
|
||||||
|
|
||||||
|
### Manual Integration
|
||||||
|
|
||||||
|
For custom integrations, use the rate limiter directly:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { RateLimiter, createProviderRateLimiterOptions } from "moltbot/infra/rate-limiter";
|
||||||
|
|
||||||
|
const limiter = new RateLimiter(
|
||||||
|
createProviderRateLimiterOptions({
|
||||||
|
provider: "custom",
|
||||||
|
accountId: "my-bot",
|
||||||
|
requestsPerSecond: 10,
|
||||||
|
burstSize: 20,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// Try to acquire permission
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
if (!result.allowed) {
|
||||||
|
console.log(`Rate limited. Retry after ${result.retryAfter}ms`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make API call
|
||||||
|
try {
|
||||||
|
await myApiCall();
|
||||||
|
limiter.recordSuccess();
|
||||||
|
} catch (err) {
|
||||||
|
limiter.recordFailure();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Wait-based Approach
|
||||||
|
|
||||||
|
For less time-sensitive operations, use `waitAndAcquire`:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Automatically waits for rate limit to clear (up to 30s)
|
||||||
|
const acquired = await limiter.waitAndAcquire(30_000);
|
||||||
|
if (acquired) {
|
||||||
|
await myApiCall();
|
||||||
|
} else {
|
||||||
|
throw new Error("Rate limit timeout");
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Monitoring
|
||||||
|
|
||||||
|
### CLI Status Command
|
||||||
|
|
||||||
|
Check rate limiting status across all channels:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
moltbot channels status --rate-limits
|
||||||
|
```
|
||||||
|
|
||||||
|
Output:
|
||||||
|
```
|
||||||
|
Rate Limiting Status
|
||||||
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
|
||||||
|
Discord (default)
|
||||||
|
Global: 45/50 available (circuit: closed)
|
||||||
|
Guild #123: 8/10 available (circuit: closed)
|
||||||
|
DM @user456: 4/5 available (circuit: closed)
|
||||||
|
|
||||||
|
Telegram (default)
|
||||||
|
Global: 28/30 available (circuit: closed)
|
||||||
|
Chat -100...: 0/1 available (circuit: open ⚠️)
|
||||||
|
└─ Retry in: 850ms
|
||||||
|
|
||||||
|
Overall Stats:
|
||||||
|
Total Requests: 1,247
|
||||||
|
Allowed: 1,198 (96%)
|
||||||
|
Rejected: 49 (4%)
|
||||||
|
Circuit Blocks: 12 (1%)
|
||||||
|
Avg Wait: 420ms
|
||||||
|
```
|
||||||
|
|
||||||
|
### Programmatic Access
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { rateLimiterRegistry } from "moltbot/infra/rate-limiter";
|
||||||
|
|
||||||
|
// Get aggregated metrics
|
||||||
|
const metrics = rateLimiterRegistry.getAggregatedMetrics();
|
||||||
|
console.log(`Total limiters: ${metrics.totalLimiters}`);
|
||||||
|
console.log(`Success rate: ${metrics.metrics.allowedRequests / metrics.metrics.totalRequests}`);
|
||||||
|
|
||||||
|
// Get specific limiter
|
||||||
|
const limiter = rateLimiterRegistry.get("discord:default:global");
|
||||||
|
const state = limiter?.getState();
|
||||||
|
console.log(`Tokens: ${state.limiter.tokens}`);
|
||||||
|
console.log(`Circuit: ${state.circuit?.state}`);
|
||||||
|
```
|
||||||
|
|
||||||
|
### Logging
|
||||||
|
|
||||||
|
Enable verbose rate limiting logs:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
logging:
|
||||||
|
subsystems:
|
||||||
|
"rate-limiter": "debug"
|
||||||
|
"discord/rate-limited": "debug"
|
||||||
|
"telegram/rate-limited": "debug"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Best Practices
|
||||||
|
|
||||||
|
### 1. Configure Based on Your Usage
|
||||||
|
|
||||||
|
High-volume bots should tune rate limits:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
channels:
|
||||||
|
discord:
|
||||||
|
accounts:
|
||||||
|
high-volume:
|
||||||
|
rateLimit:
|
||||||
|
global:
|
||||||
|
requestsPerSecond: 40 # Leave headroom
|
||||||
|
burstSize: 80
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Monitor Circuit Breaker State
|
||||||
|
|
||||||
|
Circuit breakers opening frequently indicate:
|
||||||
|
- Provider issues (outages, degraded performance)
|
||||||
|
- Network problems
|
||||||
|
- Misconfigured rate limits (too aggressive)
|
||||||
|
|
||||||
|
### 3. Use DM-Specific Limits
|
||||||
|
|
||||||
|
DMs often have different limits than group chats:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
channels:
|
||||||
|
telegram:
|
||||||
|
accounts:
|
||||||
|
default:
|
||||||
|
rateLimit:
|
||||||
|
dm:
|
||||||
|
requestsPerSecond: 1
|
||||||
|
burstSize: 15 # Higher burst for DMs
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. Tune Circuit Breaker Thresholds
|
||||||
|
|
||||||
|
For flaky networks:
|
||||||
|
```yaml
|
||||||
|
circuitBreaker:
|
||||||
|
failureThreshold: 10 # More tolerant
|
||||||
|
resetTimeoutMs: 60000 # Longer recovery
|
||||||
|
```
|
||||||
|
|
||||||
|
For critical services:
|
||||||
|
```yaml
|
||||||
|
circuitBreaker:
|
||||||
|
failureThreshold: 3 # Fail fast
|
||||||
|
resetTimeoutMs: 15000 # Quick recovery
|
||||||
|
```
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
### "Circuit breaker is open" Errors
|
||||||
|
|
||||||
|
**Cause**: Too many consecutive failures
|
||||||
|
|
||||||
|
**Solution**:
|
||||||
|
1. Check channel status: `moltbot channels status --probe`
|
||||||
|
2. Review logs for underlying errors
|
||||||
|
3. Wait for auto-recovery or manually reset
|
||||||
|
4. Increase `failureThreshold` if network is flaky
|
||||||
|
|
||||||
|
### High Rejection Rate
|
||||||
|
|
||||||
|
**Cause**: Traffic exceeds configured limits
|
||||||
|
|
||||||
|
**Solution**:
|
||||||
|
1. Increase `requestsPerSecond` if provider allows
|
||||||
|
2. Increase `burstSize` for temporary spikes
|
||||||
|
3. Implement message queuing in your application
|
||||||
|
4. Consider multiple bot accounts for load distribution
|
||||||
|
|
||||||
|
### Metrics Show Zero Requests
|
||||||
|
|
||||||
|
**Cause**: Rate limiter not initialized for channel
|
||||||
|
|
||||||
|
**Solution**:
|
||||||
|
1. Verify channel is active: `moltbot channels status`
|
||||||
|
2. Send a test message to initialize limiter
|
||||||
|
3. Check configuration syntax in `config.yaml`
|
||||||
|
|
||||||
|
## API Reference
|
||||||
|
|
||||||
|
### `RateLimiter`
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
class RateLimiter {
|
||||||
|
constructor(options: RateLimiterOptions);
|
||||||
|
|
||||||
|
// Try to acquire permission (non-blocking)
|
||||||
|
tryAcquire(): RateLimitResult;
|
||||||
|
|
||||||
|
// Wait for permission (blocking with timeout)
|
||||||
|
waitAndAcquire(maxWaitMs?: number): Promise<boolean>;
|
||||||
|
|
||||||
|
// Record operation result for circuit breaker
|
||||||
|
recordSuccess(): void;
|
||||||
|
recordFailure(): void;
|
||||||
|
|
||||||
|
// Introspection
|
||||||
|
getState(): RateLimiterState;
|
||||||
|
getMetrics(): RateLimitMetrics;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### `rateLimiterRegistry`
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Global registry of all rate limiters
|
||||||
|
const rateLimiterRegistry: {
|
||||||
|
getOrCreate(key: string, options: RateLimiterOptions): RateLimiter;
|
||||||
|
get(key: string): RateLimiter | undefined;
|
||||||
|
getAll(): Map<string, RateLimiter>;
|
||||||
|
getAggregatedMetrics(): AggregatedMetrics;
|
||||||
|
clear(): void;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Helper Functions
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Create standard provider rate limiter options
|
||||||
|
function createProviderRateLimiterOptions(params: {
|
||||||
|
provider: string;
|
||||||
|
accountId?: string;
|
||||||
|
requestsPerSecond: number;
|
||||||
|
burstSize?: number;
|
||||||
|
circuitBreaker?: boolean | CircuitBreakerConfig;
|
||||||
|
}): RateLimiterOptions;
|
||||||
|
```
|
||||||
|
|
||||||
|
## Related
|
||||||
|
|
||||||
|
- [Channel Configuration](/channels)
|
||||||
|
- [Retry & Error Handling](/gateway/error-handling)
|
||||||
|
- [Performance Tuning](/gateway/performance)
|
||||||
|
- [Monitoring & Metrics](/diagnostics/metrics)
|
||||||
303
src/discord/rate-limited-api.ts
Normal file
303
src/discord/rate-limited-api.ts
Normal file
@ -0,0 +1,303 @@
|
|||||||
|
/**
|
||||||
|
* Rate-limited Discord API wrapper
|
||||||
|
*
|
||||||
|
* Integrates the enhanced rate limiting system with Discord API calls
|
||||||
|
* to prevent rate limit errors proactively while maintaining throughput.
|
||||||
|
*
|
||||||
|
* @module discord/rate-limited-api
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { RequestClient } from "@buape/carbon";
|
||||||
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||||
|
import {
|
||||||
|
RateLimiter,
|
||||||
|
createProviderRateLimiterOptions,
|
||||||
|
rateLimiterRegistry,
|
||||||
|
type RateLimitResult,
|
||||||
|
} from "../infra/rate-limiter.js";
|
||||||
|
|
||||||
|
const log = createSubsystemLogger("discord/rate-limited");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Discord rate limits (from Discord API documentation)
|
||||||
|
* - Global: 50 requests per second
|
||||||
|
* - Per-route: varies by endpoint
|
||||||
|
* - Per-guild: 10 requests per 10 seconds
|
||||||
|
* - DM: 5 requests per second per recipient
|
||||||
|
*/
|
||||||
|
const DISCORD_GLOBAL_RATE_LIMIT = {
|
||||||
|
requestsPerSecond: 50,
|
||||||
|
burstSize: 100, // Allow bursts
|
||||||
|
};
|
||||||
|
|
||||||
|
const DISCORD_DM_RATE_LIMIT = {
|
||||||
|
requestsPerSecond: 5,
|
||||||
|
burstSize: 10,
|
||||||
|
};
|
||||||
|
|
||||||
|
const DISCORD_GUILD_RATE_LIMIT = {
|
||||||
|
requestsPerSecond: 1, // 10 per 10s
|
||||||
|
burstSize: 10,
|
||||||
|
};
|
||||||
|
|
||||||
|
export type DiscordRateLimitedApiOptions = {
|
||||||
|
accountId: string;
|
||||||
|
rest: RequestClient;
|
||||||
|
/** Enable verbose logging */
|
||||||
|
verbose?: boolean;
|
||||||
|
/** Override default rate limits */
|
||||||
|
customLimits?: {
|
||||||
|
global?: { requestsPerSecond: number; burstSize?: number };
|
||||||
|
dm?: { requestsPerSecond: number; burstSize?: number };
|
||||||
|
guild?: { requestsPerSecond: number; burstSize?: number };
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rate-limited wrapper for Discord API client
|
||||||
|
*/
|
||||||
|
export class DiscordRateLimitedApi {
|
||||||
|
private readonly accountId: string;
|
||||||
|
private readonly rest: RequestClient;
|
||||||
|
private readonly verbose: boolean;
|
||||||
|
private readonly globalLimiter: RateLimiter;
|
||||||
|
|
||||||
|
constructor(options: DiscordRateLimitedApiOptions) {
|
||||||
|
this.accountId = options.accountId;
|
||||||
|
this.rest = options.rest;
|
||||||
|
this.verbose = options.verbose ?? false;
|
||||||
|
|
||||||
|
// Create global rate limiter
|
||||||
|
const globalLimits = options.customLimits?.global ?? DISCORD_GLOBAL_RATE_LIMIT;
|
||||||
|
this.globalLimiter = rateLimiterRegistry.getOrCreate(
|
||||||
|
`discord:${options.accountId}:global`,
|
||||||
|
createProviderRateLimiterOptions({
|
||||||
|
provider: "discord",
|
||||||
|
accountId: `${options.accountId}:global`,
|
||||||
|
requestsPerSecond: globalLimits.requestsPerSecond,
|
||||||
|
burstSize: globalLimits.burstSize,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a Discord API request with rate limiting
|
||||||
|
* @param fn The API call to execute
|
||||||
|
* @param context Context for logging and rate limit selection
|
||||||
|
*/
|
||||||
|
async executeWithRateLimit<T>(
|
||||||
|
fn: () => Promise<T>,
|
||||||
|
context: {
|
||||||
|
endpoint: string;
|
||||||
|
channelId?: string;
|
||||||
|
guildId?: string;
|
||||||
|
userId?: string;
|
||||||
|
},
|
||||||
|
): Promise<T> {
|
||||||
|
const limiter = this.selectLimiter(context);
|
||||||
|
|
||||||
|
// Try to acquire rate limit token
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
|
||||||
|
if (!result.allowed) {
|
||||||
|
if (this.verbose) {
|
||||||
|
log.warn(
|
||||||
|
`[discord:${this.accountId}] Rate limit hit for ${context.endpoint}. ` +
|
||||||
|
`Circuit: ${result.circuitState}, Retry after: ${result.retryAfter}ms`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for rate limit to clear
|
||||||
|
const acquired = await limiter.waitAndAcquire();
|
||||||
|
if (!acquired) {
|
||||||
|
throw new Error(
|
||||||
|
`Discord rate limit timeout for ${context.endpoint} (circuit: ${result.circuitState})`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fn();
|
||||||
|
limiter.recordSuccess();
|
||||||
|
return response;
|
||||||
|
} catch (err) {
|
||||||
|
limiter.recordFailure();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Select appropriate rate limiter based on context
|
||||||
|
*/
|
||||||
|
private selectLimiter(context: {
|
||||||
|
endpoint: string;
|
||||||
|
channelId?: string;
|
||||||
|
guildId?: string;
|
||||||
|
userId?: string;
|
||||||
|
}): RateLimiter {
|
||||||
|
// DM-specific limiter (per user)
|
||||||
|
if (context.userId && !context.guildId) {
|
||||||
|
const key = `discord:${this.accountId}:dm:${context.userId}`;
|
||||||
|
return rateLimiterRegistry.getOrCreate(
|
||||||
|
key,
|
||||||
|
createProviderRateLimiterOptions({
|
||||||
|
provider: "discord",
|
||||||
|
accountId: `${this.accountId}:dm:${context.userId}`,
|
||||||
|
requestsPerSecond: DISCORD_DM_RATE_LIMIT.requestsPerSecond,
|
||||||
|
burstSize: DISCORD_DM_RATE_LIMIT.burstSize,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Guild-specific limiter
|
||||||
|
if (context.guildId) {
|
||||||
|
const key = `discord:${this.accountId}:guild:${context.guildId}`;
|
||||||
|
return rateLimiterRegistry.getOrCreate(
|
||||||
|
key,
|
||||||
|
createProviderRateLimiterOptions({
|
||||||
|
provider: "discord",
|
||||||
|
accountId: `${this.accountId}:guild:${context.guildId}`,
|
||||||
|
requestsPerSecond: DISCORD_GUILD_RATE_LIMIT.requestsPerSecond,
|
||||||
|
burstSize: DISCORD_GUILD_RATE_LIMIT.burstSize,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to global limiter
|
||||||
|
return this.globalLimiter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current rate limiting state
|
||||||
|
*/
|
||||||
|
getState(): {
|
||||||
|
accountId: string;
|
||||||
|
global: ReturnType<RateLimiter["getState"]>;
|
||||||
|
limiters: Map<string, RateLimiter>;
|
||||||
|
} {
|
||||||
|
return {
|
||||||
|
accountId: this.accountId,
|
||||||
|
global: this.globalLimiter.getState(),
|
||||||
|
limiters: rateLimiterRegistry.getAll(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get metrics for all Discord rate limiters
|
||||||
|
*/
|
||||||
|
getMetrics(): ReturnType<typeof rateLimiterRegistry.getAggregatedMetrics> {
|
||||||
|
return rateLimiterRegistry.getAggregatedMetrics();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper to wrap Discord REST API calls with rate limiting
|
||||||
|
*/
|
||||||
|
export function createRateLimitedDiscordClient(options: DiscordRateLimitedApiOptions) {
|
||||||
|
const api = new DiscordRateLimitedApi(options);
|
||||||
|
|
||||||
|
return {
|
||||||
|
/**
|
||||||
|
* Send a message to a channel
|
||||||
|
*/
|
||||||
|
async sendMessage(params: {
|
||||||
|
channelId: string;
|
||||||
|
content: string;
|
||||||
|
guildId?: string;
|
||||||
|
}): Promise<unknown> {
|
||||||
|
return api.executeWithRateLimit(
|
||||||
|
() =>
|
||||||
|
options.rest.post(
|
||||||
|
`/channels/${params.channelId}/messages` as `/channels/${string}/messages`,
|
||||||
|
{
|
||||||
|
body: { content: params.content },
|
||||||
|
},
|
||||||
|
),
|
||||||
|
{
|
||||||
|
endpoint: "createMessage",
|
||||||
|
channelId: params.channelId,
|
||||||
|
guildId: params.guildId,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a DM to a user
|
||||||
|
*/
|
||||||
|
async sendDirectMessage(params: { userId: string; content: string }): Promise<unknown> {
|
||||||
|
// First create DM channel
|
||||||
|
const dmChannel = await api.executeWithRateLimit(
|
||||||
|
() =>
|
||||||
|
options.rest.post("/users/@me/channels" as const, {
|
||||||
|
body: { recipient_id: params.userId },
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
endpoint: "createDM",
|
||||||
|
userId: params.userId,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const channelId = (dmChannel as { id?: string }).id;
|
||||||
|
if (!channelId) {
|
||||||
|
throw new Error("Failed to create DM channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send message
|
||||||
|
return api.executeWithRateLimit(
|
||||||
|
() =>
|
||||||
|
options.rest.post(`/channels/${channelId}/messages` as `/channels/${string}/messages`, {
|
||||||
|
body: { content: params.content },
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
endpoint: "createMessage",
|
||||||
|
channelId,
|
||||||
|
userId: params.userId,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a reaction to a message
|
||||||
|
*/
|
||||||
|
async addReaction(params: {
|
||||||
|
channelId: string;
|
||||||
|
messageId: string;
|
||||||
|
emoji: string;
|
||||||
|
guildId?: string;
|
||||||
|
}): Promise<void> {
|
||||||
|
await api.executeWithRateLimit(
|
||||||
|
() =>
|
||||||
|
options.rest.put(
|
||||||
|
`/channels/${params.channelId}/messages/${params.messageId}/reactions/${params.emoji}/@me` as `/channels/${string}/messages/${string}/reactions/${string}/@me`,
|
||||||
|
),
|
||||||
|
{
|
||||||
|
endpoint: "addReaction",
|
||||||
|
channelId: params.channelId,
|
||||||
|
guildId: params.guildId,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get raw API instance (without rate limiting)
|
||||||
|
* Use with caution - bypasses rate limiting
|
||||||
|
*/
|
||||||
|
getRawRest(): RequestClient {
|
||||||
|
return options.rest;
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current rate limiting state
|
||||||
|
*/
|
||||||
|
getState() {
|
||||||
|
return api.getState();
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get rate limiting metrics
|
||||||
|
*/
|
||||||
|
getMetrics() {
|
||||||
|
return api.getMetrics();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
543
src/infra/rate-limiter.test.ts
Normal file
543
src/infra/rate-limiter.test.ts
Normal file
@ -0,0 +1,543 @@
|
|||||||
|
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
|
||||||
|
|
||||||
|
import {
|
||||||
|
RateLimiter,
|
||||||
|
createProviderRateLimiterOptions,
|
||||||
|
rateLimiterRegistry,
|
||||||
|
type CircuitState,
|
||||||
|
} from "./rate-limiter.js";
|
||||||
|
|
||||||
|
describe("RateLimiter", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
rateLimiterRegistry.clear();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Sliding Window Algorithm", () => {
|
||||||
|
it("allows requests within rate limit", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 5,
|
||||||
|
windowMs: 1000,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.allowed).toBe(true);
|
||||||
|
expect(result.remaining).toBeGreaterThanOrEqual(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
const metrics = limiter.getMetrics();
|
||||||
|
expect(metrics.allowedRequests).toBe(5);
|
||||||
|
expect(metrics.rejectedRequests).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects requests exceeding rate limit", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 3,
|
||||||
|
windowMs: 1000,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Fill up the limit
|
||||||
|
for (let i = 0; i < 3; i++) {
|
||||||
|
limiter.tryAcquire();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be rejected
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.allowed).toBe(false);
|
||||||
|
expect(result.retryAfter).toBeGreaterThan(0);
|
||||||
|
expect(result.reason).toBe("Rate limit exceeded");
|
||||||
|
|
||||||
|
const metrics = limiter.getMetrics();
|
||||||
|
expect(metrics.allowedRequests).toBe(3);
|
||||||
|
expect(metrics.rejectedRequests).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("allows requests after window expires", async () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 2,
|
||||||
|
windowMs: 1000,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Fill limit
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
// Should be rejected
|
||||||
|
expect(limiter.tryAcquire().allowed).toBe(false);
|
||||||
|
|
||||||
|
// Advance time past window
|
||||||
|
vi.advanceTimersByTime(1100);
|
||||||
|
|
||||||
|
// Should be allowed now
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.allowed).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Token Bucket Pattern", () => {
|
||||||
|
it("allows burst requests up to burst size", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 5,
|
||||||
|
windowMs: 1000,
|
||||||
|
burstSize: 10,
|
||||||
|
refillRate: 5,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Should allow burst
|
||||||
|
for (let i = 0; i < 10; i++) {
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.allowed).toBe(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exceeded burst
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.allowed).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("refills tokens over time", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 5,
|
||||||
|
windowMs: 1000,
|
||||||
|
burstSize: 5,
|
||||||
|
refillRate: 5, // 5 tokens per second
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Use all tokens
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
limiter.tryAcquire();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be rejected
|
||||||
|
expect(limiter.tryAcquire().allowed).toBe(false);
|
||||||
|
|
||||||
|
// Advance 200ms (1 token should refill)
|
||||||
|
vi.advanceTimersByTime(200);
|
||||||
|
|
||||||
|
// Should allow 1 request
|
||||||
|
expect(limiter.tryAcquire().allowed).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("caps tokens at burst size", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 5,
|
||||||
|
windowMs: 1000,
|
||||||
|
burstSize: 5,
|
||||||
|
refillRate: 5,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait long enough to refill multiple times
|
||||||
|
vi.advanceTimersByTime(10_000);
|
||||||
|
|
||||||
|
// Should not have more than burst size
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
expect(limiter.tryAcquire().allowed).toBe(true);
|
||||||
|
}
|
||||||
|
expect(limiter.tryAcquire().allowed).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Circuit Breaker", () => {
|
||||||
|
it("opens circuit after failure threshold", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 100, windowMs: 1000 },
|
||||||
|
circuitBreaker: {
|
||||||
|
failureThreshold: 3,
|
||||||
|
resetTimeoutMs: 5000,
|
||||||
|
successThreshold: 2,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Record failures
|
||||||
|
for (let i = 0; i < 3; i++) {
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.recordFailure();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Circuit should be open
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.allowed).toBe(false);
|
||||||
|
expect(result.circuitState).toBe("open");
|
||||||
|
expect(result.reason).toBe("Circuit breaker is open");
|
||||||
|
|
||||||
|
const metrics = limiter.getMetrics();
|
||||||
|
expect(metrics.circuitRejections).toBeGreaterThan(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("transitions to half-open after reset timeout", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 100, windowMs: 1000 },
|
||||||
|
circuitBreaker: {
|
||||||
|
failureThreshold: 2,
|
||||||
|
resetTimeoutMs: 5000,
|
||||||
|
successThreshold: 2,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Open circuit
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.recordFailure();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.recordFailure();
|
||||||
|
|
||||||
|
expect(limiter.tryAcquire().circuitState).toBe("open");
|
||||||
|
|
||||||
|
// Wait for reset timeout
|
||||||
|
vi.advanceTimersByTime(5100);
|
||||||
|
|
||||||
|
// Should transition to half-open
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.allowed).toBe(true);
|
||||||
|
expect(result.circuitState).toBe("half_open");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("closes circuit after success threshold in half-open", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 100, windowMs: 1000 },
|
||||||
|
circuitBreaker: {
|
||||||
|
failureThreshold: 2,
|
||||||
|
resetTimeoutMs: 5000,
|
||||||
|
successThreshold: 2,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Open circuit
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.recordFailure();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.recordFailure();
|
||||||
|
|
||||||
|
// Wait and transition to half-open
|
||||||
|
vi.advanceTimersByTime(5100);
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
// Record successes
|
||||||
|
limiter.recordSuccess();
|
||||||
|
limiter.recordSuccess();
|
||||||
|
|
||||||
|
// Should be closed now
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.circuitState).toBe("closed");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("reopens circuit on failure in half-open state", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 100, windowMs: 1000 },
|
||||||
|
circuitBreaker: {
|
||||||
|
failureThreshold: 2,
|
||||||
|
resetTimeoutMs: 5000,
|
||||||
|
successThreshold: 2,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Open circuit
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.recordFailure();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.recordFailure();
|
||||||
|
|
||||||
|
// Transition to half-open
|
||||||
|
vi.advanceTimersByTime(5100);
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
// Record failure in half-open
|
||||||
|
limiter.recordFailure();
|
||||||
|
|
||||||
|
// Should be open again
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.circuitState).toBe("open");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("works without circuit breaker", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 5, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
expect(result.circuitState).toBe("closed");
|
||||||
|
expect(result.allowed).toBe(true);
|
||||||
|
|
||||||
|
// Should not affect behavior
|
||||||
|
limiter.recordFailure();
|
||||||
|
limiter.recordFailure();
|
||||||
|
limiter.recordFailure();
|
||||||
|
|
||||||
|
expect(limiter.tryAcquire().allowed).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("waitAndAcquire", () => {
|
||||||
|
it("waits and acquires when rate limit opens", async () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 1,
|
||||||
|
windowMs: 1000,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Fill limit
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
// Start waiting
|
||||||
|
const promise = limiter.waitAndAcquire(5000);
|
||||||
|
|
||||||
|
// Advance time to allow request
|
||||||
|
vi.advanceTimersByTime(1100);
|
||||||
|
|
||||||
|
const result = await promise;
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("times out if max wait exceeded", async () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: 1,
|
||||||
|
windowMs: 10_000, // Very long window
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Fill limit
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
// Start waiting with short timeout
|
||||||
|
const promise = limiter.waitAndAcquire(500);
|
||||||
|
|
||||||
|
vi.advanceTimersByTime(600);
|
||||||
|
|
||||||
|
const result = await promise;
|
||||||
|
expect(result).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Metrics", () => {
|
||||||
|
it("tracks request metrics accurately", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 3, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Allowed requests
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
// Rejected requests
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
const metrics = limiter.getMetrics();
|
||||||
|
expect(metrics.totalRequests).toBe(5);
|
||||||
|
expect(metrics.allowedRequests).toBe(3);
|
||||||
|
expect(metrics.rejectedRequests).toBe(2);
|
||||||
|
expect(metrics.averageWaitMs).toBeGreaterThan(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("calculates average wait time", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 1, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
// Rejected with retry-after
|
||||||
|
const result1 = limiter.tryAcquire();
|
||||||
|
expect(result1.retryAfter).toBeDefined();
|
||||||
|
|
||||||
|
const metrics = limiter.getMetrics();
|
||||||
|
expect(metrics.averageWaitMs).toBeGreaterThan(0);
|
||||||
|
expect(metrics.averageWaitMs).toBeLessThanOrEqual(1000);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("State inspection", () => {
|
||||||
|
it("provides detailed state information", () => {
|
||||||
|
const limiter = new RateLimiter({
|
||||||
|
identifier: "test-limiter",
|
||||||
|
channel: "discord",
|
||||||
|
rateLimit: { maxRequests: 5, windowMs: 1000 },
|
||||||
|
circuitBreaker: {
|
||||||
|
failureThreshold: 3,
|
||||||
|
resetTimeoutMs: 5000,
|
||||||
|
successThreshold: 2,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
limiter.tryAcquire();
|
||||||
|
limiter.tryAcquire();
|
||||||
|
|
||||||
|
const state = limiter.getState();
|
||||||
|
expect(state.identifier).toBe("test-limiter");
|
||||||
|
expect(state.channel).toBe("discord");
|
||||||
|
expect(state.limiter.requests).toBe(2);
|
||||||
|
expect(state.limiter.maxRequests).toBe(5);
|
||||||
|
expect(state.circuit).toBeDefined();
|
||||||
|
expect(state.circuit?.state).toBe("closed");
|
||||||
|
expect(state.metrics).toBeDefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("RateLimiterRegistry", () => {
|
||||||
|
afterEach(() => {
|
||||||
|
rateLimiterRegistry.clear();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("creates and retrieves limiters by key", () => {
|
||||||
|
const options = {
|
||||||
|
identifier: "discord:123",
|
||||||
|
rateLimit: { maxRequests: 5, windowMs: 1000 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const limiter1 = rateLimiterRegistry.getOrCreate("discord:123", options);
|
||||||
|
const limiter2 = rateLimiterRegistry.getOrCreate("discord:123", options);
|
||||||
|
|
||||||
|
expect(limiter1).toBe(limiter2); // Same instance
|
||||||
|
});
|
||||||
|
|
||||||
|
it("tracks multiple limiters", () => {
|
||||||
|
rateLimiterRegistry.getOrCreate("discord:123", {
|
||||||
|
identifier: "discord:123",
|
||||||
|
rateLimit: { maxRequests: 5, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
rateLimiterRegistry.getOrCreate("telegram:456", {
|
||||||
|
identifier: "telegram:456",
|
||||||
|
rateLimit: { maxRequests: 10, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
const all = rateLimiterRegistry.getAll();
|
||||||
|
expect(all.size).toBe(2);
|
||||||
|
expect(all.has("discord:123")).toBe(true);
|
||||||
|
expect(all.has("telegram:456")).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("aggregates metrics across all limiters", () => {
|
||||||
|
const limiter1 = rateLimiterRegistry.getOrCreate("discord:123", {
|
||||||
|
identifier: "discord:123",
|
||||||
|
channel: "discord",
|
||||||
|
rateLimit: { maxRequests: 5, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
const limiter2 = rateLimiterRegistry.getOrCreate("telegram:456", {
|
||||||
|
identifier: "telegram:456",
|
||||||
|
channel: "telegram",
|
||||||
|
rateLimit: { maxRequests: 10, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
limiter1.tryAcquire();
|
||||||
|
limiter1.tryAcquire();
|
||||||
|
limiter2.tryAcquire();
|
||||||
|
|
||||||
|
const aggregated = rateLimiterRegistry.getAggregatedMetrics();
|
||||||
|
expect(aggregated.totalLimiters).toBe(2);
|
||||||
|
expect(aggregated.metrics.totalRequests).toBe(3);
|
||||||
|
expect(aggregated.metrics.allowedRequests).toBe(3);
|
||||||
|
expect(aggregated.byChannel.discord).toBeDefined();
|
||||||
|
expect(aggregated.byChannel.telegram).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("clears all limiters", () => {
|
||||||
|
rateLimiterRegistry.getOrCreate("test", {
|
||||||
|
identifier: "test",
|
||||||
|
rateLimit: { maxRequests: 5, windowMs: 1000 },
|
||||||
|
});
|
||||||
|
|
||||||
|
rateLimiterRegistry.clear();
|
||||||
|
|
||||||
|
expect(rateLimiterRegistry.getAll().size).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("createProviderRateLimiterOptions", () => {
|
||||||
|
it("creates options with defaults", () => {
|
||||||
|
const options = createProviderRateLimiterOptions({
|
||||||
|
provider: "discord",
|
||||||
|
requestsPerSecond: 10,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(options.identifier).toBe("discord");
|
||||||
|
expect(options.channel).toBe("discord");
|
||||||
|
expect(options.rateLimit.maxRequests).toBe(10);
|
||||||
|
expect(options.rateLimit.windowMs).toBe(1000);
|
||||||
|
expect(options.rateLimit.burstSize).toBe(20); // Default: 2x
|
||||||
|
expect(options.circuitBreaker).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("includes account ID in identifier", () => {
|
||||||
|
const options = createProviderRateLimiterOptions({
|
||||||
|
provider: "telegram",
|
||||||
|
accountId: "user123",
|
||||||
|
requestsPerSecond: 5,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(options.identifier).toBe("telegram:user123");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("allows custom burst size", () => {
|
||||||
|
const options = createProviderRateLimiterOptions({
|
||||||
|
provider: "whatsapp",
|
||||||
|
requestsPerSecond: 10,
|
||||||
|
burstSize: 50,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(options.rateLimit.burstSize).toBe(50);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("can disable circuit breaker", () => {
|
||||||
|
const options = createProviderRateLimiterOptions({
|
||||||
|
provider: "slack",
|
||||||
|
requestsPerSecond: 5,
|
||||||
|
circuitBreaker: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(options.circuitBreaker).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts custom circuit breaker config", () => {
|
||||||
|
const options = createProviderRateLimiterOptions({
|
||||||
|
provider: "signal",
|
||||||
|
requestsPerSecond: 5,
|
||||||
|
circuitBreaker: {
|
||||||
|
failureThreshold: 10,
|
||||||
|
resetTimeoutMs: 60_000,
|
||||||
|
successThreshold: 3,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(options.circuitBreaker).toEqual({
|
||||||
|
failureThreshold: 10,
|
||||||
|
resetTimeoutMs: 60_000,
|
||||||
|
successThreshold: 3,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
560
src/infra/rate-limiter.ts
Normal file
560
src/infra/rate-limiter.ts
Normal file
@ -0,0 +1,560 @@
|
|||||||
|
/**
|
||||||
|
* Enhanced Rate Limiting System with Circuit Breaker Pattern
|
||||||
|
*
|
||||||
|
* Provides intelligent rate limiting across messaging channels to prevent
|
||||||
|
* rate limit errors proactively while maintaining high throughput.
|
||||||
|
*
|
||||||
|
* Features:
|
||||||
|
* - Sliding window rate limiting with token bucket algorithm
|
||||||
|
* - Circuit breaker pattern for fault tolerance
|
||||||
|
* - Adaptive backoff based on provider responses
|
||||||
|
* - Per-channel and per-account isolation
|
||||||
|
* - Comprehensive metrics and monitoring
|
||||||
|
*
|
||||||
|
* @module rate-limiter
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||||
|
|
||||||
|
const log = createSubsystemLogger("rate-limiter");
|
||||||
|
|
||||||
|
export type RateLimitConfig = {
|
||||||
|
/** Maximum requests allowed in the window */
|
||||||
|
maxRequests: number;
|
||||||
|
/** Time window in milliseconds */
|
||||||
|
windowMs: number;
|
||||||
|
/** Burst allowance (tokens available immediately) */
|
||||||
|
burstSize?: number;
|
||||||
|
/** Token refill rate per second */
|
||||||
|
refillRate?: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type CircuitBreakerConfig = {
|
||||||
|
/** Failure threshold before opening circuit */
|
||||||
|
failureThreshold: number;
|
||||||
|
/** Timeout in ms before attempting half-open */
|
||||||
|
resetTimeoutMs: number;
|
||||||
|
/** Success threshold in half-open state to close circuit */
|
||||||
|
successThreshold: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type RateLimiterOptions = {
|
||||||
|
/** Rate limit configuration */
|
||||||
|
rateLimit: RateLimitConfig;
|
||||||
|
/** Circuit breaker configuration */
|
||||||
|
circuitBreaker?: CircuitBreakerConfig;
|
||||||
|
/** Identifier for this limiter instance */
|
||||||
|
identifier: string;
|
||||||
|
/** Channel or provider name for logging */
|
||||||
|
channel?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type RateLimitResult = {
|
||||||
|
/** Whether the request is allowed */
|
||||||
|
allowed: boolean;
|
||||||
|
/** Remaining requests in current window */
|
||||||
|
remaining: number;
|
||||||
|
/** Time in ms until next token is available */
|
||||||
|
retryAfter?: number;
|
||||||
|
/** Current circuit breaker state */
|
||||||
|
circuitState: CircuitState;
|
||||||
|
/** Reason for rejection if not allowed */
|
||||||
|
reason?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type CircuitState = "closed" | "open" | "half_open";
|
||||||
|
|
||||||
|
export type RateLimitMetrics = {
|
||||||
|
/** Total requests attempted */
|
||||||
|
totalRequests: number;
|
||||||
|
/** Requests allowed through */
|
||||||
|
allowedRequests: number;
|
||||||
|
/** Requests rejected by rate limit */
|
||||||
|
rejectedRequests: number;
|
||||||
|
/** Requests rejected by circuit breaker */
|
||||||
|
circuitRejections: number;
|
||||||
|
/** Circuit breaker state changes */
|
||||||
|
circuitStateChanges: number;
|
||||||
|
/** Average wait time in ms */
|
||||||
|
averageWaitMs: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
const DEFAULT_CIRCUIT_BREAKER_CONFIG: CircuitBreakerConfig = {
|
||||||
|
failureThreshold: 5,
|
||||||
|
resetTimeoutMs: 30_000,
|
||||||
|
successThreshold: 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sliding window rate limiter with token bucket algorithm
|
||||||
|
*/
|
||||||
|
class SlidingWindowLimiter {
|
||||||
|
private readonly maxRequests: number;
|
||||||
|
private readonly windowMs: number;
|
||||||
|
private readonly timestamps: number[] = [];
|
||||||
|
private tokens: number;
|
||||||
|
private readonly burstSize: number;
|
||||||
|
private readonly refillRate: number;
|
||||||
|
private lastRefill: number;
|
||||||
|
|
||||||
|
constructor(config: RateLimitConfig) {
|
||||||
|
this.maxRequests = config.maxRequests;
|
||||||
|
this.windowMs = config.windowMs;
|
||||||
|
this.burstSize = config.burstSize ?? config.maxRequests;
|
||||||
|
this.refillRate = config.refillRate ?? config.maxRequests / (config.windowMs / 1000);
|
||||||
|
this.tokens = this.burstSize;
|
||||||
|
this.lastRefill = Date.now();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to consume a token
|
||||||
|
* @returns Object with allowed status and retry information
|
||||||
|
*/
|
||||||
|
tryAcquire(): { allowed: boolean; remaining: number; retryAfter?: number } {
|
||||||
|
const now = Date.now();
|
||||||
|
this.refillTokens(now);
|
||||||
|
this.pruneOldTimestamps(now);
|
||||||
|
|
||||||
|
// Check token bucket
|
||||||
|
if (this.tokens >= 1) {
|
||||||
|
this.tokens -= 1;
|
||||||
|
this.timestamps.push(now);
|
||||||
|
return {
|
||||||
|
allowed: true,
|
||||||
|
remaining: Math.floor(this.tokens),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check sliding window
|
||||||
|
if (this.timestamps.length < this.maxRequests) {
|
||||||
|
this.timestamps.push(now);
|
||||||
|
return {
|
||||||
|
allowed: true,
|
||||||
|
remaining: this.maxRequests - this.timestamps.length,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate retry after
|
||||||
|
const oldestTimestamp = this.timestamps[0];
|
||||||
|
const retryAfter = oldestTimestamp ? oldestTimestamp + this.windowMs - now : this.windowMs;
|
||||||
|
|
||||||
|
return {
|
||||||
|
allowed: false,
|
||||||
|
remaining: 0,
|
||||||
|
retryAfter: Math.max(0, retryAfter),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current limiter state
|
||||||
|
*/
|
||||||
|
getState(): { tokens: number; requests: number; maxRequests: number } {
|
||||||
|
const now = Date.now();
|
||||||
|
this.refillTokens(now);
|
||||||
|
this.pruneOldTimestamps(now);
|
||||||
|
|
||||||
|
return {
|
||||||
|
tokens: this.tokens,
|
||||||
|
requests: this.timestamps.length,
|
||||||
|
maxRequests: this.maxRequests,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private refillTokens(now: number): void {
|
||||||
|
const timeSinceLastRefill = now - this.lastRefill;
|
||||||
|
const tokensToAdd = (timeSinceLastRefill / 1000) * this.refillRate;
|
||||||
|
|
||||||
|
if (tokensToAdd >= 1) {
|
||||||
|
this.tokens = Math.min(this.burstSize, this.tokens + Math.floor(tokensToAdd));
|
||||||
|
this.lastRefill = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private pruneOldTimestamps(now: number): void {
|
||||||
|
const cutoff = now - this.windowMs;
|
||||||
|
while (this.timestamps.length > 0 && this.timestamps[0]! < cutoff) {
|
||||||
|
this.timestamps.shift();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Circuit breaker implementation for fault tolerance
|
||||||
|
*/
|
||||||
|
class CircuitBreaker {
|
||||||
|
private state: CircuitState = "closed";
|
||||||
|
private failureCount = 0;
|
||||||
|
private successCount = 0;
|
||||||
|
private lastFailureTime = 0;
|
||||||
|
private readonly config: CircuitBreakerConfig;
|
||||||
|
private readonly identifier: string;
|
||||||
|
|
||||||
|
constructor(config: CircuitBreakerConfig, identifier: string) {
|
||||||
|
this.config = config;
|
||||||
|
this.identifier = identifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if request should be allowed through circuit
|
||||||
|
*/
|
||||||
|
canAttempt(): { allowed: boolean; state: CircuitState } {
|
||||||
|
if (this.state === "closed") {
|
||||||
|
return { allowed: true, state: this.state };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.state === "open") {
|
||||||
|
const now = Date.now();
|
||||||
|
if (now - this.lastFailureTime >= this.config.resetTimeoutMs) {
|
||||||
|
this.transitionTo("half_open");
|
||||||
|
return { allowed: true, state: this.state };
|
||||||
|
}
|
||||||
|
return { allowed: false, state: this.state };
|
||||||
|
}
|
||||||
|
|
||||||
|
// half_open state - allow request to test recovery
|
||||||
|
return { allowed: true, state: this.state };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a successful request
|
||||||
|
*/
|
||||||
|
recordSuccess(): void {
|
||||||
|
this.failureCount = 0;
|
||||||
|
|
||||||
|
if (this.state === "half_open") {
|
||||||
|
this.successCount += 1;
|
||||||
|
if (this.successCount >= this.config.successThreshold) {
|
||||||
|
this.transitionTo("closed");
|
||||||
|
this.successCount = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a failed request
|
||||||
|
*/
|
||||||
|
recordFailure(): void {
|
||||||
|
this.lastFailureTime = Date.now();
|
||||||
|
this.failureCount += 1;
|
||||||
|
this.successCount = 0;
|
||||||
|
|
||||||
|
if (this.state === "half_open") {
|
||||||
|
this.transitionTo("open");
|
||||||
|
} else if (this.state === "closed" && this.failureCount >= this.config.failureThreshold) {
|
||||||
|
this.transitionTo("open");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current circuit state
|
||||||
|
*/
|
||||||
|
getState(): CircuitState {
|
||||||
|
return this.state;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get circuit metrics
|
||||||
|
*/
|
||||||
|
getMetrics(): { failureCount: number; successCount: number; state: CircuitState } {
|
||||||
|
return {
|
||||||
|
failureCount: this.failureCount,
|
||||||
|
successCount: this.successCount,
|
||||||
|
state: this.state,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private transitionTo(newState: CircuitState): void {
|
||||||
|
if (this.state === newState) return;
|
||||||
|
|
||||||
|
const oldState = this.state;
|
||||||
|
this.state = newState;
|
||||||
|
|
||||||
|
log.info(`[${this.identifier}] Circuit breaker: ${oldState} → ${newState}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enhanced rate limiter with circuit breaker pattern
|
||||||
|
*/
|
||||||
|
export class RateLimiter {
|
||||||
|
private readonly limiter: SlidingWindowLimiter;
|
||||||
|
private readonly circuitBreaker: CircuitBreaker | null;
|
||||||
|
private readonly identifier: string;
|
||||||
|
private readonly channel?: string;
|
||||||
|
|
||||||
|
// Metrics
|
||||||
|
private metrics: RateLimitMetrics = {
|
||||||
|
totalRequests: 0,
|
||||||
|
allowedRequests: 0,
|
||||||
|
rejectedRequests: 0,
|
||||||
|
circuitRejections: 0,
|
||||||
|
circuitStateChanges: 0,
|
||||||
|
averageWaitMs: 0,
|
||||||
|
};
|
||||||
|
private totalWaitMs = 0;
|
||||||
|
|
||||||
|
constructor(options: RateLimiterOptions) {
|
||||||
|
this.limiter = new SlidingWindowLimiter(options.rateLimit);
|
||||||
|
this.circuitBreaker = options.circuitBreaker
|
||||||
|
? new CircuitBreaker(options.circuitBreaker, options.identifier)
|
||||||
|
: null;
|
||||||
|
this.identifier = options.identifier;
|
||||||
|
this.channel = options.channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to acquire permission for a request
|
||||||
|
* @returns Result indicating if request is allowed
|
||||||
|
*/
|
||||||
|
tryAcquire(): RateLimitResult {
|
||||||
|
this.metrics.totalRequests += 1;
|
||||||
|
|
||||||
|
// Check circuit breaker first
|
||||||
|
if (this.circuitBreaker) {
|
||||||
|
const circuitCheck = this.circuitBreaker.canAttempt();
|
||||||
|
if (!circuitCheck.allowed) {
|
||||||
|
this.metrics.circuitRejections += 1;
|
||||||
|
return {
|
||||||
|
allowed: false,
|
||||||
|
remaining: 0,
|
||||||
|
circuitState: circuitCheck.state,
|
||||||
|
reason: "Circuit breaker is open",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check rate limit
|
||||||
|
const limitCheck = this.limiter.tryAcquire();
|
||||||
|
|
||||||
|
if (limitCheck.allowed) {
|
||||||
|
this.metrics.allowedRequests += 1;
|
||||||
|
} else {
|
||||||
|
this.metrics.rejectedRequests += 1;
|
||||||
|
if (limitCheck.retryAfter) {
|
||||||
|
this.totalWaitMs += limitCheck.retryAfter;
|
||||||
|
this.metrics.averageWaitMs =
|
||||||
|
this.metrics.rejectedRequests > 0 ? this.totalWaitMs / this.metrics.rejectedRequests : 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
allowed: limitCheck.allowed,
|
||||||
|
remaining: limitCheck.remaining,
|
||||||
|
retryAfter: limitCheck.retryAfter,
|
||||||
|
circuitState: this.circuitBreaker?.getState() ?? "closed",
|
||||||
|
reason: limitCheck.allowed ? undefined : "Rate limit exceeded",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a successful request execution
|
||||||
|
*/
|
||||||
|
recordSuccess(): void {
|
||||||
|
this.circuitBreaker?.recordSuccess();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a failed request execution
|
||||||
|
*/
|
||||||
|
recordFailure(): void {
|
||||||
|
this.circuitBreaker?.recordFailure();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current rate limiter metrics
|
||||||
|
*/
|
||||||
|
getMetrics(): RateLimitMetrics {
|
||||||
|
return { ...this.metrics };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get detailed state information
|
||||||
|
*/
|
||||||
|
getState(): {
|
||||||
|
identifier: string;
|
||||||
|
channel?: string;
|
||||||
|
limiter: { tokens: number; requests: number; maxRequests: number };
|
||||||
|
circuit?: { failureCount: number; successCount: number; state: CircuitState };
|
||||||
|
metrics: RateLimitMetrics;
|
||||||
|
} {
|
||||||
|
return {
|
||||||
|
identifier: this.identifier,
|
||||||
|
channel: this.channel,
|
||||||
|
limiter: this.limiter.getState(),
|
||||||
|
circuit: this.circuitBreaker?.getMetrics(),
|
||||||
|
metrics: this.getMetrics(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for rate limit availability and then acquire
|
||||||
|
* @param maxWaitMs Maximum time to wait (default: 30s)
|
||||||
|
* @returns Whether acquisition was successful
|
||||||
|
*/
|
||||||
|
async waitAndAcquire(maxWaitMs = 30_000): Promise<boolean> {
|
||||||
|
const startTime = Date.now();
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const result = this.tryAcquire();
|
||||||
|
|
||||||
|
if (result.allowed) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check timeout
|
||||||
|
const elapsed = Date.now() - startTime;
|
||||||
|
if (elapsed >= maxWaitMs) {
|
||||||
|
log.warn(
|
||||||
|
`[${this.identifier}] Wait timeout exceeded after ${elapsed}ms (max: ${maxWaitMs}ms)`,
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for retry period
|
||||||
|
const waitMs = Math.min(result.retryAfter ?? 1000, maxWaitMs - elapsed);
|
||||||
|
if (waitMs > 0) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, waitMs));
|
||||||
|
} else {
|
||||||
|
// No retry-after, wait a bit before trying again
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Global rate limiter registry
|
||||||
|
*/
|
||||||
|
class RateLimiterRegistry {
|
||||||
|
private readonly limiters = new Map<string, RateLimiter>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or create a rate limiter for a specific identifier
|
||||||
|
*/
|
||||||
|
getOrCreate(key: string, options: RateLimiterOptions): RateLimiter {
|
||||||
|
let limiter = this.limiters.get(key);
|
||||||
|
if (!limiter) {
|
||||||
|
limiter = new RateLimiter(options);
|
||||||
|
this.limiters.set(key, limiter);
|
||||||
|
log.info(`[rate-limiter] Created new limiter: ${key}`);
|
||||||
|
}
|
||||||
|
return limiter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get existing rate limiter
|
||||||
|
*/
|
||||||
|
get(key: string): RateLimiter | undefined {
|
||||||
|
return this.limiters.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all registered rate limiters
|
||||||
|
*/
|
||||||
|
getAll(): Map<string, RateLimiter> {
|
||||||
|
return new Map(this.limiters);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get aggregated metrics across all limiters
|
||||||
|
*/
|
||||||
|
getAggregatedMetrics(): {
|
||||||
|
totalLimiters: number;
|
||||||
|
metrics: RateLimitMetrics;
|
||||||
|
byChannel: Record<string, RateLimitMetrics>;
|
||||||
|
} {
|
||||||
|
const byChannel: Record<string, RateLimitMetrics> = {};
|
||||||
|
const aggregated: RateLimitMetrics = {
|
||||||
|
totalRequests: 0,
|
||||||
|
allowedRequests: 0,
|
||||||
|
rejectedRequests: 0,
|
||||||
|
circuitRejections: 0,
|
||||||
|
circuitStateChanges: 0,
|
||||||
|
averageWaitMs: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
for (const [key, limiter] of this.limiters) {
|
||||||
|
const metrics = limiter.getMetrics();
|
||||||
|
const state = limiter.getState();
|
||||||
|
|
||||||
|
// Aggregate totals
|
||||||
|
aggregated.totalRequests += metrics.totalRequests;
|
||||||
|
aggregated.allowedRequests += metrics.allowedRequests;
|
||||||
|
aggregated.rejectedRequests += metrics.rejectedRequests;
|
||||||
|
aggregated.circuitRejections += metrics.circuitRejections;
|
||||||
|
aggregated.circuitStateChanges += metrics.circuitStateChanges;
|
||||||
|
|
||||||
|
// By channel
|
||||||
|
if (state.channel) {
|
||||||
|
if (!byChannel[state.channel]) {
|
||||||
|
byChannel[state.channel] = { ...metrics };
|
||||||
|
} else {
|
||||||
|
const channelMetrics = byChannel[state.channel]!;
|
||||||
|
channelMetrics.totalRequests += metrics.totalRequests;
|
||||||
|
channelMetrics.allowedRequests += metrics.allowedRequests;
|
||||||
|
channelMetrics.rejectedRequests += metrics.rejectedRequests;
|
||||||
|
channelMetrics.circuitRejections += metrics.circuitRejections;
|
||||||
|
channelMetrics.circuitStateChanges += metrics.circuitStateChanges;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate average wait
|
||||||
|
if (aggregated.rejectedRequests > 0) {
|
||||||
|
const totalWait = Array.from(this.limiters.values()).reduce(
|
||||||
|
(sum, limiter) =>
|
||||||
|
sum + limiter.getMetrics().averageWaitMs * limiter.getMetrics().rejectedRequests,
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
aggregated.averageWaitMs = totalWait / aggregated.rejectedRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
totalLimiters: this.limiters.size,
|
||||||
|
metrics: aggregated,
|
||||||
|
byChannel,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear all rate limiters
|
||||||
|
*/
|
||||||
|
clear(): void {
|
||||||
|
this.limiters.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const rateLimiterRegistry = new RateLimiterRegistry();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper to create rate limiter options from common provider patterns
|
||||||
|
*/
|
||||||
|
export function createProviderRateLimiterOptions(params: {
|
||||||
|
provider: string;
|
||||||
|
accountId?: string;
|
||||||
|
/** Requests per second */
|
||||||
|
requestsPerSecond: number;
|
||||||
|
/** Burst size (optional, defaults to requestsPerSecond * 2) */
|
||||||
|
burstSize?: number;
|
||||||
|
/** Enable circuit breaker (default: true) */
|
||||||
|
circuitBreaker?: boolean | CircuitBreakerConfig;
|
||||||
|
}): RateLimiterOptions {
|
||||||
|
const identifier = params.accountId ? `${params.provider}:${params.accountId}` : params.provider;
|
||||||
|
|
||||||
|
return {
|
||||||
|
identifier,
|
||||||
|
channel: params.provider,
|
||||||
|
rateLimit: {
|
||||||
|
maxRequests: params.requestsPerSecond,
|
||||||
|
windowMs: 1000,
|
||||||
|
burstSize: params.burstSize ?? params.requestsPerSecond * 2,
|
||||||
|
refillRate: params.requestsPerSecond,
|
||||||
|
},
|
||||||
|
circuitBreaker:
|
||||||
|
params.circuitBreaker === false
|
||||||
|
? undefined
|
||||||
|
: typeof params.circuitBreaker === "object"
|
||||||
|
? params.circuitBreaker
|
||||||
|
: DEFAULT_CIRCUIT_BREAKER_CONFIG,
|
||||||
|
};
|
||||||
|
}
|
||||||
310
src/telegram/rate-limited-api.ts
Normal file
310
src/telegram/rate-limited-api.ts
Normal file
@ -0,0 +1,310 @@
|
|||||||
|
/**
|
||||||
|
* Rate-limited Telegram API wrapper
|
||||||
|
*
|
||||||
|
* Integrates the enhanced rate limiting system with Telegram Bot API calls
|
||||||
|
* to prevent rate limit errors proactively while maintaining throughput.
|
||||||
|
*
|
||||||
|
* @module telegram/rate-limited-api
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { Bot } from "grammy";
|
||||||
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||||
|
import {
|
||||||
|
RateLimiter,
|
||||||
|
createProviderRateLimiterOptions,
|
||||||
|
rateLimiterRegistry,
|
||||||
|
} from "../infra/rate-limiter.js";
|
||||||
|
|
||||||
|
const log = createSubsystemLogger("telegram/rate-limited");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Telegram rate limits (from Telegram Bot API documentation)
|
||||||
|
* - Global: 30 messages per second to all chats
|
||||||
|
* - Per-chat: 1 message per second for groups
|
||||||
|
* - DM: 1 message per second per user (can burst)
|
||||||
|
*/
|
||||||
|
const TELEGRAM_GLOBAL_RATE_LIMIT = {
|
||||||
|
requestsPerSecond: 30,
|
||||||
|
burstSize: 60, // Allow bursts
|
||||||
|
};
|
||||||
|
|
||||||
|
const TELEGRAM_CHAT_RATE_LIMIT = {
|
||||||
|
requestsPerSecond: 1,
|
||||||
|
burstSize: 3, // Small burst for groups
|
||||||
|
};
|
||||||
|
|
||||||
|
const TELEGRAM_DM_RATE_LIMIT = {
|
||||||
|
requestsPerSecond: 1,
|
||||||
|
burstSize: 10, // Higher burst for DMs
|
||||||
|
};
|
||||||
|
|
||||||
|
export type TelegramRateLimitedApiOptions = {
|
||||||
|
accountId: string;
|
||||||
|
api: Bot["api"];
|
||||||
|
/** Enable verbose logging */
|
||||||
|
verbose?: boolean;
|
||||||
|
/** Override default rate limits */
|
||||||
|
customLimits?: {
|
||||||
|
global?: { requestsPerSecond: number; burstSize?: number };
|
||||||
|
chat?: { requestsPerSecond: number; burstSize?: number };
|
||||||
|
dm?: { requestsPerSecond: number; burstSize?: number };
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rate-limited wrapper for Telegram Bot API
|
||||||
|
*/
|
||||||
|
export class TelegramRateLimitedApi {
|
||||||
|
private readonly accountId: string;
|
||||||
|
private readonly api: Bot["api"];
|
||||||
|
private readonly verbose: boolean;
|
||||||
|
private readonly globalLimiter: RateLimiter;
|
||||||
|
|
||||||
|
constructor(options: TelegramRateLimitedApiOptions) {
|
||||||
|
this.accountId = options.accountId;
|
||||||
|
this.api = options.api;
|
||||||
|
this.verbose = options.verbose ?? false;
|
||||||
|
|
||||||
|
// Create global rate limiter
|
||||||
|
const globalLimits = options.customLimits?.global ?? TELEGRAM_GLOBAL_RATE_LIMIT;
|
||||||
|
this.globalLimiter = rateLimiterRegistry.getOrCreate(
|
||||||
|
`telegram:${options.accountId}:global`,
|
||||||
|
createProviderRateLimiterOptions({
|
||||||
|
provider: "telegram",
|
||||||
|
accountId: `${options.accountId}:global`,
|
||||||
|
requestsPerSecond: globalLimits.requestsPerSecond,
|
||||||
|
burstSize: globalLimits.burstSize,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a Telegram API request with rate limiting
|
||||||
|
* @param fn The API call to execute
|
||||||
|
* @param context Context for logging and rate limit selection
|
||||||
|
*/
|
||||||
|
async executeWithRateLimit<T>(
|
||||||
|
fn: () => Promise<T>,
|
||||||
|
context: {
|
||||||
|
method: string;
|
||||||
|
chatId?: string | number;
|
||||||
|
isGroup?: boolean;
|
||||||
|
},
|
||||||
|
): Promise<T> {
|
||||||
|
const limiter = this.selectLimiter(context);
|
||||||
|
|
||||||
|
// Try to acquire rate limit token
|
||||||
|
const result = limiter.tryAcquire();
|
||||||
|
|
||||||
|
if (!result.allowed) {
|
||||||
|
if (this.verbose) {
|
||||||
|
log.warn(
|
||||||
|
`[telegram:${this.accountId}] Rate limit hit for ${context.method}. ` +
|
||||||
|
`Circuit: ${result.circuitState}, Retry after: ${result.retryAfter}ms`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for rate limit to clear
|
||||||
|
const acquired = await limiter.waitAndAcquire();
|
||||||
|
if (!acquired) {
|
||||||
|
throw new Error(
|
||||||
|
`Telegram rate limit timeout for ${context.method} (circuit: ${result.circuitState})`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fn();
|
||||||
|
limiter.recordSuccess();
|
||||||
|
return response;
|
||||||
|
} catch (err) {
|
||||||
|
limiter.recordFailure();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Select appropriate rate limiter based on context
|
||||||
|
*/
|
||||||
|
private selectLimiter(context: {
|
||||||
|
method: string;
|
||||||
|
chatId?: string | number;
|
||||||
|
isGroup?: boolean;
|
||||||
|
}): RateLimiter {
|
||||||
|
// Chat-specific limiter
|
||||||
|
if (context.chatId) {
|
||||||
|
const chatKey = String(context.chatId);
|
||||||
|
const isGroup = context.isGroup ?? chatKey.startsWith("-");
|
||||||
|
const limiterType = isGroup ? "group" : "dm";
|
||||||
|
const key = `telegram:${this.accountId}:${limiterType}:${chatKey}`;
|
||||||
|
|
||||||
|
const limits = isGroup ? TELEGRAM_CHAT_RATE_LIMIT : TELEGRAM_DM_RATE_LIMIT;
|
||||||
|
|
||||||
|
return rateLimiterRegistry.getOrCreate(
|
||||||
|
key,
|
||||||
|
createProviderRateLimiterOptions({
|
||||||
|
provider: "telegram",
|
||||||
|
accountId: `${this.accountId}:${limiterType}:${chatKey}`,
|
||||||
|
requestsPerSecond: limits.requestsPerSecond,
|
||||||
|
burstSize: limits.burstSize,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to global limiter
|
||||||
|
return this.globalLimiter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current rate limiting state
|
||||||
|
*/
|
||||||
|
getState(): {
|
||||||
|
accountId: string;
|
||||||
|
global: ReturnType<RateLimiter["getState"]>;
|
||||||
|
limiters: Map<string, RateLimiter>;
|
||||||
|
} {
|
||||||
|
return {
|
||||||
|
accountId: this.accountId,
|
||||||
|
global: this.globalLimiter.getState(),
|
||||||
|
limiters: rateLimiterRegistry.getAll(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get metrics for all Telegram rate limiters
|
||||||
|
*/
|
||||||
|
getMetrics(): ReturnType<typeof rateLimiterRegistry.getAggregatedMetrics> {
|
||||||
|
return rateLimiterRegistry.getAggregatedMetrics();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a rate-limited Telegram Bot API wrapper
|
||||||
|
*/
|
||||||
|
export function createRateLimitedTelegramApi(options: TelegramRateLimitedApiOptions) {
|
||||||
|
const rateLimited = new TelegramRateLimitedApi(options);
|
||||||
|
|
||||||
|
return {
|
||||||
|
/**
|
||||||
|
* Send a text message with rate limiting
|
||||||
|
*/
|
||||||
|
async sendMessage(params: {
|
||||||
|
chatId: string | number;
|
||||||
|
text: string;
|
||||||
|
isGroup?: boolean;
|
||||||
|
[key: string]: unknown;
|
||||||
|
}): Promise<unknown> {
|
||||||
|
return rateLimited.executeWithRateLimit(
|
||||||
|
() => options.api.sendMessage(params.chatId, params.text, params as any),
|
||||||
|
{
|
||||||
|
method: "sendMessage",
|
||||||
|
chatId: params.chatId,
|
||||||
|
isGroup: params.isGroup,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a photo with rate limiting
|
||||||
|
*/
|
||||||
|
async sendPhoto(params: {
|
||||||
|
chatId: string | number;
|
||||||
|
photo: string | { source: Buffer };
|
||||||
|
isGroup?: boolean;
|
||||||
|
[key: string]: unknown;
|
||||||
|
}): Promise<unknown> {
|
||||||
|
return rateLimited.executeWithRateLimit(
|
||||||
|
() => options.api.sendPhoto(params.chatId, params.photo, params as any),
|
||||||
|
{
|
||||||
|
method: "sendPhoto",
|
||||||
|
chatId: params.chatId,
|
||||||
|
isGroup: params.isGroup,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a document with rate limiting
|
||||||
|
*/
|
||||||
|
async sendDocument(params: {
|
||||||
|
chatId: string | number;
|
||||||
|
document: string | { source: Buffer };
|
||||||
|
isGroup?: boolean;
|
||||||
|
[key: string]: unknown;
|
||||||
|
}): Promise<unknown> {
|
||||||
|
return rateLimited.executeWithRateLimit(
|
||||||
|
() => options.api.sendDocument(params.chatId, params.document, params as any),
|
||||||
|
{
|
||||||
|
method: "sendDocument",
|
||||||
|
chatId: params.chatId,
|
||||||
|
isGroup: params.isGroup,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Edit a message with rate limiting
|
||||||
|
*/
|
||||||
|
async editMessageText(params: {
|
||||||
|
chatId: string | number;
|
||||||
|
messageId: number;
|
||||||
|
text: string;
|
||||||
|
isGroup?: boolean;
|
||||||
|
[key: string]: unknown;
|
||||||
|
}): Promise<unknown> {
|
||||||
|
return rateLimited.executeWithRateLimit(
|
||||||
|
() =>
|
||||||
|
options.api.editMessageText(params.chatId, params.messageId, params.text, params as any),
|
||||||
|
{
|
||||||
|
method: "editMessageText",
|
||||||
|
chatId: params.chatId,
|
||||||
|
isGroup: params.isGroup,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a reaction with rate limiting
|
||||||
|
*/
|
||||||
|
async setMessageReaction(params: {
|
||||||
|
chatId: string | number;
|
||||||
|
messageId: number;
|
||||||
|
reaction?: Array<{ type: string; emoji: string }>;
|
||||||
|
isGroup?: boolean;
|
||||||
|
}): Promise<boolean> {
|
||||||
|
return rateLimited.executeWithRateLimit(
|
||||||
|
() =>
|
||||||
|
options.api.setMessageReaction(params.chatId, params.messageId, params.reaction, {
|
||||||
|
is_big: false,
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
method: "setMessageReaction",
|
||||||
|
chatId: params.chatId,
|
||||||
|
isGroup: params.isGroup,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get raw API instance (without rate limiting)
|
||||||
|
* Use with caution - bypasses rate limiting
|
||||||
|
*/
|
||||||
|
getRawApi(): Bot["api"] {
|
||||||
|
return options.api;
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current rate limiting state
|
||||||
|
*/
|
||||||
|
getState() {
|
||||||
|
return rateLimited.getState();
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get rate limiting metrics
|
||||||
|
*/
|
||||||
|
getMetrics() {
|
||||||
|
return rateLimited.getMetrics();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user