diff --git a/src/memory/manager.ts b/src/memory/manager.ts index a799a5e0f..ab01cb5e7 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -45,7 +45,7 @@ import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js" import { searchKeyword, searchVector } from "./manager-search.js"; import { ensureMemoryIndexSchema } from "./memory-schema.js"; import { requireNodeSqlite } from "./sqlite.js"; -import { loadSqliteVecExtension } from "./sqlite-vec.js"; +import { VectorManager } from "./vector/vector-manager.js"; type MemorySource = "memory" | "sessions"; @@ -103,7 +103,6 @@ const EMBEDDING_RETRY_BASE_DELAY_MS = 500; const EMBEDDING_RETRY_MAX_DELAY_MS = 8000; const BATCH_FAILURE_LIMIT = 2; const SESSION_DELTA_READ_CHUNK_BYTES = 64 * 1024; -const VECTOR_LOAD_TIMEOUT_MS = 30_000; const EMBEDDING_QUERY_TIMEOUT_REMOTE_MS = 60_000; const EMBEDDING_QUERY_TIMEOUT_LOCAL_MS = 5 * 60_000; const EMBEDDING_BATCH_TIMEOUT_REMOTE_MS = 2 * 60_000; @@ -143,19 +142,12 @@ export class MemoryIndexManager { private readonly sources: Set; private providerKey: string; private readonly cache: { enabled: boolean; maxEntries?: number }; - private readonly vector: { - enabled: boolean; - available: boolean | null; - extensionPath?: string; - loadError?: string; - dims?: number; - }; + private vectorManager: VectorManager; private readonly fts: { enabled: boolean; available: boolean; loadError?: string; }; - private vectorReady: Promise | null = null; private watcher: FSWatcher | null = null; private watchTimer: NodeJS.Timeout | null = null; private sessionWatchTimer: NodeJS.Timeout | null = null; @@ -233,15 +225,17 @@ export class MemoryIndexManager { }; this.fts = { enabled: params.settings.query.hybrid.enabled, available: false }; this.ensureSchema(); - this.vector = { - enabled: params.settings.store.vector.enabled, - available: null, - extensionPath: params.settings.store.vector.extensionPath, - }; const meta = this.readMeta(); - if (meta?.vectorDims) { - 
this.vector.dims = meta.vectorDims; - } + this.vectorManager = new VectorManager( + this.db, + { + enabled: params.settings.store.vector.enabled, + extensionPath: params.settings.store.vector.extensionPath, + }, + { + dims: meta?.vectorDims, + }, + ); this.ensureWatcher(); this.ensureSessionListener(); this.ensureIntervalSync(); @@ -493,13 +487,13 @@ export class MemoryIndexManager { const files = this.db .prepare(`SELECT COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql}`) .get(...sourceFilter.params) as { - c: number; - }; + c: number; + }; const chunks = this.db .prepare(`SELECT COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql}`) .get(...sourceFilter.params) as { - c: number; - }; + c: number; + }; const sourceCounts = (() => { const sources = Array.from(this.sources); if (sources.length === 0) return []; @@ -543,15 +537,15 @@ export class MemoryIndexManager { sourceCounts, cache: this.cache.enabled ? { - enabled: true, - entries: - ( - this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as - | { c: number } - | undefined - )?.c ?? 0, - maxEntries: this.cache.maxEntries, - } + enabled: true, + entries: + ( + this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as + | { c: number } + | undefined + )?.c ?? 0, + maxEntries: this.cache.maxEntries, + } : { enabled: false, maxEntries: this.cache.maxEntries }, fts: { enabled: this.fts.enabled, @@ -561,13 +555,16 @@ export class MemoryIndexManager { fallback: this.fallbackReason ? { from: this.fallbackFrom ?? "local", reason: this.fallbackReason } : undefined, - vector: { - enabled: this.vector.enabled, - available: this.vector.available ?? undefined, - extensionPath: this.vector.extensionPath, - loadError: this.vector.loadError, - dims: this.vector.dims, - }, + vector: (() => { + const state = this.vectorManager.getState(); + return { + enabled: this.settings.store.vector.enabled, + available: state.available ?? 
undefined, + extensionPath: state.extensionPath, + loadError: state.loadError, + dims: state.dims, + }; + })(), batch: { enabled: this.batch.enabled, failures: this.batchFailureCount, @@ -583,8 +580,7 @@ export class MemoryIndexManager { } async probeVectorAvailability(): Promise { - if (!this.vector.enabled) return false; - return this.ensureVectorReady(); + return this.vectorManager.ensureReady(); } async probeEmbeddingAvailability(): Promise<{ ok: boolean; error?: string }> { @@ -625,76 +621,7 @@ export class MemoryIndexManager { } private async ensureVectorReady(dimensions?: number): Promise { - if (!this.vector.enabled) return false; - if (!this.vectorReady) { - this.vectorReady = this.withTimeout( - this.loadVectorExtension(), - VECTOR_LOAD_TIMEOUT_MS, - `sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`, - ); - } - let ready = false; - try { - ready = await this.vectorReady; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - this.vector.available = false; - this.vector.loadError = message; - this.vectorReady = null; - log.warn(`sqlite-vec unavailable: ${message}`); - return false; - } - if (ready && typeof dimensions === "number" && dimensions > 0) { - this.ensureVectorTable(dimensions); - } - return ready; - } - - private async loadVectorExtension(): Promise { - if (this.vector.available !== null) return this.vector.available; - if (!this.vector.enabled) { - this.vector.available = false; - return false; - } - try { - const resolvedPath = this.vector.extensionPath?.trim() - ? resolveUserPath(this.vector.extensionPath) - : undefined; - const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath }); - if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error"); - this.vector.extensionPath = loaded.extensionPath; - this.vector.available = true; - return true; - } catch (err) { - const message = err instanceof Error ? 
err.message : String(err); - this.vector.available = false; - this.vector.loadError = message; - log.warn(`sqlite-vec unavailable: ${message}`); - return false; - } - } - - private ensureVectorTable(dimensions: number): void { - if (this.vector.dims === dimensions) return; - if (this.vector.dims && this.vector.dims !== dimensions) { - this.dropVectorTable(); - } - this.db.exec( - `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` + - ` id TEXT PRIMARY KEY,\n` + - ` embedding FLOAT[${dimensions}]\n` + - `)`, - ); - this.vector.dims = dimensions; - } - - private dropVectorTable(): void { - try { - this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`); - } + return this.vectorManager.ensureReady(dimensions); } private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } { @@ -725,14 +652,14 @@ export class MemoryIndexManager { `SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`, ) .all() as Array<{ - provider: string; - model: string; - provider_key: string; - hash: string; - embedding: string; - dims: number | null; - updated_at: number; - }>; + provider: string; + model: string; + provider_key: string; + hash: string; + embedding: string; + dims: number | null; + updated_at: number; + }>; if (!rows.length) return; const insert = this.db.prepare( `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at) @@ -758,7 +685,7 @@ export class MemoryIndexManager { } catch (err) { try { this.db.exec("ROLLBACK"); - } catch {} + } catch { } throw err; } } @@ -1086,14 +1013,14 @@ export class MemoryIndexManager { `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? 
AND source = ?)`, ) .run(stale.path, "memory"); - } catch {} + } catch { } this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) .run(stale.path, "memory", this.provider.model); - } catch {} + } catch { } } } } @@ -1183,7 +1110,7 @@ export class MemoryIndexManager { `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(stale.path, "sessions"); - } catch {} + } catch { } this.db .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) .run(stale.path, "sessions"); @@ -1192,7 +1119,7 @@ export class MemoryIndexManager { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) .run(stale.path, "sessions", this.provider.model); - } catch {} + } catch { } } } } @@ -1362,10 +1289,8 @@ export class MemoryIndexManager { const originalState = { ftsAvailable: this.fts.available, ftsError: this.fts.loadError, - vectorAvailable: this.vector.available, - vectorLoadError: this.vector.loadError, - vectorDims: this.vector.dims, - vectorReady: this.vectorReady, + db: this.db, + providerKey: this.providerKey, }; const restoreOriginalState = () => { @@ -1376,17 +1301,19 @@ export class MemoryIndexManager { } this.fts.available = originalState.ftsAvailable; this.fts.loadError = originalState.ftsError; - this.vector.available = originalDbClosed ? null : originalState.vectorAvailable; - this.vector.loadError = originalState.vectorLoadError; - this.vector.dims = originalState.vectorDims; - this.vectorReady = originalDbClosed ? 
null : originalState.vectorReady; + this.providerKey = originalState.providerKey; + // VectorManager will be recreated with the restored db connection }; this.db = tempDb; - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; - this.vector.dims = undefined; + // Reset VectorManager on meta change + this.vectorManager = new VectorManager( + this.db, + { + enabled: this.settings.store.vector.enabled, + extensionPath: this.settings.store.vector.extensionPath, + }, + ); this.fts.available = false; this.fts.loadError = undefined; this.ensureSchema(); @@ -1423,8 +1350,9 @@ export class MemoryIndexManager { chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, }; - if (this.vector.available && this.vector.dims) { - nextMeta.vectorDims = this.vector.dims; + const vectorState = this.vectorManager.getState(); + if (vectorState.available && vectorState.dims) { + nextMeta.vectorDims = vectorState.dims; } this.writeMeta(nextMeta); @@ -1437,15 +1365,22 @@ export class MemoryIndexManager { await this.swapIndexFiles(dbPath, tempDbPath); this.db = this.openDatabaseAtPath(dbPath); - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; this.ensureSchema(); - this.vector.dims = nextMeta.vectorDims; + // Recreate VectorManager with new database connection + this.vectorManager = new VectorManager( + this.db, + { + enabled: this.settings.store.vector.enabled, + extensionPath: this.settings.store.vector.extensionPath, + }, + { + dims: nextMeta.vectorDims, + }, + ); } catch (err) { try { this.db.close(); - } catch {} + } catch { } await this.removeIndexFiles(tempDbPath); restoreOriginalState(); throw err; @@ -1458,10 +1393,9 @@ export class MemoryIndexManager { if (this.fts.enabled && this.fts.available) { try { this.db.exec(`DELETE FROM ${FTS_TABLE}`); - } catch {} + } catch { } } - this.dropVectorTable(); - this.vector.dims = undefined; + 
this.vectorManager.dropVectorTable(); this.sessionsDirtyFiles.clear(); } @@ -1630,7 +1564,7 @@ export class MemoryIndexManager { const rows = this.db .prepare( `SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` + - ` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`, + ` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`, ) .all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>; for (const row of rows) { @@ -1646,11 +1580,11 @@ export class MemoryIndexManager { const now = Date.now(); const stmt = this.db.prepare( `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` + - ` VALUES (?, ?, ?, ?, ?, ?, ?)\n` + - ` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` + - ` embedding=excluded.embedding,\n` + - ` dims=excluded.dims,\n` + - ` updated_at=excluded.updated_at`, + ` VALUES (?, ?, ?, ?, ?, ?, ?)\n` + + ` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` + + ` embedding=excluded.embedding,\n` + + ` dims=excluded.dims,\n` + + ` updated_at=excluded.updated_at`, ); for (const entry of entries) { const embedding = entry.embedding ?? []; @@ -1679,11 +1613,11 @@ export class MemoryIndexManager { this.db .prepare( `DELETE FROM ${EMBEDDING_CACHE_TABLE}\n` + - ` WHERE rowid IN (\n` + - ` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` + - ` ORDER BY updated_at ASC\n` + - ` LIMIT ?\n` + - ` )`, + ` WHERE rowid IN (\n` + + ` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` + + ` ORDER BY updated_at ASC\n` + + ` LIMIT ?\n` + + ` )`, ) .run(excess); } @@ -2152,14 +2086,14 @@ export class MemoryIndexManager { `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, ) .run(entry.path, options.source); - } catch {} + } catch { } } if (this.fts.enabled && this.fts.available) { try { this.db .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? 
AND model = ?`) .run(entry.path, options.source, this.provider.model); - } catch {} + } catch { } } this.db .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) @@ -2196,7 +2130,7 @@ export class MemoryIndexManager { if (vectorReady && embedding.length > 0) { try { this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id); - } catch {} + } catch { } this.db .prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`) .run(id, vectorToBlob(embedding)); @@ -2205,7 +2139,7 @@ export class MemoryIndexManager { this.db .prepare( `INSERT INTO ${FTS_TABLE} (text, id, path, source, model, start_line, end_line)\n` + - ` VALUES (?, ?, ?, ?, ?, ?, ?)`, + ` VALUES (?, ?, ?, ?, ?, ?, ?)`, ) .run( chunk.text,
diff --git a/src/memory/vector/vector-manager.ts b/src/memory/vector/vector-manager.ts
new file mode 100644
index 000000000..4b97c000d
--- /dev/null
+++ b/src/memory/vector/vector-manager.ts
@@ -0,0 +1,205 @@
+import type { DatabaseSync } from "node:sqlite";
+
+import { createSubsystemLogger } from "../../logging/subsystem.js";
+import { resolveUserPath } from "../../utils.js";
+import { loadSqliteVecExtension } from "../sqlite-vec.js";
+
+const log = createSubsystemLogger("memory:vector");
+
+/** Upper bound on how long `ensureReady` waits for the extension load. */
+const VECTOR_LOAD_TIMEOUT_MS = 30_000;
+/** Virtual table created, and dropped, by this manager. */
+const VECTOR_TABLE = "chunks_vec";
+
+/** Static settings handed to a `VectorManager` at construction time. */
+export type VectorConfig = {
+  enabled: boolean;
+  extensionPath?: string;
+};
+
+/** Bookkeeping that `getState()` exposes as a shallow copy. */
+export type VectorState = {
+  /** `null` until a load has been attempted, then the outcome of that attempt. */
+  available: boolean | null;
+  /** Configured path; overwritten with the path the loader reports on success. */
+  extensionPath?: string;
+  /** Message of the most recent load failure or timeout. */
+  loadError?: string;
+  /** Dimensions the vector table was last created with, when known. */
+  dims?: number;
+};
+
+/**
+ * Manages sqlite-vec extension loading and vector table lifecycle.
+ *
+ * An instance is bound to the database handle passed to the constructor.
+ */
+export class VectorManager {
+  private state: VectorState;
+  private loadPromise: Promise<boolean> | null = null;
+
+  /**
+   * @param db Database handle the extension is loaded into.
+   * @param config Whether vector support is enabled and where the extension lives.
+   * @param initialState Known state to seed the manager with (e.g. persisted `dims`).
+   */
+  constructor(
+    private readonly db: DatabaseSync,
+    private readonly config: VectorConfig,
+    initialState?: Partial<VectorState>,
+  ) {
+    this.state = {
+      available: null,
+      extensionPath: config.extensionPath,
+      ...initialState,
+    };
+  }
+
+  /**
+   * Ensures the vector extension is loaded and ready.
+   * Optionally creates/recreates the vector table if dimensions are provided.
+   *
+   * @param dimensions Embedding width; the table is only touched for a positive number.
+   * @returns `true` when the extension is loaded, `false` when disabled, failed or timed out.
+   */
+  async ensureReady(dimensions?: number): Promise<boolean> {
+    if (!this.config.enabled) return false;
+
+    // Share a single load attempt between callers.
+    if (!this.loadPromise) {
+      this.loadPromise = this.withTimeout(
+        this.loadExtension(),
+        VECTOR_LOAD_TIMEOUT_MS,
+        `sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`,
+      );
+    }
+
+    let ready = false;
+    try {
+      ready = await this.loadPromise;
+    } catch (err) {
+      // loadExtension() handles its own errors, so this is the timeout path.
+      const message = err instanceof Error ? err.message : String(err);
+      this.state.available = false;
+      this.state.loadError = message;
+      this.loadPromise = null;
+      log.warn(`sqlite-vec unavailable: ${message}`);
+      return false;
+    }
+
+    if (ready && typeof dimensions === "number" && dimensions > 0) {
+      this.ensureVectorTable(dimensions);
+    }
+
+    return ready;
+  }
+
+  /**
+   * Loads the sqlite-vec extension, caching the outcome in `state.available`.
+   * Failures are logged and reported as `false` rather than thrown.
+   */
+  private async loadExtension(): Promise<boolean> {
+    if (this.state.available !== null) return this.state.available;
+
+    if (!this.config.enabled) {
+      this.state.available = false;
+      return false;
+    }
+
+    try {
+      const resolvedPath = this.config.extensionPath?.trim()
+        ? resolveUserPath(this.config.extensionPath)
+        : undefined;
+
+      const loaded = await loadSqliteVecExtension({
+        db: this.db,
+        extensionPath: resolvedPath,
+      });
+
+      if (!loaded.ok) {
+        throw new Error(loaded.error ?? "unknown sqlite-vec load error");
+      }
+
+      this.state.extensionPath = loaded.extensionPath;
+      this.state.available = true;
+      return true;
+    } catch (err) {
+      const message = err instanceof Error ? err.message : String(err);
+      this.state.available = false;
+      this.state.loadError = message;
+      log.warn(`sqlite-vec unavailable: ${message}`);
+      return false;
+    }
+  }
+
+  /**
+   * Ensures the vector table exists with the specified dimensions.
+   * Drops and recreates if dimensions change.
+   */
+  private ensureVectorTable(dimensions: number): void {
+    if (this.state.dims === dimensions) return;
+
+    if (this.state.dims && this.state.dims !== dimensions) {
+      this.dropVectorTable();
+    }
+
+    this.db.exec(
+      `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` +
+        ` id TEXT PRIMARY KEY,\n` +
+        ` embedding FLOAT[${dimensions}]\n` +
+        `)`,
+    );
+
+    this.state.dims = dimensions;
+  }
+
+  /**
+   * Drops the vector table and forgets the cached dimensions.
+   * A failed drop is logged at debug level and not rethrown.
+   */
+  dropVectorTable(): void {
+    try {
+      this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`);
+    } catch (err) {
+      const message = err instanceof Error ? err.message : String(err);
+      log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`);
+    }
+    // Clear even when the drop failed: a stale value would make
+    // ensureVectorTable() return early for the same dimensions instead of
+    // re-issuing CREATE ... IF NOT EXISTS.
+    this.state.dims = undefined;
+  }
+
+  /**
+   * Gets the current state of the vector manager.
+   *
+   * @returns A shallow copy of the internal state object.
+   */
+  getState(): Readonly<VectorState> {
+    return { ...this.state };
+  }
+
+  /**
+   * Wraps a promise with a timeout.
+   *
+   * @returns The value `promise` resolves with.
+   * @throws Whatever `promise` rejects with, or an Error carrying `message`
+   *   once `timeoutMs` elapses first.
+   */
+  private async withTimeout<T>(
+    promise: Promise<T>,
+    timeoutMs: number,
+    message: string,
+  ): Promise<T> {
+    let timer: ReturnType<typeof setTimeout> | undefined;
+    const timeout = new Promise<never>((_, reject) => {
+      timer = setTimeout(() => reject(new Error(message)), timeoutMs);
+    });
+    try {
+      return await Promise.race([promise, timeout]);
+    } finally {
+      // Disarm the timer so a settled load does not leave it pending.
+      clearTimeout(timer);
+    }
+  }
+}