This commit is contained in:
Niranjan Akella 2026-01-30 11:53:22 +09:00 committed by GitHub
commit 4fbec72ad2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 753 additions and 159 deletions

View File

@ -88,7 +88,7 @@ Defaults:
3. `gemini` if a Gemini key can be resolved.
4. Otherwise memory search stays disabled until configured.
- Local mode uses node-llama-cpp and may require `pnpm approve-builds`.
- Uses sqlite-vec (when available) to accelerate vector search inside SQLite.
- Uses sqlite-vec (when available) or Qdrant to accelerate vector search.
Remote embeddings **require** an API key for the embedding provider. Moltbot
resolves keys from auth profiles, `models.providers.*.apiKey`, or environment
@ -377,6 +377,91 @@ Notes:
- `extensionPath` overrides the bundled sqlite-vec path (useful for custom builds
or non-standard install locations).
### Qdrant vector database
Moltbot can use Qdrant as an alternative vector database backend, supporting both local (on-disk or in-memory) and cloud deployments.
**Local Qdrant server:**
```json5
agents: {
defaults: {
memorySearch: {
store: {
driver: "qdrant",
qdrant: {
url: "http://localhost:6333", // Default Qdrant REST API port
collection: {
onDisk: true, // Store vectors on disk (default: true)
distance: "Cosine" // Distance metric: "Cosine", "Euclidean", or "Dot"
}
}
}
}
}
}
```
**Cloud Qdrant:**
```json5
agents: {
defaults: {
memorySearch: {
store: {
driver: "qdrant",
qdrant: {
url: "https://your-cluster.qdrant.io",
apiKey: "your-api-key",
collection: {
onDisk: false, // Cloud handles persistence
distance: "Cosine"
}
}
}
}
}
}
```
**In-memory Qdrant (for testing):**
```json5
agents: {
defaults: {
memorySearch: {
store: {
driver: "qdrant",
qdrant: {
url: "http://localhost:6333",
collection: {
onDisk: false, // Keep vectors in RAM instead of serving them from disk
distance: "Cosine"
}
}
}
}
}
}
```
**Running Qdrant locally with Docker:**
```bash
docker pull qdrant/qdrant
docker run -p 6333:6333 -p 6334:6334 \
-v "$(pwd)/qdrant_storage:/qdrant/storage:z" \
qdrant/qdrant
```
Notes:
- Collection names are auto-generated per agent and model to ensure isolation.
- SQLite is still used for metadata (files, chunks tables) regardless of vector driver.
- Embedding cache uses SQLite regardless of vector driver.
- Hybrid search (BM25 + vector) works with Qdrant the same way it does with sqlite-vec.
- If the Qdrant connection fails, Moltbot falls back to in-process cosine similarity.
- Default driver is `"sqlite"` for backward compatibility.
### Local embedding auto-download
- Default local embedding model: `hf:ggml-org/embeddinggemma-300M-GGUF/embeddinggemma-300M-Q8_0.gguf` (~0.6 GB).

View File

@ -168,6 +168,7 @@
"@mariozechner/pi-coding-agent": "0.49.3",
"@mariozechner/pi-tui": "0.49.3",
"@mozilla/readability": "^0.6.0",
"@qdrant/qdrant-js": "^1.14.0",
"@sinclair/typebox": "0.34.47",
"@slack/bolt": "^4.6.0",
"@slack/web-api": "^7.13.0",

33
pnpm-lock.yaml generated
View File

@ -1487,6 +1487,23 @@ packages:
resolution: {integrity: sha512-juG5VWh4qAivzTAeMzvY9xs9HY5rAcr2E4I7tiSSCokRFi7XIZCAu92ZkSTsIj1OPceCifL3cpfteP3pDT9/QQ==}
engines: {node: '>=14.0.0'}
'@qdrant/js-client-grpc@1.14.0':
resolution: {integrity: sha512-A1464Ub25+jVvqOKddemTZMP377LyilGrSHoWVaTT9FtieXXr0trIfJLL1UWExgZ6hhopaURVqcc8uK3OAsP1w==}
engines: {node: '>=18.0.0', pnpm: '>=8'}
'@qdrant/js-client-rest@1.14.0':
resolution: {integrity: sha512-2sM2g17FSkN2sNCSeAfqxHRr+SPEVnUQLXBjVv/whm4YQ4JjZ53Jiy1iShk95G+xBf3hKBhJdj8itRnor03IYw==}
engines: {node: '>=18.0.0', pnpm: '>=8'}
'@qdrant/qdrant-js@1.14.0':
resolution: {integrity: sha512-odY6jC/yKwE5uoyFhIN4J9EEyS6aVsGtrSdMFD0DuqMGW5ZvuvGcDNw3HLkN71JT0D0gQEyHO7/sokuG/GxkwA==}
engines: {node: '>=18.0.0', pnpm: '>=8'}
peerDependencies:
typescript: '>=4.1'
dependencies:
'@qdrant/js-client-grpc': 1.14.0
'@qdrant/js-client-rest': 1.14.0
'@napi-rs/canvas-android-arm64@0.1.88':
resolution: {integrity: sha512-KEaClPnZuVxJ8smUWjV1wWFkByBO/D+vy4lN+Dm5DFH514oqwukxKGeck9xcKJhaWJGjfruGmYGiwRe//+/zQQ==}
engines: {node: '>= 10'}
@ -7112,6 +7129,12 @@ snapshots:
'@mozilla/readability@0.6.0': {}
'@qdrant/js-client-grpc@1.14.0': {}
'@qdrant/js-client-rest@1.14.0': {}
'@qdrant/qdrant-js@1.14.0': {}
'@napi-rs/canvas-android-arm64@0.1.88':
optional: true
@ -7917,6 +7940,15 @@ snapshots:
'@sinclair/typebox@0.34.47': {}
'@qdrant/js-client-grpc@1.14.0': {}
'@qdrant/js-client-rest@1.14.0': {}
'@qdrant/qdrant-js@1.14.0':
dependencies:
'@qdrant/js-client-grpc': 1.14.0
'@qdrant/js-client-rest': 1.14.0
'@slack/bolt@4.6.0(@types/express@5.0.6)':
dependencies:
'@slack/logger': 4.0.0
@ -9114,6 +9146,7 @@ snapshots:
'@mariozechner/pi-coding-agent': 0.49.3(ws@8.19.0)(zod@4.3.6)
'@mariozechner/pi-tui': 0.49.3
'@mozilla/readability': 0.6.0
'@qdrant/qdrant-js': 1.14.0
'@sinclair/typebox': 0.34.47
'@slack/bolt': 4.6.0(@types/express@5.0.6)
'@slack/web-api': 7.13.0

View File

@ -33,12 +33,21 @@ export type ResolvedMemorySearchConfig = {
modelCacheDir?: string;
};
store: {
driver: "sqlite";
driver: "sqlite" | "qdrant";
path: string;
vector: {
enabled: boolean;
extensionPath?: string;
};
qdrant?: {
url?: string;
apiKey?: string;
collection?: {
name?: string;
onDisk?: boolean;
distance?: "Cosine" | "Euclidean" | "Dot";
};
};
};
chunking: {
tokens: number;
@ -172,10 +181,12 @@ function mergeConfig(
extensionPath:
overrides?.store?.vector?.extensionPath ?? defaults?.store?.vector?.extensionPath,
};
const qdrant = overrides?.store?.qdrant ?? defaults?.store?.qdrant;
const store = {
driver: overrides?.store?.driver ?? defaults?.store?.driver ?? "sqlite",
path: resolveStorePath(agentId, overrides?.store?.path ?? defaults?.store?.path),
vector,
...(qdrant ? { qdrant } : {}),
};
const chunking = {
tokens: overrides?.chunking?.tokens ?? defaults?.chunking?.tokens ?? DEFAULT_CHUNK_TOKENS,

View File

@ -265,7 +265,7 @@ export type MemorySearchConfig = {
};
/** Index storage configuration. */
store?: {
driver?: "sqlite";
driver?: "sqlite" | "qdrant";
path?: string;
vector?: {
/** Enable sqlite-vec extension for vector search (default: true). */
@ -273,6 +273,20 @@ export type MemorySearchConfig = {
/** Optional override path to sqlite-vec extension (.dylib/.so/.dll). */
extensionPath?: string;
};
qdrant?: {
/** Qdrant server URL (default: "http://localhost:6333"). */
url?: string;
/** API key for cloud Qdrant instances. */
apiKey?: string;
collection?: {
/** Collection name (default: auto-generated per agent/model). */
name?: string;
/** Store vectors on disk (default: true for local, false for in-memory). */
onDisk?: boolean;
/** Distance metric (default: "Cosine"). */
distance?: "Cosine" | "Euclidean" | "Dot";
};
};
cache?: {
/** Enable embedding cache (default: true). */
enabled?: boolean;

View File

@ -343,7 +343,7 @@ export const MemorySearchSchema = z
.optional(),
store: z
.object({
driver: z.literal("sqlite").optional(),
driver: z.enum(["sqlite", "qdrant"]).optional(),
path: z.string().optional(),
vector: z
.object({
@ -352,6 +352,21 @@ export const MemorySearchSchema = z
})
.strict()
.optional(),
qdrant: z
.object({
url: z.string().url().optional(),
apiKey: z.string().optional(),
collection: z
.object({
name: z.string().optional(),
onDisk: z.boolean().optional(),
distance: z.enum(["Cosine", "Euclidean", "Dot"]).optional(),
})
.strict()
.optional(),
})
.strict()
.optional(),
})
.strict()
.optional(),

View File

@ -13,7 +13,7 @@ import type { MoltbotConfig } from "../config/config.js";
import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import { resolveUserPath } from "../utils.js";
import { resolveUserPath, truncateUtf16Safe } from "../utils.js";
import {
createEmbeddingProvider,
type EmbeddingProvider,
@ -42,10 +42,11 @@ import {
parseEmbedding,
} from "./internal.js";
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
import { searchKeyword, searchVector } from "./manager-search.js";
import { searchKeyword, listChunks } from "./manager-search.js";
import { ensureMemoryIndexSchema } from "./memory-schema.js";
import { requireNodeSqlite } from "./sqlite.js";
import { loadSqliteVecExtension } from "./sqlite-vec.js";
import { QdrantStore } from "./vector-store-qdrant.js";
import { SqliteVecStore, type VectorStore } from "./vector-store.js";
type MemorySource = "memory" | "sessions";
@ -91,7 +92,6 @@ type MemorySyncProgressState = {
const META_KEY = "memory_index_meta_v1";
const SNIPPET_MAX_CHARS = 700;
const VECTOR_TABLE = "chunks_vec";
const FTS_TABLE = "chunks_fts";
const EMBEDDING_CACHE_TABLE = "embedding_cache";
const SESSION_DIRTY_DEBOUNCE_MS = 5000;
@ -113,9 +113,6 @@ const log = createSubsystemLogger("memory");
const INDEX_CACHE = new Map<string, MemoryIndexManager>();
const vectorToBlob = (embedding: number[]): Buffer =>
Buffer.from(new Float32Array(embedding).buffer);
export class MemoryIndexManager {
private readonly cacheKey: string;
private readonly cfg: MoltbotConfig;
@ -143,19 +140,12 @@ export class MemoryIndexManager {
private readonly sources: Set<MemorySource>;
private providerKey: string;
private readonly cache: { enabled: boolean; maxEntries?: number };
private readonly vector: {
enabled: boolean;
available: boolean | null;
extensionPath?: string;
loadError?: string;
dims?: number;
};
private readonly vectorStore: VectorStore;
private readonly fts: {
enabled: boolean;
available: boolean;
loadError?: string;
};
private vectorReady: Promise<boolean> | null = null;
private watcher: FSWatcher | null = null;
private watchTimer: NodeJS.Timeout | null = null;
private sessionWatchTimer: NodeJS.Timeout | null = null;
@ -233,14 +223,19 @@ export class MemoryIndexManager {
};
this.fts = { enabled: params.settings.query.hybrid.enabled, available: false };
this.ensureSchema();
this.vector = {
enabled: params.settings.store.vector.enabled,
available: null,
extensionPath: params.settings.store.vector.extensionPath,
};
const meta = this.readMeta();
if (meta?.vectorDims) {
this.vector.dims = meta.vectorDims;
// Initialize vector store after providerKey is computed
if (params.settings.store.driver === "qdrant") {
this.vectorStore = new QdrantStore({
agentId: this.agentId,
providerKey: this.providerKey,
config: params.settings.store.qdrant ?? {},
});
} else {
this.vectorStore = new SqliteVecStore({
db: this.db,
enabled: params.settings.store.vector.enabled,
extensionPath: params.settings.store.vector.extensionPath,
});
}
this.ensureWatcher();
this.ensureSessionListener();
@ -311,18 +306,87 @@ export class MemoryIndexManager {
queryVec: number[],
limit: number,
): Promise<Array<MemorySearchResult & { id: string }>> {
const results = await searchVector({
db: this.db,
vectorTable: VECTOR_TABLE,
providerModel: this.provider.model,
const sources = Array.from(this.sources);
const vectorResults = await this.vectorStore.search(
queryVec,
limit,
snippetMaxChars: SNIPPET_MAX_CHARS,
ensureVectorReady: async (dimensions) => await this.ensureVectorReady(dimensions),
sourceFilterVec: this.buildSourceFilter("c"),
sourceFilterChunks: this.buildSourceFilter(),
{ model: this.provider.model, sources },
this.db,
);
if (vectorResults.length === 0) {
// Fallback to in-process cosine similarity
return this.searchVectorFallback(queryVec, limit);
}
// Fetch chunk details from SQLite
const ids = vectorResults.map((r) => r.id);
const placeholders = ids.map(() => "?").join(", ");
const rows = this.db
.prepare(
`SELECT id, path, start_line, end_line, text, source\n` +
` FROM chunks\n` +
` WHERE id IN (${placeholders})`,
)
.all(...ids) as Array<{
id: string;
path: string;
start_line: number;
end_line: number;
text: string;
source: MemorySource;
}>;
const byId = new Map(rows.map((r) => [r.id, r]));
return vectorResults
.map((result) => {
const chunk = byId.get(result.id);
if (!chunk) return null;
return {
id: result.id,
path: chunk.path,
startLine: chunk.start_line,
endLine: chunk.end_line,
score: result.score,
snippet: truncateUtf16Safe(chunk.text, SNIPPET_MAX_CHARS),
source: chunk.source,
} as MemorySearchResult & { id: string };
})
.filter((r): r is MemorySearchResult & { id: string } => r !== null);
}
/**
 * Brute-force vector search used when the configured vector store returns
 * nothing (for example because it is unavailable).
 *
 * Loads every stored chunk for the active embedding model / sources from
 * SQLite, scores each one against `queryVec` with cosine similarity in
 * process, and returns the `limit` best matches.
 *
 * The method is async because its helpers are loaded with dynamic `import()`;
 * the previous non-async declaration used `await` illegally and also carried a
 * stale `return results.map(...)` that referenced an undefined variable and
 * made the scoring code unreachable.
 *
 * @param queryVec Embedding of the search query.
 * @param limit Maximum number of results to return.
 * @returns Matches ordered by descending similarity score.
 */
private async searchVectorFallback(
  queryVec: number[],
  limit: number,
): Promise<Array<MemorySearchResult & { id: string }>> {
  const { listChunks } = await import("./manager-search.js");
  const { cosineSimilarity } = await import("./internal.js");
  const sourceFilter = this.buildSourceFilter();
  const candidates = listChunks({
    db: this.db,
    providerModel: this.provider.model,
    sourceFilter,
  });
  const scored = candidates
    .map((chunk) => ({
      chunk,
      score: cosineSimilarity(queryVec, chunk.embedding),
    }))
    // Drop NaN / Infinity scores (e.g. from zero-length or mismatched vectors).
    .filter((entry) => Number.isFinite(entry.score));
  return scored
    .sort((a, b) => b.score - a.score)
    .slice(0, limit)
    .map((entry) => ({
      id: entry.chunk.id,
      path: entry.chunk.path,
      startLine: entry.chunk.startLine,
      endLine: entry.chunk.endLine,
      score: entry.score,
      snippet: truncateUtf16Safe(entry.chunk.text, SNIPPET_MAX_CHARS),
      source: entry.chunk.source,
    }));
}
private buildFtsQuery(raw: string): string | null {
@ -562,11 +626,8 @@ export class MemoryIndexManager {
? { from: this.fallbackFrom ?? "local", reason: this.fallbackReason }
: undefined,
vector: {
enabled: this.vector.enabled,
available: this.vector.available ?? undefined,
extensionPath: this.vector.extensionPath,
loadError: this.vector.loadError,
dims: this.vector.dims,
driver: this.settings.store.driver,
available: await this.probeVectorAvailability(),
},
batch: {
enabled: this.batch.enabled,
@ -583,7 +644,6 @@ export class MemoryIndexManager {
}
async probeVectorAvailability(): Promise<boolean> {
if (!this.vector.enabled) return false;
return this.ensureVectorReady();
}
@ -624,78 +684,6 @@ export class MemoryIndexManager {
INDEX_CACHE.delete(this.cacheKey);
}
private async ensureVectorReady(dimensions?: number): Promise<boolean> {
if (!this.vector.enabled) return false;
if (!this.vectorReady) {
this.vectorReady = this.withTimeout(
this.loadVectorExtension(),
VECTOR_LOAD_TIMEOUT_MS,
`sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`,
);
}
let ready = false;
try {
ready = await this.vectorReady;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.vector.available = false;
this.vector.loadError = message;
this.vectorReady = null;
log.warn(`sqlite-vec unavailable: ${message}`);
return false;
}
if (ready && typeof dimensions === "number" && dimensions > 0) {
this.ensureVectorTable(dimensions);
}
return ready;
}
private async loadVectorExtension(): Promise<boolean> {
if (this.vector.available !== null) return this.vector.available;
if (!this.vector.enabled) {
this.vector.available = false;
return false;
}
try {
const resolvedPath = this.vector.extensionPath?.trim()
? resolveUserPath(this.vector.extensionPath)
: undefined;
const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath });
if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error");
this.vector.extensionPath = loaded.extensionPath;
this.vector.available = true;
return true;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.vector.available = false;
this.vector.loadError = message;
log.warn(`sqlite-vec unavailable: ${message}`);
return false;
}
}
private ensureVectorTable(dimensions: number): void {
if (this.vector.dims === dimensions) return;
if (this.vector.dims && this.vector.dims !== dimensions) {
this.dropVectorTable();
}
this.db.exec(
`CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` +
` id TEXT PRIMARY KEY,\n` +
` embedding FLOAT[${dimensions}]\n` +
`)`,
);
this.vector.dims = dimensions;
}
private dropVectorTable(): void {
try {
this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`);
}
}
private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } {
const sources = Array.from(this.sources);
@ -714,7 +702,8 @@ export class MemoryIndexManager {
const dir = path.dirname(dbPath);
ensureDir(dir);
const { DatabaseSync } = requireNodeSqlite();
return new DatabaseSync(dbPath, { allowExtension: this.settings.store.vector.enabled });
const allowExtension = this.settings.store.driver === "sqlite" && this.settings.store.vector.enabled;
return new DatabaseSync(dbPath, { allowExtension });
}
private seedEmbeddingCache(sourceDb: DatabaseSync): void {
@ -1081,11 +1070,7 @@ export class MemoryIndexManager {
if (activePaths.has(stale.path)) continue;
this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory");
try {
this.db
.prepare(
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
)
.run(stale.path, "memory");
await this.vectorStore.deleteByPath(stale.path, "memory", this.db);
} catch {}
this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory");
if (this.fts.enabled && this.fts.available) {
@ -1178,11 +1163,7 @@ export class MemoryIndexManager {
.prepare(`DELETE FROM files WHERE path = ? AND source = ?`)
.run(stale.path, "sessions");
try {
this.db
.prepare(
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
)
.run(stale.path, "sessions");
await this.vectorStore.deleteByPath(stale.path, "sessions", this.db);
} catch {}
this.db
.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`)
@ -1233,7 +1214,6 @@ export class MemoryIndexManager {
label: "Loading vector extension…",
});
}
const vectorReady = await this.ensureVectorReady();
const meta = this.readMeta();
const needsFullReindex =
params?.force ||
@ -1242,8 +1222,7 @@ export class MemoryIndexManager {
meta.provider !== this.provider.id ||
meta.providerKey !== this.providerKey ||
meta.chunkTokens !== this.settings.chunking.tokens ||
meta.chunkOverlap !== this.settings.chunking.overlap ||
(vectorReady && !meta?.vectorDims);
meta.chunkOverlap !== this.settings.chunking.overlap;
try {
if (needsFullReindex) {
await this.runSafeReindex({
@ -1362,10 +1341,6 @@ export class MemoryIndexManager {
const originalState = {
ftsAvailable: this.fts.available,
ftsError: this.fts.loadError,
vectorAvailable: this.vector.available,
vectorLoadError: this.vector.loadError,
vectorDims: this.vector.dims,
vectorReady: this.vectorReady,
};
const restoreOriginalState = () => {
@ -1376,17 +1351,9 @@ export class MemoryIndexManager {
}
this.fts.available = originalState.ftsAvailable;
this.fts.loadError = originalState.ftsError;
this.vector.available = originalDbClosed ? null : originalState.vectorAvailable;
this.vector.loadError = originalState.vectorLoadError;
this.vector.dims = originalState.vectorDims;
this.vectorReady = originalDbClosed ? null : originalState.vectorReady;
};
this.db = tempDb;
this.vectorReady = null;
this.vector.available = null;
this.vector.loadError = undefined;
this.vector.dims = undefined;
this.fts.available = false;
this.fts.loadError = undefined;
this.ensureSchema();
@ -1423,9 +1390,7 @@ export class MemoryIndexManager {
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
};
if (this.vector.available && this.vector.dims) {
nextMeta.vectorDims = this.vector.dims;
}
// Vector dimensions are managed by the vector store
this.writeMeta(nextMeta);
this.pruneEmbeddingCacheIfNeeded();
@ -1437,11 +1402,7 @@ export class MemoryIndexManager {
await this.swapIndexFiles(dbPath, tempDbPath);
this.db = this.openDatabaseAtPath(dbPath);
this.vectorReady = null;
this.vector.available = null;
this.vector.loadError = undefined;
this.ensureSchema();
this.vector.dims = nextMeta.vectorDims;
} catch (err) {
try {
this.db.close();
@ -1460,8 +1421,7 @@ export class MemoryIndexManager {
this.db.exec(`DELETE FROM ${FTS_TABLE}`);
} catch {}
}
this.dropVectorTable();
this.vector.dims = undefined;
// Vector store cleanup is handled by the store implementation
this.sessionsDirtyFiles.clear();
}
@ -2147,11 +2107,7 @@ export class MemoryIndexManager {
const now = Date.now();
if (vectorReady) {
try {
this.db
.prepare(
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
)
.run(entry.path, options.source);
await this.vectorStore.deleteByPath(entry.path, options.source, this.db);
} catch {}
}
if (this.fts.enabled && this.fts.available) {
@ -2195,11 +2151,10 @@ export class MemoryIndexManager {
);
if (vectorReady && embedding.length > 0) {
try {
this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id);
} catch {}
this.db
.prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`)
.run(id, vectorToBlob(embedding));
await this.vectorStore.upsert(id, embedding);
} catch (err) {
log.debug(`Vector upsert failed for ${id}: ${String(err)}`);
}
}
if (this.fts.enabled && this.fts.available) {
this.db

View File

@ -0,0 +1,282 @@
import { createHash } from "node:crypto";
import type { DatabaseSync } from "node:sqlite";
import { QdrantClient } from "@qdrant/qdrant-js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { hashText } from "./internal.js";
import type { VectorSearchResult, VectorStore } from "./vector-store.js";
const log = createSubsystemLogger("memory");
/**
 * Optional Qdrant settings accepted by `QdrantStore`.
 * Every field may be omitted; `QdrantStore`'s constructor fills in defaults.
 */
export type QdrantConfig = {
/** Qdrant server URL. Defaults to "http://localhost:6333" when omitted. */
url?: string;
/** API key passed to the Qdrant client when set. */
apiKey?: string;
collection?: {
/** Collection name. When omitted, a name is derived from the agent id and a hash of the provider key. */
name?: string;
/** Forwarded as the collection's `on_disk` vector setting. Defaults to true when omitted. */
onDisk?: boolean;
/** Distance metric for the collection. Defaults to "Cosine" when omitted. */
distance?: "Cosine" | "Euclidean" | "Dot";
};
};
/**
 * `VectorStore` implementation backed by a Qdrant server (REST API).
 *
 * Design notes:
 * - Qdrant only accepts unsigned integers or UUIDs as point IDs, so each chunk
 *   id is mapped to a deterministic UUID-shaped string and the original chunk
 *   id is kept in the point payload so search results can be mapped back.
 * - Chunk metadata (model, source, path) lives in SQLite; searches are
 *   restricted to the point IDs of the chunks that match the SQLite filter.
 * - The connection is established lazily by `ensureReady()`; every public
 *   method degrades gracefully (returns `false` / `[]` / no-op) when Qdrant is
 *   unreachable, except `upsert()`, which throws so callers can log the miss.
 */
export class QdrantStore implements VectorStore {
  /** Payload field holding the original (non-UUID) chunk id. */
  private static readonly CHUNK_ID_PAYLOAD_KEY = "chunk_id";

  private client: QdrantClient | null = null;
  private readonly collectionName: string;
  private readonly url: string;
  private readonly apiKey: string | undefined;
  private readonly onDisk: boolean;
  private readonly distance: "Cosine" | "Euclidean" | "Dot";
  private available: boolean | null = null;
  private loadError?: string;
  private dims?: number;
  private ready: Promise<boolean> | null = null;
  private readonly VECTOR_LOAD_TIMEOUT_MS = 30_000;

  /**
   * @param params.agentId Agent the index belongs to (part of the default collection name).
   * @param params.providerKey Embedding provider key; hashed into the default collection name.
   * @param params.config User-supplied Qdrant settings; missing fields get defaults.
   */
  constructor(params: { agentId: string; providerKey: string; config: QdrantConfig }) {
    const collection = params.config.collection;
    this.url = params.config.url ?? "http://localhost:6333";
    this.apiKey = params.config.apiKey;
    this.collectionName =
      collection?.name ??
      `clawdbot_${params.agentId}_${hashText(params.providerKey).slice(0, 16)}`;
    this.onDisk = collection?.onDisk ?? true;
    this.distance = collection?.distance ?? "Cosine";
  }

  /**
   * Connects to Qdrant (once) and makes sure the collection exists for the
   * given vector size.
   *
   * @param dimensions Embedding size; values <= 0 skip collection setup.
   * @returns `true` when the store can be used, `false` otherwise. Never throws.
   */
  async ensureReady(dimensions: number): Promise<boolean> {
    if (!this.ready) {
      this.ready = this.withTimeout(
        this.initialize(),
        this.VECTOR_LOAD_TIMEOUT_MS,
        `Qdrant initialization timed out after ${Math.round(this.VECTOR_LOAD_TIMEOUT_MS / 1000)}s`,
      );
    }
    let ready = false;
    try {
      ready = await this.ready;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.available = false;
      this.loadError = message;
      // Allow a later call to retry the connection after a timeout.
      this.ready = null;
      log.warn(`Qdrant unavailable: ${message}`);
      return false;
    }
    if (ready && typeof dimensions === "number" && dimensions > 0) {
      try {
        await this.ensureCollection(dimensions);
      } catch {
        // ensureCollection already logged the failure; report "not ready"
        // instead of surfacing an exception through a boolean-returning API.
        return false;
      }
    }
    return ready;
  }

  /**
   * Inserts or replaces the vector for one chunk.
   *
   * @throws Error when the store has not been made ready, or when Qdrant rejects the write.
   */
  async upsert(id: string, embedding: number[]): Promise<void> {
    if (!this.client || !this.available || !this.dims) {
      throw new Error("Qdrant store not ready");
    }
    try {
      await this.client.upsert(this.collectionName, {
        wait: true,
        points: [
          {
            id: this.stringToPointId(id),
            vector: embedding,
            payload: { [QdrantStore.CHUNK_ID_PAYLOAD_KEY]: id },
          },
        ],
      });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.warn(`Qdrant upsert failed: ${message}`);
      throw err;
    }
  }

  /** Best-effort removal of a single chunk's vector; failures are logged, not thrown. */
  async delete(id: string): Promise<void> {
    if (!this.client || !this.available) return;
    try {
      await this.client.delete(this.collectionName, {
        wait: true,
        points: [this.stringToPointId(id)],
      });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.debug(`Qdrant delete failed: ${message}`);
    }
  }

  /**
   * Best-effort removal of every vector belonging to a file.
   * Must run BEFORE the matching rows are deleted from `chunks`, because the
   * chunk ids are looked up there.
   */
  async deleteByPath(path: string, source: string, db: DatabaseSync): Promise<void> {
    if (!this.client || !this.available) return;
    try {
      const rows = db
        .prepare(`SELECT id FROM chunks WHERE path = ? AND source = ?`)
        .all(path, source) as Array<{ id: string }>;
      if (rows.length === 0) return;
      await this.client.delete(this.collectionName, {
        wait: true,
        points: rows.map((row) => this.stringToPointId(row.id)),
      });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.debug(`Qdrant deleteByPath failed: ${message}`);
    }
  }

  /**
   * Nearest-neighbour search restricted to chunks matching `filter` in SQLite.
   *
   * @returns Chunk ids with a similarity score (higher is better); `[]` when
   *   Qdrant is unavailable, nothing matches, or the request fails.
   */
  async search(
    queryVec: number[],
    limit: number,
    filter: { model: string; sources: string[] },
    db: DatabaseSync,
  ): Promise<VectorSearchResult[]> {
    if (queryVec.length === 0 || limit <= 0) return [];
    // Connect lazily: checking `this.client` first would make a store that has
    // only ever been queried (never written to) permanently return nothing.
    if (!(await this.ensureReady(queryVec.length))) return [];
    const client = this.client;
    if (!client) return [];
    try {
      // Qdrant holds no model/source metadata, so resolve the candidate chunk
      // ids in SQLite and restrict the vector search to those points.
      const sourceSql =
        filter.sources.length === 0
          ? ""
          : ` AND source IN (${filter.sources.map(() => "?").join(", ")})`;
      const chunkRows = db
        .prepare(`SELECT id FROM chunks WHERE model = ?${sourceSql}`)
        .all(filter.model, ...filter.sources) as Array<{ id: string }>;
      if (chunkRows.length === 0) return [];

      const hits = await client.search(this.collectionName, {
        vector: queryVec,
        limit,
        filter: { must: [{ has_id: chunkRows.map((row) => this.stringToPointId(row.id)) }] },
        with_payload: true,
      });

      return hits.flatMap((hit) => {
        const chunkId = hit.payload?.[QdrantStore.CHUNK_ID_PAYLOAD_KEY];
        // Points without the payload cannot be mapped back to a chunk.
        if (typeof chunkId !== "string") return [];
        return [{ id: chunkId, score: this.toSimilarity(hit.score) }];
      });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.warn(`Qdrant search failed: ${message}`);
      return [];
    }
  }

  /** Creates the client and probes the server; caches the outcome in `available`. */
  private async initialize(): Promise<boolean> {
    if (this.available !== null) return this.available;
    try {
      const clientConfig: { url: string; apiKey?: string } = { url: this.url };
      if (this.apiKey) {
        clientConfig.apiKey = this.apiKey;
      }
      this.client = new QdrantClient(clientConfig);
      // Cheap round trip to verify connectivity and credentials.
      await this.client.getCollections();
      this.available = true;
      return true;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.available = false;
      this.loadError = message;
      log.warn(`Qdrant connection failed: ${message}`);
      return false;
    }
  }

  /**
   * Creates the collection when missing. If this instance previously used a
   * different vector size, the collection is dropped and recreated.
   */
  private async ensureCollection(dimensions: number): Promise<void> {
    if (!this.client || this.dims === dimensions) return;
    if (this.dims && this.dims !== dimensions) {
      await this.dropCollection();
      this.dims = undefined;
    }
    try {
      const existing = await this.client.getCollections();
      const exists = existing.collections.some((c) => c.name === this.collectionName);
      if (!exists) {
        await this.client.createCollection(this.collectionName, {
          vectors: {
            size: dimensions,
            distance: this.qdrantDistance(),
            on_disk: this.onDisk,
          },
        });
        log.debug(`Created Qdrant collection: ${this.collectionName}`);
      }
      this.dims = dimensions;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.warn(`Failed to ensure Qdrant collection: ${message}`);
      throw err;
    }
  }

  /** Best-effort collection removal; failures are logged at debug level. */
  private async dropCollection(): Promise<void> {
    if (!this.client) return;
    try {
      await this.client.deleteCollection(this.collectionName);
      log.debug(`Dropped Qdrant collection: ${this.collectionName}`);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.debug(`Failed to drop Qdrant collection: ${message}`);
    }
  }

  /** Maps the config-level metric name to Qdrant's own name ("Euclidean" -> "Euclid"). */
  private qdrantDistance(): "Cosine" | "Euclid" | "Dot" {
    return this.distance === "Euclidean" ? "Euclid" : this.distance;
  }

  /**
   * Normalises a Qdrant score so that higher always means "more similar".
   * Qdrant already reports similarity for Cosine and Dot; for Euclid it
   * reports a distance, which is folded into (0, 1] via 1 / (1 + distance).
   */
  private toSimilarity(score: number | undefined): number {
    if (score === undefined) return 0;
    return this.distance === "Euclidean" ? 1 / (1 + score) : score;
  }

  /**
   * Derives a deterministic UUID-shaped point id from a chunk id, because
   * Qdrant rejects arbitrary strings as point IDs.
   */
  private stringToPointId(str: string): string {
    const hex = createHash("sha256").update(str).digest("hex");
    // Force RFC 4122 variant bits so strict UUID parsers accept the value.
    const variant = ((parseInt(hex.charAt(16), 16) & 0x3) | 0x8).toString(16);
    return [
      hex.slice(0, 8),
      hex.slice(8, 12),
      `5${hex.slice(13, 16)}`,
      `${variant}${hex.slice(17, 20)}`,
      hex.slice(20, 32),
    ].join("-");
  }

  /** Races `promise` against a timer and always clears the timer afterwards. */
  private async withTimeout<T>(
    promise: Promise<T>,
    timeoutMs: number,
    timeoutMessage: string,
  ): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs);
    });
    try {
      return await Promise.race([promise, timeout]);
    } finally {
      // Without this the pending timer keeps the event loop alive for the
      // full timeout even after a fast, successful initialization.
      clearTimeout(timer);
    }
  }
}

198
src/memory/vector-store.ts Normal file
View File

@ -0,0 +1,198 @@
import type { DatabaseSync } from "node:sqlite";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { loadSqliteVecExtension } from "./sqlite-vec.js";
const log = createSubsystemLogger("memory");
/** One hit returned by `VectorStore.search`. */
export type VectorSearchResult = {
/** Id of the matching chunk (the `id` column of the `chunks` table). */
id: string;
/** Similarity score; the SQLite implementation reports `1 - cosine distance`, so higher means closer. */
score: number;
};
/**
 * Backend-agnostic contract for storing and querying chunk embeddings.
 * Chunk metadata stays in SQLite, which is why some methods take the
 * metadata database as an argument.
 */
export interface VectorStore {
/** Prepares the backend for vectors of the given size; resolves to whether the store is usable. */
ensureReady(dimensions: number): Promise<boolean>;
/** Inserts or replaces the embedding stored under the chunk id. */
upsert(id: string, embedding: number[]): Promise<void>;
/** Removes the embedding stored under the chunk id. */
delete(id: string): Promise<void>;
/** Removes the embeddings of all chunks that `db` lists for the given path and source. */
deleteByPath(path: string, source: string, db: DatabaseSync): Promise<void>;
/**
 * Returns up to `limit` nearest chunks for `queryVec`, restricted to chunks
 * in `db` whose model equals `filter.model` and whose source is one of
 * `filter.sources` (an empty list applies no source restriction).
 */
search(
queryVec: number[],
limit: number,
filter: { model: string; sources: string[] },
db: DatabaseSync,
): Promise<VectorSearchResult[]>;
}
/** Name of the sqlite-vec virtual table that holds chunk embeddings. */
const VECTOR_TABLE = "chunks_vec";

/**
 * Packs an embedding into a binary blob of 32-bit floats (4 bytes per
 * component, platform byte order), the format written to the vector table.
 */
function vectorToBlob(embedding: number[]): Buffer {
  const packed = Float32Array.from(embedding);
  return Buffer.from(packed.buffer);
}
/**
 * `VectorStore` implementation backed by the sqlite-vec extension.
 *
 * Embeddings live in the `chunks_vec` virtual table of the same SQLite
 * database that holds chunk metadata. The extension is loaded lazily by
 * `ensureReady()`; when it is disabled or fails to load, reads return nothing
 * and deletes are no-ops, while `upsert()` throws.
 */
export class SqliteVecStore implements VectorStore {
  private db: DatabaseSync;
  private enabled: boolean;
  private extensionPath?: string;
  private available: boolean | null = null;
  private loadError?: string;
  private dims?: number;
  private ready: Promise<boolean> | null = null;
  private readonly VECTOR_LOAD_TIMEOUT_MS = 30_000;

  /**
   * @param params.db Database the extension is loaded into and vectors are written to.
   * @param params.enabled When false the store never loads the extension.
   * @param params.extensionPath Optional path to the sqlite-vec shared library.
   */
  constructor(params: { db: DatabaseSync; enabled: boolean; extensionPath?: string }) {
    this.db = params.db;
    this.enabled = params.enabled;
    this.extensionPath = params.extensionPath;
  }

  /**
   * Loads sqlite-vec (once) and makes sure the vector table exists for the
   * given embedding size.
   *
   * @param dimensions Embedding size; values <= 0 skip table setup.
   * @returns `true` when the extension is loaded, `false` when disabled or unavailable.
   */
  async ensureReady(dimensions: number): Promise<boolean> {
    if (!this.enabled) return false;
    if (!this.ready) {
      this.ready = this.withTimeout(
        this.loadVectorExtension(),
        this.VECTOR_LOAD_TIMEOUT_MS,
        `sqlite-vec load timed out after ${Math.round(this.VECTOR_LOAD_TIMEOUT_MS / 1000)}s`,
      );
    }
    let ready = false;
    try {
      ready = await this.ready;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.available = false;
      this.loadError = message;
      // Allow a later call to retry after a timeout.
      this.ready = null;
      log.warn(`sqlite-vec unavailable: ${message}`);
      return false;
    }
    if (ready && typeof dimensions === "number" && dimensions > 0) {
      this.ensureVectorTable(dimensions);
    }
    return ready;
  }

  /**
   * Inserts or replaces the embedding for one chunk.
   *
   * @throws Error when the store has not been made ready, or when the insert fails.
   */
  async upsert(id: string, embedding: number[]): Promise<void> {
    if (!this.available || !this.dims) {
      throw new Error("Vector store not ready");
    }
    // Delete-then-insert emulates an upsert; a failed delete is deliberately
    // ignored so the insert below still gets its chance.
    try {
      this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id);
    } catch {}
    this.db
      .prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`)
      .run(id, vectorToBlob(embedding));
  }

  /** Best-effort removal of a single chunk's vector. */
  async delete(id: string): Promise<void> {
    if (!this.available) return;
    try {
      this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id);
    } catch {}
  }

  /**
   * Best-effort removal of every vector belonging to a file.
   * Must run BEFORE the matching rows are deleted from `chunks`, because the
   * ids are resolved through a sub-select on that table.
   */
  async deleteByPath(path: string, source: string, db: DatabaseSync): Promise<void> {
    if (!this.available) return;
    try {
      db.prepare(
        `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
      ).run(path, source);
    } catch {}
  }

  /**
   * Cosine nearest-neighbour search joined against chunk metadata.
   *
   * @returns Chunk ids with `1 - cosine distance` as score (higher is better);
   *   `[]` when the extension is disabled/unavailable or the query is empty.
   */
  async search(
    queryVec: number[],
    limit: number,
    filter: { model: string; sources: string[] },
    db: DatabaseSync,
  ): Promise<VectorSearchResult[]> {
    if (queryVec.length === 0 || limit <= 0) return [];
    // Load lazily: `available` is still null until ensureReady() has run, so
    // checking it first would make a query-only store never load sqlite-vec.
    if (!(await this.ensureReady(queryVec.length))) return [];

    const sourceSql =
      filter.sources.length === 0
        ? ""
        : ` AND c.source IN (${filter.sources.map(() => "?").join(", ")})`;

    const rows = db
      .prepare(
        `SELECT c.id,\n` +
          `       vec_distance_cosine(v.embedding, ?) AS dist\n` +
          `  FROM ${VECTOR_TABLE} v\n` +
          `  JOIN chunks c ON c.id = v.id\n` +
          ` WHERE c.model = ?${sourceSql}\n` +
          ` ORDER BY dist ASC\n` +
          ` LIMIT ?`,
      )
      .all(vectorToBlob(queryVec), filter.model, ...filter.sources, limit) as Array<{
      id: string;
      dist: number;
    }>;

    return rows.map((row) => ({
      id: row.id,
      score: 1 - row.dist,
    }));
  }

  /** Loads the sqlite-vec extension into `db`; caches the outcome in `available`. */
  private async loadVectorExtension(): Promise<boolean> {
    if (this.available !== null) return this.available;
    if (!this.enabled) {
      this.available = false;
      return false;
    }
    try {
      // NOTE(review): the path is only trimmed here; if callers may pass
      // "~"-style or relative paths they presumably need resolving first — confirm.
      const trimmedPath = this.extensionPath?.trim();
      const resolvedPath = trimmedPath ? trimmedPath : undefined;
      const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath });
      if (!loaded.ok) throw new Error(loaded.error ?? "unknown sqlite-vec load error");
      this.extensionPath = loaded.extensionPath;
      this.available = true;
      return true;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.available = false;
      this.loadError = message;
      log.warn(`sqlite-vec unavailable: ${message}`);
      return false;
    }
  }

  /**
   * Creates the vector table for the given size. If this instance previously
   * used a different size, the table is dropped first because a vec0 column
   * has a fixed dimension.
   */
  private ensureVectorTable(dimensions: number): void {
    if (this.dims === dimensions) return;
    if (this.dims && this.dims !== dimensions) {
      this.dropVectorTable();
    }
    this.db.exec(
      `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` +
        `  id TEXT PRIMARY KEY,\n` +
        `  embedding FLOAT[${dimensions}]\n` +
        `)`,
    );
    this.dims = dimensions;
  }

  /** Best-effort drop of the vector table; failures are logged at debug level. */
  private dropVectorTable(): void {
    try {
      // "IF EXISTS" (not "IF NOT EXISTS", which is a syntax error for DROP).
      this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`);
    }
  }

  /** Races `promise` against a timer and always clears the timer afterwards. */
  private async withTimeout<T>(
    promise: Promise<T>,
    timeoutMs: number,
    timeoutMessage: string,
  ): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs);
    });
    try {
      return await Promise.race([promise, timeout]);
    } finally {
      // Without this the pending timer keeps the event loop alive for the
      // full timeout even after the extension loaded quickly.
      clearTimeout(timer);
    }
  }
}