From c71eca52371b154a1254b69c222b1bd5a348dfde Mon Sep 17 00:00:00 2001 From: Hande <159312713+hande-k@users.noreply.github.com> Date: Thu, 29 Jan 2026 20:48:23 +0100 Subject: [PATCH] add update --- examples/cognee-docker-compose.yaml | 3 +- src/cli/memory-cli.ts | 14 ++- src/memory/cognee-client.ts | 91 ++++++++++++++++ src/memory/cognee-provider.ts | 157 +++++++++++++++++++++++++--- src/memory/manager.ts | 1 + 5 files changed, 246 insertions(+), 20 deletions(-) diff --git a/examples/cognee-docker-compose.yaml b/examples/cognee-docker-compose.yaml index 8269e14ec..d272378af 100644 --- a/examples/cognee-docker-compose.yaml +++ b/examples/cognee-docker-compose.yaml @@ -11,8 +11,6 @@ # - Vector DB: LanceDB (file-based) # - Graph DB: Kuzu (file-based) -version: "3.8" - services: cognee: image: cognee/cognee:latest @@ -21,6 +19,7 @@ services: - "127.0.0.1:8000:8000" environment: - LLM_API_KEY=${LLM_API_KEY} + - ENABLE_BACKEND_ACCESS_CONTROL=false volumes: - cognee_data:/app/cognee/.cognee_system restart: unless-stopped diff --git a/src/cli/memory-cli.ts b/src/cli/memory-cli.ts index 68894adf5..222304fef 100644 --- a/src/cli/memory-cli.ts +++ b/src/cli/memory-cli.ts @@ -24,6 +24,7 @@ type MemoryCommandOptions = { json?: boolean; deep?: boolean; index?: boolean; + updateCognee?: boolean; verbose?: boolean; }; @@ -197,6 +198,7 @@ async function scanMemorySources(params: { export async function runMemoryStatus(opts: MemoryCommandOptions) { setVerbose(Boolean(opts.verbose)); + const updateCognee = opts.updateCognee ?? process.argv.includes("--update-cognee"); const cfg = loadConfig(); const agentIds = resolveAgentIds(cfg, opts.agent); const allResults: Array<{ @@ -205,6 +207,7 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) { embeddingProbe?: Awaited>; indexError?: string; scan?: MemorySourceScan; + cogneeUpdate?: boolean; }> = []; for (const agentId of agentIds) { @@ -240,6 +243,7 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) { try { await manager.sync({ reason: "cli", + update: updateCognee, progress: (syncUpdate) => { update({ completed: syncUpdate.completed, @@ -269,7 +273,14 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) { agentId, sources, }); - allResults.push({ agentId, status, embeddingProbe, indexError, scan }); + allResults.push({ + agentId, + status, + embeddingProbe, + indexError, + scan, + cogneeUpdate: Boolean(updateCognee), + }); }, }); } @@ -434,6 +445,7 @@ export function registerMemoryCli(program: Command) { .option("--json", "Print JSON") .option("--deep", "Probe embedding provider availability") .option("--index", "Reindex if dirty (implies --deep)") + .option("--update-cognee", "Use Cognee update when file data ids are known", false) .option("--verbose", "Verbose logging", false) .action(async (opts: MemoryCommandOptions) => { await runMemoryStatus(opts); diff --git a/src/memory/cognee-client.ts b/src/memory/cognee-client.ts index b092239ae..6ed294e26 100644 --- a/src/memory/cognee-client.ts +++ b/src/memory/cognee-client.ts @@ -24,6 +24,21 @@ export type CogneeAddResponse = { datasetId: string; datasetName: string; message: string; + dataId?: string; +}; + +export type CogneeUpdateRequest = { + dataId: string; + datasetId: string; + data: string; +}; + +export type CogneeUpdateResponse = { + datasetId?: string; + datasetName?: string; + message?: string; + status?: string; + dataId?: string; }; export type CogneeCognifyRequest = { @@ -119,12 +134,15 @@ export class CogneeClient { dataset_id: string; dataset_name: string; message: string; + data_id?: unknown; + data_ingestion_info?: unknown; }; return { datasetId: data.dataset_id, datasetName: data.dataset_name, message: data.message, + dataId: this.extractDataId(data.data_id ?? data.data_ingestion_info), }; } catch (error) { log.error("Failed to add data to Cognee", { error }); @@ -134,6 +152,63 @@ export class CogneeClient { } } + async update(req: CogneeUpdateRequest): Promise { + const url = new URL(`${this.baseUrl}${API_PREFIX}/update`); + url.searchParams.set("data_id", req.dataId); + url.searchParams.set("dataset_id", req.datasetId); + const headers: Record = {}; + if (this.apiKey) { + headers.Authorization = `Bearer ${this.apiKey}`; + headers["X-Api-Key"] = this.apiKey; + } + + log.debug("Updating data in Cognee", { + url: url.toString(), + dataLength: req.data.length, + }); + + try { + const formData = new FormData(); + const blob = new Blob([req.data], { type: "text/plain" }); + formData.append("data", blob, "clawdbot-memory.txt"); + + const response = await request(url.toString(), { + method: "PATCH", + headers, + body: formData, + bodyTimeout: this.timeoutMs, + headersTimeout: this.timeoutMs, + }); + + if (response.statusCode !== 200) { + const errorText = await response.body.text(); + throw new Error(`Cognee update failed with status ${response.statusCode}: ${errorText}`); + } + + const data = (await response.body.json()) as { + status?: string; + message?: string; + dataset_id?: string; + dataset_name?: string; + data_id?: unknown; + data_ingestion_info?: unknown; + }; + + return { + status: data.status, + message: data.message, + datasetId: data.dataset_id ?? req.datasetId, + datasetName: data.dataset_name, + dataId: this.extractDataId(data.data_id ?? data.data_ingestion_info) ?? req.dataId, + }; + } catch (error) { + log.error("Failed to update data in Cognee", { error }); + throw new Error( + `Cognee update request failed: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + async cognify(req: CogneeCognifyRequest = {}): Promise { const url = `${this.baseUrl}${API_PREFIX}/cognify`; const headers: Record = { @@ -265,6 +340,22 @@ export class CogneeClient { } } + private extractDataId(value: unknown): string | undefined { + if (!value) return undefined; + if (typeof value === "string") return value; + if (Array.isArray(value)) { + for (const entry of value) { + const id = this.extractDataId(entry); + if (id) return id; + } + return undefined; + } + if (typeof value !== "object") return undefined; + const record = value as { data_id?: unknown; data_ingestion_info?: unknown }; + if (typeof record.data_id === "string") return record.data_id; + return this.extractDataId(record.data_ingestion_info); + } + async healthCheck(): Promise { try { await this.status(); diff --git a/src/memory/cognee-provider.ts b/src/memory/cognee-provider.ts index 44ecc309b..2ec6015ab 100644 --- a/src/memory/cognee-provider.ts +++ b/src/memory/cognee-provider.ts @@ -1,7 +1,9 @@ import fs from "node:fs/promises"; +import os from "node:os"; import path from "node:path"; import type { MoltbotConfig } from "../config/config.js"; import { resolveAgentWorkspaceDir } from "../agents/agent-scope.js"; +import { resolveStateDir } from "../config/paths.js"; import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import type { MemorySearchResult } from "./index.js"; @@ -25,6 +27,12 @@ const DEFAULT_AUTO_COGNIFY = true; const DEFAULT_COGNIFY_BATCH_SIZE = 100; const SNIPPET_MAX_CHARS = 700; +type CogneeSyncIndex = { + datasetId?: string; + datasetName?: string; + files: Record; +}; + export type CogneeProviderConfig = { baseUrl?: string; apiKey?: string; @@ -51,6 +59,10 @@ export class CogneeMemoryProvider { private readonly sources: Set; private datasetId?: string; private syncedFiles = new Map(); // path -> hash + private readonly syncIndexPath: string; + private syncIndexLoaded = false; + private syncIndex: CogneeSyncIndex = { files: {} }; + private syncIndexDirty = false; constructor( cfg: MoltbotConfig, @@ -75,6 +87,12 @@ export class CogneeMemoryProvider { this.autoCognify = config.autoCognify ?? DEFAULT_AUTO_COGNIFY; this.cognifyBatchSize = config.cognifyBatchSize || DEFAULT_COGNIFY_BATCH_SIZE; this.sources = new Set(sources); + this.syncIndexPath = path.join( + resolveStateDir(process.env, os.homedir), + "memory", + "cognee", + `${agentId}.json`, + ); log.info("Cognee memory provider initialized", { agentId, @@ -91,30 +109,38 @@ export class CogneeMemoryProvider { async sync(params?: { reason?: string; force?: boolean; + update?: boolean; progress?: (update: { completed: number; total: number; label?: string }) => void; }): Promise { log.info("Starting Cognee memory sync", { agentId: this.agentId }); let addedCount = 0; + await this.loadSyncIndex(); + const force = Boolean(params?.force); + const update = Boolean(params?.update); // Sync memory files if (this.sources.has("memory")) { const memoryFiles = await this.collectMemoryFiles(); - addedCount += await this.syncFiles(memoryFiles, "memory"); + addedCount += await this.syncFiles(memoryFiles, "memory", { update }); } // Sync session transcripts if (this.sources.has("sessions")) { const sessionFiles = await this.collectSessionFiles(); - addedCount += await this.syncFiles(sessionFiles, "sessions"); + addedCount += await this.syncFiles(sessionFiles, "sessions", { update }); } // Run cognify if auto-enabled and files were added - if (this.autoCognify && addedCount > 0) { + if ((this.autoCognify && addedCount > 0) || (this.autoCognify && force)) { log.info("Running cognify after sync", { addedCount }); await this.cognify(); } + if (this.syncIndexDirty) { + await this.saveSyncIndex(); + } + log.info("Cognee memory sync completed", { agentId: this.agentId, addedCount, @@ -288,6 +314,60 @@ export class CogneeMemoryProvider { async close(): Promise {} + private async loadSyncIndex(): Promise { + if (this.syncIndexLoaded) return; + this.syncIndexLoaded = true; + try { + const raw = await fs.readFile(this.syncIndexPath, "utf-8"); + const parsed = JSON.parse(raw) as CogneeSyncIndex; + if (!parsed || typeof parsed !== "object") return; + this.syncIndex = { + datasetId: parsed.datasetId, + datasetName: parsed.datasetName, + files: parsed.files && typeof parsed.files === "object" ? parsed.files : {}, + }; + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code !== "ENOENT") { + log.warn("Failed to load Cognee sync index", { error }); + } + } + + if (this.syncIndex.datasetName && this.syncIndex.datasetName !== this.datasetName) { + log.info("Resetting Cognee sync index (dataset name changed)", { + from: this.syncIndex.datasetName, + to: this.datasetName, + }); + this.syncIndex = { files: {} }; + this.syncIndexDirty = true; + } + + if (this.syncIndex.datasetId && this.datasetId && this.syncIndex.datasetId !== this.datasetId) { + log.info("Resetting Cognee sync index (dataset id changed)", { + from: this.syncIndex.datasetId, + to: this.datasetId, + }); + this.syncIndex = { files: {} }; + this.syncIndexDirty = true; + } + + if (!this.datasetId && this.syncIndex.datasetId) { + this.datasetId = this.syncIndex.datasetId; + } + } + + private async saveSyncIndex(): Promise { + const dir = path.dirname(this.syncIndexPath); + await fs.mkdir(dir, { recursive: true }); + const payload: CogneeSyncIndex = { + datasetId: this.datasetId ?? this.syncIndex.datasetId, + datasetName: this.datasetName, + files: this.syncIndex.files, + }; + await fs.writeFile(this.syncIndexPath, JSON.stringify(payload, null, 2), "utf-8"); + this.syncIndexDirty = false; + } + private async collectMemoryFiles(): Promise { const files: MemoryFileEntry[] = []; const memoryPaths = await listMemoryFiles(this.workspaceDir); @@ -337,9 +417,14 @@ export class CogneeMemoryProvider { return files; } - private async syncFiles(files: MemoryFileEntry[], source: CogneeMemorySource): Promise { + private async syncFiles( + files: MemoryFileEntry[], + source: CogneeMemorySource, + opts?: { update?: boolean }, + ): Promise { let addedCount = 0; const batchSize = this.cognifyBatchSize; + const update = Boolean(opts?.update); for (let i = 0; i < files.length; i += batchSize) { const batch = files.slice(i, i + batchSize); @@ -363,24 +448,62 @@ export class CogneeMemoryProvider { const dataWithMetadata = `# ${file.path}\n\n${content}\n\n---\nMetadata: ${JSON.stringify(metadata)}`; - const response = await this.client.add({ - data: dataWithMetadata, - datasetName: this.datasetName, - }); + const record = this.syncIndex.files[file.path]; + const datasetId = this.datasetId ?? this.syncIndex.datasetId; + const canUpdate = update && record?.dataId && datasetId; - if (!this.datasetId) { - this.datasetId = response.datasetId; + if (canUpdate && datasetId && record?.dataId) { + await this.client.update({ + dataId: record.dataId, + datasetId, + data: dataWithMetadata, + }); + addedCount++; + log.debug("Updated file in Cognee", { + path: file.path, + datasetId, + dataId: record.dataId, + }); + } else { + const response = await this.client.add({ + data: dataWithMetadata, + datasetName: this.datasetName, + datasetId, + }); + + if (!this.datasetId) { + this.datasetId = response.datasetId; + } + if (response.dataId) { + this.syncIndex.files[file.path] = { + hash: file.hash, + dataId: response.dataId, + }; + } else { + this.syncIndex.files[file.path] = { hash: file.hash }; + } + this.syncIndex.datasetId = this.datasetId ?? this.syncIndex.datasetId; + this.syncIndex.datasetName = this.datasetName; + this.syncIndexDirty = true; + + this.syncedFiles.set(file.path, file.hash); + addedCount++; + + log.debug("Added file to Cognee", { + path: file.path, + datasetId: response.datasetId, + }); + continue; } + const dataId = record?.dataId; + this.syncIndex.files[file.path] = { hash: file.hash, dataId }; + this.syncIndex.datasetId = datasetId ?? this.syncIndex.datasetId; + this.syncIndex.datasetName = this.datasetName; + this.syncIndexDirty = true; this.syncedFiles.set(file.path, file.hash); - addedCount++; - - log.debug("Added file to Cognee", { - path: file.path, - datasetId: response.datasetId, - }); } catch (error) { - log.error("Failed to add file to Cognee", { path: file.path, error }); + log.error("Failed to sync file to Cognee", { path: file.path, error }); } } } diff --git a/src/memory/manager.ts b/src/memory/manager.ts index 5fbc3ceea..627c7720c 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -384,6 +384,7 @@ export class MemoryIndexManager { async sync(params?: { reason?: string; force?: boolean; + update?: boolean; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise { if (this.syncing) return this.syncing;