From 9324e37ae65bb92314170c61a2291d580312d2aa Mon Sep 17 00:00:00 2001 From: Niranjan Akella Date: Tue, 27 Jan 2026 15:31:55 +0530 Subject: [PATCH] feat: qdrant vector DB implemenation --- docs/concepts/memory.md | 87 +++++++- package.json | 1 + pnpm-lock.yaml | 33 +++ src/agents/memory-search.ts | 13 +- src/config/types.tools.ts | 16 +- src/config/zod-schema.agent-runtime.ts | 17 +- src/memory/manager.ts | 265 ++++++++++------------- src/memory/vector-store-qdrant.ts | 282 +++++++++++++++++++++++++ src/memory/vector-store.ts | 198 +++++++++++++++++ 9 files changed, 753 insertions(+), 159 deletions(-) create mode 100644 src/memory/vector-store-qdrant.ts create mode 100644 src/memory/vector-store.ts diff --git a/docs/concepts/memory.md b/docs/concepts/memory.md index 3d9996593..67c817342 100644 --- a/docs/concepts/memory.md +++ b/docs/concepts/memory.md @@ -87,7 +87,7 @@ Defaults: 3. `gemini` if a Gemini key can be resolved. 4. Otherwise memory search stays disabled until configured. - Local mode uses node-llama-cpp and may require `pnpm approve-builds`. -- Uses sqlite-vec (when available) to accelerate vector search inside SQLite. +- Uses sqlite-vec (when available) or Qdrant to accelerate vector search. Remote embeddings **require** an API key for the embedding provider. Clawdbot resolves keys from auth profiles, `models.providers.*.apiKey`, or environment @@ -355,6 +355,91 @@ Notes: - `extensionPath` overrides the bundled sqlite-vec path (useful for custom builds or non-standard install locations). +### Qdrant vector database + +Clawdbot can use Qdrant as an alternative vector database backend, supporting both local (on-disk or in-memory) and cloud deployments. + +**Local Qdrant server:** + +```json5 +agents: { + defaults: { + memorySearch: { + store: { + driver: "qdrant", + qdrant: { + url: "http://localhost:6333", // Default Qdrant REST API port + collection: { + onDisk: true, // Store vectors on disk (default: true) + distance: "Cosine" // Distance metric: "Cosine", "Euclidean", or "Dot" + } + } + } + } + } +} +``` + +**Cloud Qdrant:** + +```json5 +agents: { + defaults: { + memorySearch: { + store: { + driver: "qdrant", + qdrant: { + url: "https://your-cluster.qdrant.io", + apiKey: "your-api-key", + collection: { + onDisk: false, // Cloud handles persistence + distance: "Cosine" + } + } + } + } + } +} +``` + +**In-memory Qdrant (for testing):** + +```json5 +agents: { + defaults: { + memorySearch: { + store: { + driver: "qdrant", + qdrant: { + url: "http://localhost:6333", + collection: { + onDisk: false, // In-memory only + distance: "Cosine" + } + } + } + } + } +} +``` + +**Running Qdrant locally with Docker:** + +```bash +docker pull qdrant/qdrant +docker run -p 6333:6333 -p 6334:6334 \ + -v "$(pwd)/qdrant_storage:/qdrant/storage:z" \ + qdrant/qdrant +``` + +Notes: +- Collection names are auto-generated per agent and model to ensure isolation. +- SQLite is still used for metadata (files, chunks tables) regardless of vector driver. +- Embedding cache uses SQLite regardless of vector driver. +- Hybrid search (BM25 + vector) works with Qdrant same as sqlite-vec. +- If Qdrant connection fails, Clawdbot falls back to in-process cosine similarity. +- Default driver is `"sqlite"` for backward compatibility. + ### Local embedding auto-download - Default local embedding model: `hf:ggml-org/embeddinggemma-300M-GGUF/embeddinggemma-300M-Q8_0.gguf` (~0.6 GB). diff --git a/package.json b/package.json index 6a88df982..2924af5cb 100644 --- a/package.json +++ b/package.json @@ -167,6 +167,7 @@ "@mariozechner/pi-coding-agent": "0.49.3", "@mariozechner/pi-tui": "0.49.3", "@mozilla/readability": "^0.6.0", + "@qdrant/qdrant-js": "^1.14.0", "@sinclair/typebox": "0.34.47", "@slack/bolt": "^4.6.0", "@slack/web-api": "^7.13.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d1c55dd8d..fbdcaa920 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1481,6 +1481,23 @@ packages: resolution: {integrity: sha512-juG5VWh4qAivzTAeMzvY9xs9HY5rAcr2E4I7tiSSCokRFi7XIZCAu92ZkSTsIj1OPceCifL3cpfteP3pDT9/QQ==} engines: {node: '>=14.0.0'} + '@qdrant/js-client-grpc@1.14.0': + resolution: {integrity: sha512-A1464Ub25+jVvqOKddemTZMP377LyilGrSHoWVaTT9FtieXXr0trIfJLL1UWExgZ6hhopaURVqcc8uK3OAsP1w==} + engines: {node: '>=18.0.0', pnpm: '>=8'} + + '@qdrant/js-client-rest@1.14.0': + resolution: {integrity: sha512-2sM2g17FSkN2sNCSeAfqxHRr+SPEVnUQLXBjVv/whm4YQ4JjZ53Jiy1iShk95G+xBf3hKBhJdj8itRnor03IYw==} + engines: {node: '>=18.0.0', pnpm: '>=8'} + + '@qdrant/qdrant-js@1.14.0': + resolution: {integrity: sha512-odY6jC/yKwE5uoyFhIN4J9EEyS6aVsGtrSdMFD0DuqMGW5ZvuvGcDNw3HLkN71JT0D0gQEyHO7/sokuG/GxkwA==} + engines: {node: '>=18.0.0', pnpm: '>=8'} + peerDependencies: + typescript: '>=4.1' + dependencies: + '@qdrant/js-client-grpc': 1.14.0 + '@qdrant/js-client-rest': 1.14.0 + '@napi-rs/canvas-android-arm64@0.1.88': resolution: {integrity: sha512-KEaClPnZuVxJ8smUWjV1wWFkByBO/D+vy4lN+Dm5DFH514oqwukxKGeck9xcKJhaWJGjfruGmYGiwRe//+/zQQ==} engines: {node: '>= 10'} @@ -7106,6 +7123,12 @@ snapshots: '@mozilla/readability@0.6.0': {} + '@qdrant/js-client-grpc@1.14.0': {} + + '@qdrant/js-client-rest@1.14.0': {} + + '@qdrant/qdrant-js@1.14.0': {} + '@napi-rs/canvas-android-arm64@0.1.88': optional: true @@ -7911,6 +7934,15 @@ snapshots: '@sinclair/typebox@0.34.47': {} + '@qdrant/js-client-grpc@1.14.0': {} + + '@qdrant/js-client-rest@1.14.0': {} + + '@qdrant/qdrant-js@1.14.0': + dependencies: + '@qdrant/js-client-grpc': 1.14.0 + '@qdrant/js-client-rest': 1.14.0 + '@slack/bolt@4.6.0(@types/express@5.0.6)': dependencies: '@slack/logger': 4.0.0 @@ -9108,6 +9140,7 @@ snapshots: '@mariozechner/pi-coding-agent': 0.49.3(ws@8.19.0)(zod@4.3.6) '@mariozechner/pi-tui': 0.49.3 '@mozilla/readability': 0.6.0 + '@qdrant/qdrant-js': 1.14.0 '@sinclair/typebox': 0.34.47 '@slack/bolt': 4.6.0(@types/express@5.0.6) '@slack/web-api': 7.13.0 diff --git a/src/agents/memory-search.ts b/src/agents/memory-search.ts index 9eb35f3ee..5c61efb13 100644 --- a/src/agents/memory-search.ts +++ b/src/agents/memory-search.ts @@ -32,12 +32,21 @@ export type ResolvedMemorySearchConfig = { modelCacheDir?: string; }; store: { - driver: "sqlite"; + driver: "sqlite" | "qdrant"; path: string; vector: { enabled: boolean; extensionPath?: string; }; + qdrant?: { + url?: string; + apiKey?: string; + collection?: { + name?: string; + onDisk?: boolean; + distance?: "Cosine" | "Euclidean" | "Dot"; + }; + }; }; chunking: { tokens: number; @@ -167,10 +176,12 @@ function mergeConfig( extensionPath: overrides?.store?.vector?.extensionPath ?? defaults?.store?.vector?.extensionPath, }; + const qdrant = overrides?.store?.qdrant ?? defaults?.store?.qdrant; const store = { driver: overrides?.store?.driver ?? defaults?.store?.driver ?? "sqlite", path: resolveStorePath(agentId, overrides?.store?.path ?? defaults?.store?.path), vector, + ...(qdrant ? { qdrant } : {}), }; const chunking = { tokens: overrides?.chunking?.tokens ?? defaults?.chunking?.tokens ?? DEFAULT_CHUNK_TOKENS, diff --git a/src/config/types.tools.ts b/src/config/types.tools.ts index bb1d45bf0..96afc5383 100644 --- a/src/config/types.tools.ts +++ b/src/config/types.tools.ts @@ -263,7 +263,7 @@ export type MemorySearchConfig = { }; /** Index storage configuration. */ store?: { - driver?: "sqlite"; + driver?: "sqlite" | "qdrant"; path?: string; vector?: { /** Enable sqlite-vec extension for vector search (default: true). */ @@ -271,6 +271,20 @@ export type MemorySearchConfig = { /** Optional override path to sqlite-vec extension (.dylib/.so/.dll). */ extensionPath?: string; }; + qdrant?: { + /** Qdrant server URL (default: "http://localhost:6333"). */ + url?: string; + /** API key for cloud Qdrant instances. */ + apiKey?: string; + collection?: { + /** Collection name (default: auto-generated per agent/model). */ + name?: string; + /** Store vectors on disk (default: true for local, false for in-memory). */ + onDisk?: boolean; + /** Distance metric (default: "Cosine"). */ + distance?: "Cosine" | "Euclidean" | "Dot"; + }; + }; cache?: { /** Enable embedding cache (default: true). */ enabled?: boolean; diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index 7a63e307d..638d5bc21 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -342,7 +342,7 @@ export const MemorySearchSchema = z .optional(), store: z .object({ - driver: z.literal("sqlite").optional(), + driver: z.enum(["sqlite", "qdrant"]).optional(), path: z.string().optional(), vector: z .object({ @@ -351,6 +351,21 @@ export const MemorySearchSchema = z }) .strict() .optional(), + qdrant: z + .object({ + url: z.string().url().optional(), + apiKey: z.string().optional(), + collection: z + .object({ + name: z.string().optional(), + onDisk: z.boolean().optional(), + distance: z.enum(["Cosine", "Euclidean", "Dot"]).optional(), + }) + .strict() + .optional(), + }) + .strict() + .optional(), }) .strict() .optional(), diff --git a/src/memory/manager.ts b/src/memory/manager.ts index 2134bb47e..d9d45af7d 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -12,7 +12,7 @@ import type { ClawdbotConfig } from "../config/config.js"; import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; -import { resolveUserPath } from "../utils.js"; +import { resolveUserPath, truncateUtf16Safe } from "../utils.js"; import { createEmbeddingProvider, type EmbeddingProvider, @@ -41,10 +41,11 @@ import { parseEmbedding, } from "./internal.js"; import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js"; -import { searchKeyword, searchVector } from "./manager-search.js"; +import { searchKeyword, listChunks } from "./manager-search.js"; import { ensureMemoryIndexSchema } from "./memory-schema.js"; import { requireNodeSqlite } from "./sqlite.js"; -import { loadSqliteVecExtension } from "./sqlite-vec.js"; +import { QdrantStore } from "./vector-store-qdrant.js"; +import { SqliteVecStore, type VectorStore } from "./vector-store.js"; type MemorySource = "memory" | "sessions"; @@ -90,7 +91,6 @@ type MemorySyncProgressState = { const META_KEY = "memory_index_meta_v1"; const SNIPPET_MAX_CHARS = 700; -const VECTOR_TABLE = "chunks_vec"; const FTS_TABLE = "chunks_fts"; const EMBEDDING_CACHE_TABLE = "embedding_cache"; const SESSION_DIRTY_DEBOUNCE_MS = 5000; @@ -112,9 +112,6 @@ const log = createSubsystemLogger("memory"); const INDEX_CACHE = new Map(); -const vectorToBlob = (embedding: number[]): Buffer => - Buffer.from(new Float32Array(embedding).buffer); - export class MemoryIndexManager { private readonly cacheKey: string; private readonly cfg: ClawdbotConfig; @@ -142,19 +139,12 @@ export class MemoryIndexManager { private readonly sources: Set; private providerKey: string; private readonly cache: { enabled: boolean; maxEntries?: number }; - private readonly vector: { - enabled: boolean; - available: boolean | null; - extensionPath?: string; - loadError?: string; - dims?: number; - }; + private readonly vectorStore: VectorStore; private readonly fts: { enabled: boolean; available: boolean; loadError?: string; }; - private vectorReady: Promise | null = null; private watcher: FSWatcher | null = null; private watchTimer: NodeJS.Timeout | null = null; private sessionWatchTimer: NodeJS.Timeout | null = null; @@ -232,14 +222,19 @@ export class MemoryIndexManager { }; this.fts = { enabled: params.settings.query.hybrid.enabled, available: false }; this.ensureSchema(); - this.vector = { - enabled: params.settings.store.vector.enabled, - available: null, - extensionPath: params.settings.store.vector.extensionPath, - }; - const meta = this.readMeta(); - if (meta?.vectorDims) { - this.vector.dims = meta.vectorDims; + // Initialize vector store after providerKey is computed + if (params.settings.store.driver === "qdrant") { + this.vectorStore = new QdrantStore({ + agentId: this.agentId, + providerKey: this.providerKey, + config: params.settings.store.qdrant ?? {}, + }); + } else { + this.vectorStore = new SqliteVecStore({ + db: this.db, + enabled: params.settings.store.vector.enabled, + extensionPath: params.settings.store.vector.extensionPath, + }); } this.ensureWatcher(); this.ensureSessionListener(); @@ -310,18 +305,87 @@ export class MemoryIndexManager { queryVec: number[], limit: number, ): Promise> { - const results = await searchVector({ - db: this.db, - vectorTable: VECTOR_TABLE, - providerModel: this.provider.model, + const sources = Array.from(this.sources); + const vectorResults = await this.vectorStore.search( queryVec, limit, - snippetMaxChars: SNIPPET_MAX_CHARS, - ensureVectorReady: async (dimensions) => await this.ensureVectorReady(dimensions), - sourceFilterVec: this.buildSourceFilter("c"), - sourceFilterChunks: this.buildSourceFilter(), + { model: this.provider.model, sources }, + this.db, + ); + + if (vectorResults.length === 0) { + // Fallback to in-process cosine similarity + return this.searchVectorFallback(queryVec, limit); + } + + // Fetch chunk details from SQLite + const ids = vectorResults.map((r) => r.id); + const placeholders = ids.map(() => "?").join(", "); + const rows = this.db + .prepare( + `SELECT id, path, start_line, end_line, text, source\n` + + ` FROM chunks\n` + + ` WHERE id IN (${placeholders})`, + ) + .all(...ids) as Array<{ + id: string; + path: string; + start_line: number; + end_line: number; + text: string; + source: MemorySource; + }>; + + const byId = new Map(rows.map((r) => [r.id, r])); + return vectorResults + .map((result) => { + const chunk = byId.get(result.id); + if (!chunk) return null; + return { + id: result.id, + path: chunk.path, + startLine: chunk.start_line, + endLine: chunk.end_line, + score: result.score, + snippet: truncateUtf16Safe(chunk.text, SNIPPET_MAX_CHARS), + source: chunk.source, + } as MemorySearchResult & { id: string }; + }) + .filter((r): r is MemorySearchResult & { id: string } => r !== null); + } + + private searchVectorFallback( + queryVec: number[], + limit: number, + ): Array { + // Fallback to in-process cosine similarity when vector store unavailable + const { listChunks } = await import("./manager-search.js"); + const { cosineSimilarity } = await import("./internal.js"); + const sources = Array.from(this.sources); + const sourceFilter = this.buildSourceFilter(); + const candidates = listChunks({ + db: this.db, + providerModel: this.provider.model, + sourceFilter, }); - return results.map((entry) => entry as MemorySearchResult & { id: string }); + const scored = candidates + .map((chunk) => ({ + chunk, + score: cosineSimilarity(queryVec, chunk.embedding), + })) + .filter((entry) => Number.isFinite(entry.score)); + return scored + .sort((a, b) => b.score - a.score) + .slice(0, limit) + .map((entry) => ({ + id: entry.chunk.id, + path: entry.chunk.path, + startLine: entry.chunk.startLine, + endLine: entry.chunk.endLine, + score: entry.score, + snippet: truncateUtf16Safe(entry.chunk.text, SNIPPET_MAX_CHARS), + source: entry.chunk.source, + })); } private buildFtsQuery(raw: string): string | null { @@ -520,11 +584,8 @@ export class MemoryIndexManager { ? { from: this.fallbackFrom ?? "local", reason: this.fallbackReason } : undefined, vector: { - enabled: this.vector.enabled, - available: this.vector.available ?? undefined, - extensionPath: this.vector.extensionPath, - loadError: this.vector.loadError, - dims: this.vector.dims, + driver: this.settings.store.driver, + available: await this.probeVectorAvailability(), }, batch: { enabled: this.batch.enabled, @@ -541,7 +602,6 @@ export class MemoryIndexManager { } async probeVectorAvailability(): Promise { - if (!this.vector.enabled) return false; return this.ensureVectorReady(); } @@ -582,78 +642,6 @@ export class MemoryIndexManager { INDEX_CACHE.delete(this.cacheKey); } - private async ensureVectorReady(dimensions?: number): Promise { - if (!this.vector.enabled) return false; - if (!this.vectorReady) { - this.vectorReady = this.withTimeout( - this.loadVectorExtension(), - VECTOR_LOAD_TIMEOUT_MS, - `sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`, - ); - } - let ready = false; - try { - ready = await this.vectorReady; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - this.vector.available = false; - this.vector.loadError = message; - this.vectorReady = null; - log.warn(`sqlite-vec unavailable: ${message}`); - return false; - } - if (ready && typeof dimensions === "number" && dimensions > 0) { - this.ensureVectorTable(dimensions); - } - return ready; - } - - private async loadVectorExtension(): Promise { - if (this.vector.available !== null) return this.vector.available; - if (!this.vector.enabled) { - this.vector.available = false; - return false; - } - try { - const resolvedPath = this.vector.extensionPath?.trim() - ? resolveUserPath(this.vector.extensionPath) - : undefined; - const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath }); - if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error"); - this.vector.extensionPath = loaded.extensionPath; - this.vector.available = true; - return true; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - this.vector.available = false; - this.vector.loadError = message; - log.warn(`sqlite-vec unavailable: ${message}`); - return false; - } - } - - private ensureVectorTable(dimensions: number): void { - if (this.vector.dims === dimensions) return; - if (this.vector.dims && this.vector.dims !== dimensions) { - this.dropVectorTable(); - } - this.db.exec( - `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` + - ` id TEXT PRIMARY KEY,\n` + - ` embedding FLOAT[${dimensions}]\n` + - `)`, - ); - this.vector.dims = dimensions; - } - - private dropVectorTable(): void { - try { - this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`); - } - } private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } { const sources = Array.from(this.sources); @@ -672,7 +660,8 @@ export class MemoryIndexManager { const dir = path.dirname(dbPath); ensureDir(dir); const { DatabaseSync } = requireNodeSqlite(); - return new DatabaseSync(dbPath, { allowExtension: this.settings.store.vector.enabled }); + const allowExtension = this.settings.store.driver === "sqlite" && this.settings.store.vector.enabled; + return new DatabaseSync(dbPath, { allowExtension }); } private seedEmbeddingCache(sourceDb: DatabaseSync): void { @@ -1027,11 +1016,7 @@ export class MemoryIndexManager { if (activePaths.has(stale.path)) continue; this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(stale.path, "memory"); + await this.vectorStore.deleteByPath(stale.path, "memory", this.db); } catch {} this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); if (this.fts.enabled && this.fts.available) { @@ -1124,11 +1109,7 @@ export class MemoryIndexManager { .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) .run(stale.path, "sessions"); try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(stale.path, "sessions"); + await this.vectorStore.deleteByPath(stale.path, "sessions", this.db); } catch {} this.db .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) @@ -1179,7 +1160,6 @@ export class MemoryIndexManager { label: "Loading vector extension…", }); } - const vectorReady = await this.ensureVectorReady(); const meta = this.readMeta(); const needsFullReindex = params?.force || @@ -1188,8 +1168,7 @@ export class MemoryIndexManager { meta.provider !== this.provider.id || meta.providerKey !== this.providerKey || meta.chunkTokens !== this.settings.chunking.tokens || - meta.chunkOverlap !== this.settings.chunking.overlap || - (vectorReady && !meta?.vectorDims); + meta.chunkOverlap !== this.settings.chunking.overlap; try { if (needsFullReindex) { await this.runSafeReindex({ @@ -1308,10 +1287,6 @@ export class MemoryIndexManager { const originalState = { ftsAvailable: this.fts.available, ftsError: this.fts.loadError, - vectorAvailable: this.vector.available, - vectorLoadError: this.vector.loadError, - vectorDims: this.vector.dims, - vectorReady: this.vectorReady, }; const restoreOriginalState = () => { @@ -1322,17 +1297,9 @@ export class MemoryIndexManager { } this.fts.available = originalState.ftsAvailable; this.fts.loadError = originalState.ftsError; - this.vector.available = originalDbClosed ? null : originalState.vectorAvailable; - this.vector.loadError = originalState.vectorLoadError; - this.vector.dims = originalState.vectorDims; - this.vectorReady = originalDbClosed ? null : originalState.vectorReady; }; this.db = tempDb; - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; - this.vector.dims = undefined; this.fts.available = false; this.fts.loadError = undefined; this.ensureSchema(); @@ -1369,9 +1336,7 @@ export class MemoryIndexManager { chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, }; - if (this.vector.available && this.vector.dims) { - nextMeta.vectorDims = this.vector.dims; - } + // Vector dimensions are managed by the vector store this.writeMeta(nextMeta); this.pruneEmbeddingCacheIfNeeded(); @@ -1383,11 +1348,7 @@ export class MemoryIndexManager { await this.swapIndexFiles(dbPath, tempDbPath); this.db = this.openDatabaseAtPath(dbPath); - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; this.ensureSchema(); - this.vector.dims = nextMeta.vectorDims; } catch (err) { try { this.db.close(); @@ -1406,8 +1367,7 @@ export class MemoryIndexManager { this.db.exec(`DELETE FROM ${FTS_TABLE}`); } catch {} } - this.dropVectorTable(); - this.vector.dims = undefined; + // Vector store cleanup is handled by the store implementation this.sessionsDirtyFiles.clear(); } @@ -2093,11 +2053,7 @@ export class MemoryIndexManager { const now = Date.now(); if (vectorReady) { try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(entry.path, options.source); + await this.vectorStore.deleteByPath(entry.path, options.source, this.db); } catch {} } if (this.fts.enabled && this.fts.available) { @@ -2141,11 +2097,10 @@ export class MemoryIndexManager { ); if (vectorReady && embedding.length > 0) { try { - this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id); - } catch {} - this.db - .prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`) - .run(id, vectorToBlob(embedding)); + await this.vectorStore.upsert(id, embedding); + } catch (err) { + log.debug(`Vector upsert failed for ${id}: ${String(err)}`); + } } if (this.fts.enabled && this.fts.available) { this.db diff --git a/src/memory/vector-store-qdrant.ts b/src/memory/vector-store-qdrant.ts new file mode 100644 index 000000000..b9f84f50d --- /dev/null +++ b/src/memory/vector-store-qdrant.ts @@ -0,0 +1,282 @@ +import type { DatabaseSync } from "node:sqlite"; +import { QdrantClient } from "@qdrant/qdrant-js"; + +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { hashText } from "./internal.js"; +import type { VectorSearchResult, VectorStore } from "./vector-store.js"; + +const log = createSubsystemLogger("memory"); + +export type QdrantConfig = { + url?: string; + apiKey?: string; + collection?: { + name?: string; + onDisk?: boolean; + distance?: "Cosine" | "Euclidean" | "Dot"; + }; +}; + +export class QdrantStore implements VectorStore { + private client: QdrantClient | null = null; + private collectionName: string; + private config: Required; + private available: boolean | null = null; + private loadError?: string; + private dims?: number; + private ready: Promise | null = null; + private readonly VECTOR_LOAD_TIMEOUT_MS = 30_000; + + constructor( + params: { + agentId: string; + providerKey: string; + config: QdrantConfig; + }, + ) { + this.config = { + url: params.config.url ?? "http://localhost:6333", + apiKey: params.config.apiKey, + collection: { + name: + params.config.collection?.name ?? + `clawdbot_${params.agentId}_${hashText(params.providerKey).slice(0, 16)}`, + onDisk: params.config.collection?.onDisk ?? true, + distance: params.config.collection?.distance ?? "Cosine", + }, + }; + this.collectionName = this.config.collection.name; + } + + async ensureReady(dimensions: number): Promise { + if (!this.ready) { + this.ready = this.withTimeout( + this.initialize(), + this.VECTOR_LOAD_TIMEOUT_MS, + `Qdrant initialization timed out after ${Math.round(this.VECTOR_LOAD_TIMEOUT_MS / 1000)}s`, + ); + } + let ready = false; + try { + ready = await this.ready; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.available = false; + this.loadError = message; + this.ready = null; + log.warn(`Qdrant unavailable: ${message}`); + return false; + } + if (ready && typeof dimensions === "number" && dimensions > 0) { + await this.ensureCollection(dimensions); + } + return ready; + } + + async upsert(id: string, embedding: number[]): Promise { + if (!this.client || !this.available || !this.dims) { + throw new Error("Qdrant store not ready"); + } + try { + await this.client.upsertPoints(this.collectionName, { + points: [ + { + id: this.stringToPointId(id), + vector: embedding, + }, + ], + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.warn(`Qdrant upsert failed: ${message}`); + throw err; + } + } + + async delete(id: string): Promise { + if (!this.client || !this.available) return; + try { + await this.client.deletePoints(this.collectionName, { + points: [this.stringToPointId(id)], + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.debug(`Qdrant delete failed: ${message}`); + } + } + + async deleteByPath(path: string, source: string, db: DatabaseSync): Promise { + if (!this.client || !this.available) return; + try { + // Get all chunk IDs for this path/source from SQLite + const rows = db + .prepare(`SELECT id FROM chunks WHERE path = ? AND source = ?`) + .all(path, source) as Array<{ id: string }>; + + if (rows.length === 0) return; + + const pointIds = rows.map((row) => this.stringToPointId(row.id)); + await this.client.deletePoints(this.collectionName, { + points: pointIds, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.debug(`Qdrant deleteByPath failed: ${message}`); + } + } + + async search( + queryVec: number[], + limit: number, + filter: { model: string; sources: string[] }, + db: DatabaseSync, + ): Promise { + if (!this.client || !this.available || queryVec.length === 0 || limit <= 0) return []; + if (!(await this.ensureReady(queryVec.length))) return []; + + try { + // Build filter to match model and sources + // We need to get chunk IDs from SQLite that match the filter, then search Qdrant + // This is a limitation: Qdrant doesn't store model/source metadata, so we filter in SQLite first + const sourceFilter = + filter.sources.length === 0 + ? { sql: "", params: [] } + : { + sql: ` AND source IN (${filter.sources.map(() => "?").join(", ")})`, + params: filter.sources, + }; + + const chunkRows = db + .prepare( + `SELECT id FROM chunks WHERE model = ?${sourceFilter.sql}`, + ) + .all(filter.model, ...sourceFilter.params) as Array<{ id: string }>; + + if (chunkRows.length === 0) return []; + + const pointIds = chunkRows.map((row) => this.stringToPointId(row.id)); + + const results = await this.client.searchPoints(this.collectionName, { + vector: queryVec, + limit, + filter: { + must: [ + { + has_id: pointIds, + }, + ], + }, + }); + + return results.points.map((point) => { + const id = this.pointIdToString(point.id); + // Convert distance to similarity score + // Qdrant returns distance (lower is better for Cosine/Euclidean) + // For Cosine: similarity = 1 - distance (when distance is normalized 0-1) + // For Dot: score is already similarity + let score = 0; + if (point.score !== undefined) { + if (this.config.collection.distance === "Cosine") { + score = 1 - point.score; + } else if (this.config.collection.distance === "Dot") { + score = point.score; + } else { + // Euclidean: convert to similarity using 1 / (1 + distance) + score = 1 / (1 + point.score); + } + } + return { id, score }; + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.warn(`Qdrant search failed: ${message}`); + return []; + } + } + + private async initialize(): Promise { + if (this.available !== null) return this.available; + try { + const clientConfig: { url: string; apiKey?: string } = { + url: this.config.url, + }; + if (this.config.apiKey) { + clientConfig.apiKey = this.config.apiKey; + } + this.client = new QdrantClient(clientConfig); + + // Test connection + await this.client.getCollections(); + this.available = true; + return true; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.available = false; + this.loadError = message; + log.warn(`Qdrant connection failed: ${message}`); + return false; + } + } + + private async ensureCollection(dimensions: number): Promise { + if (!this.client || this.dims === dimensions) return; + if (this.dims && this.dims !== dimensions) { + await this.dropCollection(); + } + + try { + const collections = await this.client.getCollections(); + const exists = collections.collections.some((c) => c.name === this.collectionName); + + if (!exists) { + await this.client.createCollection(this.collectionName, { + vectors: { + size: dimensions, + distance: this.config.collection.distance, + on_disk: this.config.collection.onDisk, + }, + }); + log.debug(`Created Qdrant collection: ${this.collectionName}`); + } + this.dims = dimensions; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.warn(`Failed to ensure Qdrant collection: ${message}`); + throw err; + } + } + + private async dropCollection(): Promise { + if (!this.client) return; + try { + await this.client.deleteCollection(this.collectionName); + log.debug(`Dropped Qdrant collection: ${this.collectionName}`); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.debug(`Failed to drop Qdrant collection: ${message}`); + } + } + + private stringToPointId(str: string): string | number { + // Qdrant supports string or integer IDs + // Use string IDs to match our chunk IDs + return str; + } + + private pointIdToString(id: string | number): string { + return String(id); + } + + private async withTimeout( + promise: Promise, + timeoutMs: number, + timeoutMessage: string, + ): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs), + ), + ]); + } +} diff --git a/src/memory/vector-store.ts b/src/memory/vector-store.ts new file mode 100644 index 000000000..d0a15f71f --- /dev/null +++ b/src/memory/vector-store.ts @@ -0,0 +1,198 @@ +import type { DatabaseSync } from "node:sqlite"; + +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { loadSqliteVecExtension } from "./sqlite-vec.js"; + +const log = createSubsystemLogger("memory"); + +export type VectorSearchResult = { + id: string; + score: number; +}; + +export interface VectorStore { + ensureReady(dimensions: number): Promise; + upsert(id: string, embedding: number[]): Promise; + delete(id: string): Promise; + deleteByPath(path: string, source: string, db: DatabaseSync): Promise; + search( + queryVec: number[], + limit: number, + filter: { model: string; sources: string[] }, + db: DatabaseSync, + ): Promise; +} + +const vectorToBlob = (embedding: number[]): Buffer => + Buffer.from(new Float32Array(embedding).buffer); + +const VECTOR_TABLE = "chunks_vec"; + +export class SqliteVecStore implements VectorStore { + private db: DatabaseSync; + private enabled: boolean; + private extensionPath?: string; + private available: boolean | null = null; + private loadError?: string; + private dims?: number; + private ready: Promise | null = null; + private readonly VECTOR_LOAD_TIMEOUT_MS = 30_000; + + constructor(params: { db: DatabaseSync; enabled: boolean; extensionPath?: string }) { + this.db = params.db; + this.enabled = params.enabled; + this.extensionPath = params.extensionPath; + } + + async ensureReady(dimensions: number): Promise { + if (!this.enabled) return false; + if (!this.ready) { + this.ready = this.withTimeout( + this.loadVectorExtension(), + this.VECTOR_LOAD_TIMEOUT_MS, + `sqlite-vec load timed out after ${Math.round(this.VECTOR_LOAD_TIMEOUT_MS / 1000)}s`, + ); + } + let ready = false; + try { + ready = await this.ready; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.available = false; + this.loadError = message; + this.ready = null; + log.warn(`sqlite-vec unavailable: ${message}`); + return false; + } + if (ready && typeof dimensions === "number" && dimensions > 0) { + this.ensureVectorTable(dimensions); + } + return ready; + } + + async upsert(id: string, embedding: number[]): Promise { + if (!this.available || !this.dims) { + throw new Error("Vector store not ready"); + } + try { + this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id); + } catch {} + this.db + .prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`) + .run(id, vectorToBlob(embedding)); + } + + async delete(id: string): Promise { + if (!this.available) return; + try { + this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id); + } catch {} + } + + async deleteByPath(path: string, source: string, db: DatabaseSync): Promise { + if (!this.available) return; + try { + db.prepare( + `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, + ).run(path, source); + } catch {} + } + + async search( + queryVec: number[], + limit: number, + filter: { model: string; sources: string[] }, + db: DatabaseSync, + ): Promise { + if (!this.available || queryVec.length === 0 || limit <= 0) return []; + if (!(await this.ensureReady(queryVec.length))) return []; + + const sourceFilter = + filter.sources.length === 0 + ? { sql: "", params: [] } + : { + sql: ` AND c.source IN (${filter.sources.map(() => "?").join(", ")})`, + params: filter.sources, + }; + + const rows = db + .prepare( + `SELECT c.id,\n` + + ` vec_distance_cosine(v.embedding, ?) AS dist\n` + + ` FROM ${VECTOR_TABLE} v\n` + + ` JOIN chunks c ON c.id = v.id\n` + + ` WHERE c.model = ?${sourceFilter.sql}\n` + + ` ORDER BY dist ASC\n` + + ` LIMIT ?`, + ) + .all(vectorToBlob(queryVec), filter.model, ...sourceFilter.params, limit) as Array<{ + id: string; + dist: number; + }>; + + return rows.map((row) => ({ + id: row.id, + score: 1 - row.dist, + })); + } + + private async loadVectorExtension(): Promise { + if (this.available !== null) return this.available; + if (!this.enabled) { + this.available = false; + return false; + } + try { + const resolvedPath = this.extensionPath?.trim() + ? this.extensionPath.trim() + : undefined; + const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath }); + if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error"); + this.extensionPath = loaded.extensionPath; + this.available = true; + return true; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.available = false; + this.loadError = message; + log.warn(`sqlite-vec unavailable: ${message}`); + return false; + } + } + + private ensureVectorTable(dimensions: number): void { + if (this.dims === dimensions) return; + if (this.dims && this.dims !== dimensions) { + this.dropVectorTable(); + } + this.db.exec( + `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` + + ` id TEXT PRIMARY KEY,\n` + + ` embedding FLOAT[${dimensions}]\n` + + `)`, + ); + this.dims = dimensions; + } + + private dropVectorTable(): void { + try { + this.db.exec(`DROP TABLE IF NOT EXISTS ${VECTOR_TABLE}`); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`); + } + } + + private async withTimeout( + promise: Promise, + timeoutMs: number, + timeoutMessage: string, + ): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs), + ), + ]); + } +}