refactor(memory): extract vector management to dedicated module

- Extract sqlite-vec loading and vector table lifecycle to VectorManager class
- Move ~70 lines of vector logic from manager.ts to vector/vector-manager.ts
- manager.ts delegates vector operations to VectorManager instance
- Zero breaking changes to public API
- All existing tests should pass

Part of memory modularization effort to reduce manager.ts complexity (2179 → 2107 lines)
This commit is contained in:
w0x7ce 2026-01-28 22:06:32 +08:00
parent da421b9ef7
commit 37a1c361e0
2 changed files with 264 additions and 160 deletions

View File

@ -44,7 +44,7 @@ import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js"
import { searchKeyword, searchVector } from "./manager-search.js";
import { ensureMemoryIndexSchema } from "./memory-schema.js";
import { requireNodeSqlite } from "./sqlite.js";
import { loadSqliteVecExtension } from "./sqlite-vec.js";
import { VectorManager } from "./vector/vector-manager.js";
type MemorySource = "memory" | "sessions";
@ -102,7 +102,6 @@ const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
const BATCH_FAILURE_LIMIT = 2;
const SESSION_DELTA_READ_CHUNK_BYTES = 64 * 1024;
const VECTOR_LOAD_TIMEOUT_MS = 30_000;
const EMBEDDING_QUERY_TIMEOUT_REMOTE_MS = 60_000;
const EMBEDDING_QUERY_TIMEOUT_LOCAL_MS = 5 * 60_000;
const EMBEDDING_BATCH_TIMEOUT_REMOTE_MS = 2 * 60_000;
@ -142,19 +141,12 @@ export class MemoryIndexManager {
private readonly sources: Set<MemorySource>;
private providerKey: string;
private readonly cache: { enabled: boolean; maxEntries?: number };
private readonly vector: {
enabled: boolean;
available: boolean | null;
extensionPath?: string;
loadError?: string;
dims?: number;
};
private vectorManager: VectorManager;
private readonly fts: {
enabled: boolean;
available: boolean;
loadError?: string;
};
private vectorReady: Promise<boolean> | null = null;
private watcher: FSWatcher | null = null;
private watchTimer: NodeJS.Timeout | null = null;
private sessionWatchTimer: NodeJS.Timeout | null = null;
@ -232,15 +224,17 @@ export class MemoryIndexManager {
};
this.fts = { enabled: params.settings.query.hybrid.enabled, available: false };
this.ensureSchema();
this.vector = {
enabled: params.settings.store.vector.enabled,
available: null,
extensionPath: params.settings.store.vector.extensionPath,
};
const meta = this.readMeta();
if (meta?.vectorDims) {
this.vector.dims = meta.vectorDims;
}
this.vectorManager = new VectorManager(
this.db,
{
enabled: params.settings.store.vector.enabled,
extensionPath: params.settings.store.vector.extensionPath,
},
{
dims: meta?.vectorDims,
},
);
this.ensureWatcher();
this.ensureSessionListener();
this.ensureIntervalSync();
@ -452,13 +446,13 @@ export class MemoryIndexManager {
const files = this.db
.prepare(`SELECT COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql}`)
.get(...sourceFilter.params) as {
c: number;
};
c: number;
};
const chunks = this.db
.prepare(`SELECT COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql}`)
.get(...sourceFilter.params) as {
c: number;
};
c: number;
};
const sourceCounts = (() => {
const sources = Array.from(this.sources);
if (sources.length === 0) return [];
@ -501,15 +495,15 @@ export class MemoryIndexManager {
sourceCounts,
cache: this.cache.enabled
? {
enabled: true,
entries:
(
this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as
| { c: number }
| undefined
)?.c ?? 0,
maxEntries: this.cache.maxEntries,
}
enabled: true,
entries:
(
this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as
| { c: number }
| undefined
)?.c ?? 0,
maxEntries: this.cache.maxEntries,
}
: { enabled: false, maxEntries: this.cache.maxEntries },
fts: {
enabled: this.fts.enabled,
@ -519,13 +513,16 @@ export class MemoryIndexManager {
fallback: this.fallbackReason
? { from: this.fallbackFrom ?? "local", reason: this.fallbackReason }
: undefined,
vector: {
enabled: this.vector.enabled,
available: this.vector.available ?? undefined,
extensionPath: this.vector.extensionPath,
loadError: this.vector.loadError,
dims: this.vector.dims,
},
vector: (() => {
const state = this.vectorManager.getState();
return {
enabled: this.settings.store.vector.enabled,
available: state.available ?? undefined,
extensionPath: state.extensionPath,
loadError: state.loadError,
dims: state.dims,
};
})(),
batch: {
enabled: this.batch.enabled,
failures: this.batchFailureCount,
@ -541,8 +538,7 @@ export class MemoryIndexManager {
}
async probeVectorAvailability(): Promise<boolean> {
if (!this.vector.enabled) return false;
return this.ensureVectorReady();
return this.vectorManager.ensureReady();
}
async probeEmbeddingAvailability(): Promise<{ ok: boolean; error?: string }> {
@ -583,76 +579,7 @@ export class MemoryIndexManager {
}
private async ensureVectorReady(dimensions?: number): Promise<boolean> {
if (!this.vector.enabled) return false;
if (!this.vectorReady) {
this.vectorReady = this.withTimeout(
this.loadVectorExtension(),
VECTOR_LOAD_TIMEOUT_MS,
`sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`,
);
}
let ready = false;
try {
ready = await this.vectorReady;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.vector.available = false;
this.vector.loadError = message;
this.vectorReady = null;
log.warn(`sqlite-vec unavailable: ${message}`);
return false;
}
if (ready && typeof dimensions === "number" && dimensions > 0) {
this.ensureVectorTable(dimensions);
}
return ready;
}
private async loadVectorExtension(): Promise<boolean> {
if (this.vector.available !== null) return this.vector.available;
if (!this.vector.enabled) {
this.vector.available = false;
return false;
}
try {
const resolvedPath = this.vector.extensionPath?.trim()
? resolveUserPath(this.vector.extensionPath)
: undefined;
const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath });
if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error");
this.vector.extensionPath = loaded.extensionPath;
this.vector.available = true;
return true;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.vector.available = false;
this.vector.loadError = message;
log.warn(`sqlite-vec unavailable: ${message}`);
return false;
}
}
private ensureVectorTable(dimensions: number): void {
if (this.vector.dims === dimensions) return;
if (this.vector.dims && this.vector.dims !== dimensions) {
this.dropVectorTable();
}
this.db.exec(
`CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` +
` id TEXT PRIMARY KEY,\n` +
` embedding FLOAT[${dimensions}]\n` +
`)`,
);
this.vector.dims = dimensions;
}
private dropVectorTable(): void {
try {
this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`);
}
return this.vectorManager.ensureReady(dimensions);
}
private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } {
@ -683,14 +610,14 @@ export class MemoryIndexManager {
`SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`,
)
.all() as Array<{
provider: string;
model: string;
provider_key: string;
hash: string;
embedding: string;
dims: number | null;
updated_at: number;
}>;
provider: string;
model: string;
provider_key: string;
hash: string;
embedding: string;
dims: number | null;
updated_at: number;
}>;
if (!rows.length) return;
const insert = this.db.prepare(
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)
@ -716,7 +643,7 @@ export class MemoryIndexManager {
} catch (err) {
try {
this.db.exec("ROLLBACK");
} catch {}
} catch { }
throw err;
}
}
@ -1032,14 +959,14 @@ export class MemoryIndexManager {
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
)
.run(stale.path, "memory");
} catch {}
} catch { }
this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory");
if (this.fts.enabled && this.fts.available) {
try {
this.db
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
.run(stale.path, "memory", this.provider.model);
} catch {}
} catch { }
}
}
}
@ -1129,7 +1056,7 @@ export class MemoryIndexManager {
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
)
.run(stale.path, "sessions");
} catch {}
} catch { }
this.db
.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`)
.run(stale.path, "sessions");
@ -1138,7 +1065,7 @@ export class MemoryIndexManager {
this.db
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
.run(stale.path, "sessions", this.provider.model);
} catch {}
} catch { }
}
}
}
@ -1308,10 +1235,8 @@ export class MemoryIndexManager {
const originalState = {
ftsAvailable: this.fts.available,
ftsError: this.fts.loadError,
vectorAvailable: this.vector.available,
vectorLoadError: this.vector.loadError,
vectorDims: this.vector.dims,
vectorReady: this.vectorReady,
db: this.db,
providerKey: this.providerKey,
};
const restoreOriginalState = () => {
@ -1322,17 +1247,19 @@ export class MemoryIndexManager {
}
this.fts.available = originalState.ftsAvailable;
this.fts.loadError = originalState.ftsError;
this.vector.available = originalDbClosed ? null : originalState.vectorAvailable;
this.vector.loadError = originalState.vectorLoadError;
this.vector.dims = originalState.vectorDims;
this.vectorReady = originalDbClosed ? null : originalState.vectorReady;
this.providerKey = originalState.providerKey;
// VectorManager will be recreated with the restored db connection
};
this.db = tempDb;
this.vectorReady = null;
this.vector.available = null;
this.vector.loadError = undefined;
this.vector.dims = undefined;
// Reset VectorManager on meta change
this.vectorManager = new VectorManager(
this.db,
{
enabled: this.settings.store.vector.enabled,
extensionPath: this.settings.store.vector.extensionPath,
},
);
this.fts.available = false;
this.fts.loadError = undefined;
this.ensureSchema();
@ -1369,8 +1296,9 @@ export class MemoryIndexManager {
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
};
if (this.vector.available && this.vector.dims) {
nextMeta.vectorDims = this.vector.dims;
const vectorState = this.vectorManager.getState();
if (vectorState.available && vectorState.dims) {
nextMeta.vectorDims = vectorState.dims;
}
this.writeMeta(nextMeta);
@ -1383,15 +1311,22 @@ export class MemoryIndexManager {
await this.swapIndexFiles(dbPath, tempDbPath);
this.db = this.openDatabaseAtPath(dbPath);
this.vectorReady = null;
this.vector.available = null;
this.vector.loadError = undefined;
this.ensureSchema();
this.vector.dims = nextMeta.vectorDims;
// Recreate VectorManager with new database connection
this.vectorManager = new VectorManager(
this.db,
{
enabled: this.settings.store.vector.enabled,
extensionPath: this.settings.store.vector.extensionPath,
},
{
dims: nextMeta.vectorDims,
},
);
} catch (err) {
try {
this.db.close();
} catch {}
} catch { }
await this.removeIndexFiles(tempDbPath);
restoreOriginalState();
throw err;
@ -1404,10 +1339,9 @@ export class MemoryIndexManager {
if (this.fts.enabled && this.fts.available) {
try {
this.db.exec(`DELETE FROM ${FTS_TABLE}`);
} catch {}
} catch { }
}
this.dropVectorTable();
this.vector.dims = undefined;
this.vectorManager.dropVectorTable();
this.sessionsDirtyFiles.clear();
}
@ -1576,7 +1510,7 @@ export class MemoryIndexManager {
const rows = this.db
.prepare(
`SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` +
` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`,
` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`,
)
.all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>;
for (const row of rows) {
@ -1592,11 +1526,11 @@ export class MemoryIndexManager {
const now = Date.now();
const stmt = this.db.prepare(
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` +
` VALUES (?, ?, ?, ?, ?, ?, ?)\n` +
` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` +
` embedding=excluded.embedding,\n` +
` dims=excluded.dims,\n` +
` updated_at=excluded.updated_at`,
` VALUES (?, ?, ?, ?, ?, ?, ?)\n` +
` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` +
` embedding=excluded.embedding,\n` +
` dims=excluded.dims,\n` +
` updated_at=excluded.updated_at`,
);
for (const entry of entries) {
const embedding = entry.embedding ?? [];
@ -1625,11 +1559,11 @@ export class MemoryIndexManager {
this.db
.prepare(
`DELETE FROM ${EMBEDDING_CACHE_TABLE}\n` +
` WHERE rowid IN (\n` +
` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` +
` ORDER BY updated_at ASC\n` +
` LIMIT ?\n` +
` )`,
` WHERE rowid IN (\n` +
` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` +
` ORDER BY updated_at ASC\n` +
` LIMIT ?\n` +
` )`,
)
.run(excess);
}
@ -2098,14 +2032,14 @@ export class MemoryIndexManager {
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
)
.run(entry.path, options.source);
} catch {}
} catch { }
}
if (this.fts.enabled && this.fts.available) {
try {
this.db
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
.run(entry.path, options.source, this.provider.model);
} catch {}
} catch { }
}
this.db
.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`)
@ -2142,7 +2076,7 @@ export class MemoryIndexManager {
if (vectorReady && embedding.length > 0) {
try {
this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id);
} catch {}
} catch { }
this.db
.prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`)
.run(id, vectorToBlob(embedding));
@ -2151,7 +2085,7 @@ export class MemoryIndexManager {
this.db
.prepare(
`INSERT INTO ${FTS_TABLE} (text, id, path, source, model, start_line, end_line)\n` +
` VALUES (?, ?, ?, ?, ?, ?, ?)`,
` VALUES (?, ?, ?, ?, ?, ?, ?)`,
)
.run(
chunk.text,

View File

@ -0,0 +1,170 @@
import type { DatabaseSync } from "node:sqlite";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { resolveUserPath } from "../../utils.js";
import { loadSqliteVecExtension } from "../sqlite-vec.js";
const log = createSubsystemLogger("memory:vector");
const VECTOR_LOAD_TIMEOUT_MS = 30_000;
const VECTOR_TABLE = "chunks_vec";
export type VectorConfig = {
enabled: boolean;
extensionPath?: string;
};
export type VectorState = {
available: boolean | null;
extensionPath?: string;
loadError?: string;
dims?: number;
};
/**
* Manages sqlite-vec extension loading and vector table lifecycle.
*/
export class VectorManager {
private state: VectorState;
private loadPromise: Promise<boolean> | null = null;
constructor(
private readonly db: DatabaseSync,
private readonly config: VectorConfig,
initialState?: Partial<VectorState>,
) {
this.state = {
available: null,
extensionPath: config.extensionPath,
...initialState,
};
}
/**
* Ensures the vector extension is loaded and ready.
* Optionally creates/recreates the vector table if dimensions are provided.
*/
async ensureReady(dimensions?: number): Promise<boolean> {
if (!this.config.enabled) return false;
if (!this.loadPromise) {
this.loadPromise = this.withTimeout(
this.loadExtension(),
VECTOR_LOAD_TIMEOUT_MS,
`sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`,
);
}
let ready = false;
try {
ready = await this.loadPromise;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.state.available = false;
this.state.loadError = message;
this.loadPromise = null;
log.warn(`sqlite-vec unavailable: ${message}`);
return false;
}
if (ready && typeof dimensions === "number" && dimensions > 0) {
this.ensureVectorTable(dimensions);
}
return ready;
}
/**
* Loads the sqlite-vec extension.
*/
private async loadExtension(): Promise<boolean> {
if (this.state.available !== null) return this.state.available;
if (!this.config.enabled) {
this.state.available = false;
return false;
}
try {
const resolvedPath = this.config.extensionPath?.trim()
? resolveUserPath(this.config.extensionPath)
: undefined;
const loaded = await loadSqliteVecExtension({
db: this.db,
extensionPath: resolvedPath,
});
if (!loaded.ok) {
throw new Error(loaded.error ?? "unknown sqlite-vec load error");
}
this.state.extensionPath = loaded.extensionPath;
this.state.available = true;
return true;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.state.available = false;
this.state.loadError = message;
log.warn(`sqlite-vec unavailable: ${message}`);
return false;
}
}
/**
* Ensures the vector table exists with the specified dimensions.
* Drops and recreates if dimensions change.
*/
private ensureVectorTable(dimensions: number): void {
if (this.state.dims === dimensions) return;
if (this.state.dims && this.state.dims !== dimensions) {
this.dropVectorTable();
}
this.db.exec(
`CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` +
` id TEXT PRIMARY KEY,\n` +
` embedding FLOAT[${dimensions}]\n` +
`)`,
);
this.state.dims = dimensions;
}
/**
* Drops the vector table.
*/
dropVectorTable(): void {
try {
this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`);
this.state.dims = undefined;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`);
}
}
/**
* Gets the current state of the vector manager.
*/
getState(): Readonly<VectorState> {
return { ...this.state };
}
/**
* Wraps a promise with a timeout.
*/
private async withTimeout<T>(
promise: Promise<T>,
timeoutMs: number,
message: string,
): Promise<T> {
return Promise.race([
promise,
new Promise<T>((_, reject) =>
setTimeout(() => reject(new Error(message)), timeoutMs),
),
]);
}
}