Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Consolidate RAG knowledge into knowledge manager #3347

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 1 addition & 326 deletions packages/adapter-sqlite/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import type {
Memory,
Relationship,
UUID,
RAGKnowledgeItem,
KnowledgeItem,
ChunkRow,
Adapter,
IAgentRuntime,
Expand Down Expand Up @@ -766,331 +766,6 @@ export class SqliteDatabaseAdapter
return false;
}
}

/**
 * Fetches knowledge rows that belong to the given agent or are flagged as shared.
 *
 * @param params.id Optional row id; when set, only the row with that id is selected.
 * @param params.agentId Agent whose rows are selected (rows with `isShared = 1` are included too).
 * @param params.limit Optional maximum number of rows to return.
 * @param params.query Accepted for interface compatibility; this implementation does not read it.
 * @returns The selected rows with `content` JSON-decoded, `embedding` as a
 *   `Float32Array` (or `undefined` when the column is NULL) and `createdAt`
 *   as a number (string values are run through `Date.parse`).
 */
async getKnowledge(params: {
    id?: UUID;
    agentId: UUID;
    limit?: number;
    query?: string;
}): Promise<RAGKnowledgeItem[]> {
    let sql = `SELECT * FROM knowledge WHERE (agentId = ? OR isShared = 1)`;
    const queryParams: (UUID | number)[] = [params.agentId];

    if (params.id) {
        sql += ` AND id = ?`;
        queryParams.push(params.id);
    }

    if (params.limit) {
        sql += ` LIMIT ?`;
        queryParams.push(params.limit);
    }

    interface KnowledgeRow {
        id: UUID;
        agentId: UUID;
        content: string;
        embedding: Buffer | null;
        createdAt: string | number;
    }

    // The BLOB column arrives as a Buffer of raw bytes. Handing that Buffer
    // directly to `new Float32Array(...)` treats it as an array-like of
    // numbers and produces one float per *byte*. Copy the bytes into a
    // fresh (offset 0, hence 4-byte aligned) buffer and reinterpret them.
    // NOTE(review): assumes the column holds the raw bytes of a
    // Float32Array, as bound by createKnowledge — confirm.
    const decodeEmbedding = (blob: Buffer | null): Float32Array | undefined => {
        if (!blob) {
            return undefined;
        }
        const bytes = new Uint8Array(blob);
        return new Float32Array(
            bytes.buffer,
            0,
            Math.floor(bytes.byteLength / Float32Array.BYTES_PER_ELEMENT)
        );
    };

    const rows = this.db.prepare(sql).all(...queryParams) as KnowledgeRow[];

    return rows.map((row) => ({
        id: row.id,
        agentId: row.agentId,
        content: JSON.parse(row.content),
        embedding: decodeEmbedding(row.embedding),
        createdAt:
            typeof row.createdAt === "string"
                ? Date.parse(row.createdAt)
                : row.createdAt,
    }));
}

/**
 * Hybrid (vector + keyword) search over the knowledge rows visible to an agent.
 *
 * Visible rows are the agent's own rows plus rows with `agentId IS NULL AND
 * isShared = 1`. Each row gets:
 *  - `vector_score`  = 1 / (1 + vec_distance_L2(embedding, query embedding))
 *  - `keyword_score` = (3.0 if the lower-cased `$.text` contains `searchText`, else 1.0)
 *                      × (1.5 for chunks, 1.2 for main entries, else 1.0)
 *  - `combined_score` = vector_score × keyword_score (returned as `similarity`)
 *
 * A row is kept when `vector_score >= match_threshold`, or when it has a
 * keyword boost (`keyword_score > 1.0`) and `vector_score >= 0.3`.
 *
 * Results are cached through `getCache`/`setCache`.
 *
 * @param params.agentId Agent performing the search.
 * @param params.embedding Query embedding.
 * @param params.match_threshold Minimum `vector_score` for a row without keyword boost.
 * @param params.match_count Maximum number of rows returned.
 * @param params.searchText Optional text used for the keyword boost.
 * @returns Matching items ordered by `combined_score`, highest first.
 * @throws Rethrows any database error after logging it.
 */
async searchKnowledge(params: {
    agentId: UUID;
    embedding: Float32Array;
    match_threshold: number;
    match_count: number;
    searchText?: string;
}): Promise<RAGKnowledgeItem[]> {
    // NOTE(review): the key covers only agentId and searchText, so calls that
    // differ in embedding, match_threshold or match_count share one entry,
    // and cached embeddings come back as plain JSON objects rather than
    // Float32Array. Left unchanged here to keep existing cache entries valid.
    const cacheKey = `embedding_${params.agentId}_${params.searchText}`;
    const cachedResult = await this.getCache({
        key: cacheKey,
        agentId: params.agentId,
    });

    if (cachedResult) {
        return JSON.parse(cachedResult);
    }

    interface KnowledgeSearchRow {
        id: UUID;
        agentId: UUID;
        content: string;
        embedding: Buffer | null;
        createdAt: string | number;
        vector_score: number;
        keyword_score: number;
        combined_score: number;
    }

    // SQL gives AND higher precedence than OR, so the visibility predicate
    // `(shared) OR agentId = ?` must be parenthesised as a whole. Without
    // the extra parentheses shared rows bypassed both the
    // `embedding IS NOT NULL` guard and the score threshold.
    const sql = `
        WITH vector_scores AS (
            SELECT id,
                1 / (1 + vec_distance_L2(embedding, ?)) as vector_score
            FROM knowledge
            WHERE ((agentId IS NULL AND isShared = 1) OR agentId = ?)
            AND embedding IS NOT NULL
        ),
        keyword_matches AS (
            SELECT id,
            CASE
                WHEN lower(json_extract(content, '$.text')) LIKE ? THEN 3.0
                ELSE 1.0
            END *
            CASE
                WHEN json_extract(content, '$.metadata.isChunk') = 1 THEN 1.5
                WHEN json_extract(content, '$.metadata.isMain') = 1 THEN 1.2
                ELSE 1.0
            END as keyword_score
            FROM knowledge
            WHERE (agentId IS NULL AND isShared = 1) OR agentId = ?
        )
        SELECT k.*,
            v.vector_score,
            kw.keyword_score,
            (v.vector_score * kw.keyword_score) as combined_score
        FROM knowledge k
        JOIN vector_scores v ON k.id = v.id
        LEFT JOIN keyword_matches kw ON k.id = kw.id
        WHERE ((k.agentId IS NULL AND k.isShared = 1) OR k.agentId = ?)
        AND (
            v.vector_score >= ?  -- Using match_threshold parameter
            OR (kw.keyword_score > 1.0 AND v.vector_score >= 0.3)
        )
        ORDER BY combined_score DESC
        LIMIT ?
    `;

    // Positional bindings, in the order the `?` placeholders appear above.
    const searchParams = [
        params.embedding,
        params.agentId,
        `%${params.searchText?.toLowerCase() || ""}%`,
        params.agentId,
        params.agentId,
        params.match_threshold,
        params.match_count,
    ];

    // The BLOB column arrives as a Buffer of raw bytes. `new Float32Array(buffer)`
    // would create one element per byte, so copy the bytes into a fresh,
    // aligned buffer and reinterpret them as float32 values.
    // NOTE(review): assumes the column holds the raw bytes of a
    // Float32Array, as bound by createKnowledge — confirm.
    const decodeEmbedding = (blob: Buffer | null): Float32Array | undefined => {
        if (!blob) {
            return undefined;
        }
        const bytes = new Uint8Array(blob);
        return new Float32Array(
            bytes.buffer,
            0,
            Math.floor(bytes.byteLength / Float32Array.BYTES_PER_ELEMENT)
        );
    };

    try {
        const rows = this.db
            .prepare(sql)
            .all(...searchParams) as KnowledgeSearchRow[];
        const results = rows.map((row) => ({
            id: row.id,
            agentId: row.agentId,
            content: JSON.parse(row.content),
            embedding: decodeEmbedding(row.embedding),
            createdAt:
                typeof row.createdAt === "string"
                    ? Date.parse(row.createdAt)
                    : row.createdAt,
            similarity: row.combined_score,
        }));

        await this.setCache({
            key: cacheKey,
            agentId: params.agentId,
            value: JSON.stringify(results),
        });

        return results;
    } catch (error) {
        elizaLogger.error("Error in searchKnowledge:", error);
        throw error;
    }
}

/**
 * Inserts a knowledge item into the `knowledge` table.
 *
 * Items whose `content.metadata.isShared` is truthy are stored with a NULL
 * `agentId` and `isShared = 1`. Inserting an id that already exists is
 * treated as a no-op (logged and skipped); every other failure is logged and
 * rethrown.
 *
 * @param knowledge The item to persist.
 * @throws Rethrows any database error other than a primary-key conflict.
 */
async createKnowledge(knowledge: RAGKnowledgeItem): Promise<void> {
    try {
        this.db.transaction(() => {
            const sql = `
                INSERT INTO knowledge (
                    id, agentId, content, embedding, createdAt,
                    isMain, originalId, chunkIndex, isShared
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            `;

            const embeddingArray = knowledge.embedding || null;

            const metadata = knowledge.content.metadata || {};
            const isShared = metadata.isShared ? 1 : 0;

            this.db
                .prepare(sql)
                .run(
                    knowledge.id,
                    metadata.isShared ? null : knowledge.agentId,
                    JSON.stringify(knowledge.content),
                    embeddingArray,
                    knowledge.createdAt || Date.now(),
                    metadata.isMain ? 1 : 0,
                    metadata.originalId || null,
                    // `??` rather than `||`: chunk index 0 is a valid value
                    // and must not be stored as NULL.
                    metadata.chunkIndex ?? null,
                    isShared
                );
        })();
    } catch (error: unknown) {
        const details = (error ?? {}) as { code?: unknown; message?: unknown };
        // Recognise the duplicate-id case the same way for shared and
        // non-shared items: by error code, with the message as a fallback.
        const isPrimaryKeyError =
            details.code === "SQLITE_CONSTRAINT_PRIMARYKEY" ||
            (typeof details.message === "string" &&
                details.message.includes("SQLITE_CONSTRAINT_PRIMARYKEY"));

        if (!isPrimaryKeyError) {
            // Anything other than "already exists" is a real failure and
            // must reach the caller, for shared items as well.
            elizaLogger.error(`Error creating knowledge ${knowledge.id}:`, {
                error,
                embeddingLength: knowledge.embedding?.length,
                content: knowledge.content,
            });
            throw error;
        }

        if (knowledge.content.metadata?.isShared) {
            elizaLogger.info(
                `Shared knowledge ${knowledge.id} already exists, skipping`
            );
            return;
        }

        elizaLogger.debug(
            `Knowledge ${knowledge.id} already exists, skipping`
        );
    }
}

/**
 * Deletes a knowledge entry, or every entry whose id matches a wildcard pattern.
 *
 * - If `id` contains `*`, every `*` is translated to the SQL `%` wildcard and
 *   all rows whose id matches the resulting LIKE pattern are deleted.
 * - Otherwise the row with that id is deleted together with every row whose
 *   `content.metadata.originalId` equals it.
 *
 * All statements run inside a single transaction.
 *
 * @param id Exact knowledge id, or a pattern containing `*` wildcards.
 * @throws Error if `id` is not a string; database errors are logged and rethrown.
 */
async removeKnowledge(id: UUID): Promise<void> {
    if (typeof id !== "string") {
        throw new Error("Knowledge ID must be a string");
    }

    try {
        // Execute the transaction and ensure it's called with ()
        await this.db.transaction(() => {
            if (id.includes("*")) {
                // Global regex: a string pattern would replace only the
                // first `*`, leaving later wildcards as literal asterisks.
                const pattern = id.replace(/\*/g, "%");
                const sql = "DELETE FROM knowledge WHERE id LIKE ?";
                elizaLogger.debug(
                    `[Knowledge Remove] Executing SQL: ${sql} with pattern: ${pattern}`
                );
                const stmt = this.db.prepare(sql);
                const result = stmt.run(pattern);
                elizaLogger.debug(
                    `[Knowledge Remove] Pattern deletion affected ${result.changes} rows`
                );
                return result.changes; // Return changes for logging
            } else {
                // Log queries before execution
                const selectSql = "SELECT id FROM knowledge WHERE id = ?";
                const chunkSql =
                    "SELECT id FROM knowledge WHERE json_extract(content, '$.metadata.originalId') = ?";
                elizaLogger.debug(`[Knowledge Remove] Checking existence with:
Main: ${selectSql} [${id}]
Chunks: ${chunkSql} [${id}]`);

                const mainEntry = this.db.prepare(selectSql).get(id) as
                    | ChunkRow
                    | undefined;
                const chunks = this.db
                    .prepare(chunkSql)
                    .all(id) as ChunkRow[];

                elizaLogger.debug(`[Knowledge Remove] Found:`, {
                    mainEntryExists: !!mainEntry?.id,
                    chunkCount: chunks.length,
                    chunkIds: chunks.map((c) => c.id),
                });

                // Execute and log chunk deletion
                const chunkDeleteSql =
                    "DELETE FROM knowledge WHERE json_extract(content, '$.metadata.originalId') = ?";
                elizaLogger.debug(
                    `[Knowledge Remove] Executing chunk deletion: ${chunkDeleteSql} [${id}]`
                );
                const chunkResult = this.db.prepare(chunkDeleteSql).run(id);
                elizaLogger.debug(
                    `[Knowledge Remove] Chunk deletion affected ${chunkResult.changes} rows`
                );

                // Execute and log main entry deletion
                const mainDeleteSql = "DELETE FROM knowledge WHERE id = ?";
                elizaLogger.debug(
                    `[Knowledge Remove] Executing main deletion: ${mainDeleteSql} [${id}]`
                );
                const mainResult = this.db.prepare(mainDeleteSql).run(id);
                elizaLogger.debug(
                    `[Knowledge Remove] Main deletion affected ${mainResult.changes} rows`
                );

                const totalChanges =
                    chunkResult.changes + mainResult.changes;
                elizaLogger.debug(
                    `[Knowledge Remove] Total rows affected: ${totalChanges}`
                );

                // Verify deletion (diagnostic only: the result is logged, not acted on)
                const verifyMain = this.db.prepare(selectSql).get(id);
                const verifyChunks = this.db.prepare(chunkSql).all(id);
                elizaLogger.debug(
                    `[Knowledge Remove] Post-deletion check:`,
                    {
                        mainStillExists: !!verifyMain,
                        remainingChunks: verifyChunks.length,
                    }
                );

                return totalChanges; // Return changes for logging
            }
        })(); // Important: Call the transaction function

        elizaLogger.debug(
            `[Knowledge Remove] Transaction completed for id: ${id}`
        );
    } catch (error) {
        elizaLogger.error("[Knowledge Remove] Error:", {
            id,
            error:
                error instanceof Error
                    ? {
                          message: error.message,
                          stack: error.stack,
                          name: error.name,
                      }
                    : error,
        });
        throw error;
    }
}

/**
 * Deletes all knowledge rows owned by an agent.
 *
 * @param agentId Agent whose rows are deleted.
 * @param shared When truthy, rows with `isShared = 1` are deleted as well.
 * @throws Rethrows any database error after logging it.
 */
async clearKnowledge(agentId: UUID, shared?: boolean): Promise<void> {
    let statement: string;
    if (shared) {
        statement = `DELETE FROM knowledge WHERE (agentId = ? OR isShared = 1)`;
    } else {
        statement = `DELETE FROM knowledge WHERE agentId = ?`;
    }

    try {
        const prepared = this.db.prepare(statement);
        prepared.run(agentId);
    } catch (err) {
        elizaLogger.error(
            `Error clearing knowledge for agent ${agentId}:`,
            err
        );
        throw err;
    }
}
}

const sqliteDatabaseAdapter: Adapter = {
Expand Down
44 changes: 1 addition & 43 deletions packages/core/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type {
Memory,
Relationship,
UUID,
RAGKnowledgeItem,
KnowledgeItem,
Participant,
IDatabaseAdapter,
} from "./types.ts";
Expand Down Expand Up @@ -393,48 +393,6 @@ export abstract class DatabaseAdapter<DB = any> implements IDatabaseAdapter {
userId: UUID;
}): Promise<Relationship[]>;

/**
 * Retrieves knowledge items based on the specified parameters.
 *
 * Abstract: each concrete adapter supplies the actual lookup.
 *
 * @param params Lookup parameters: a required `agentId` plus optional `id`,
 *   `limit`, `query` and `conversationContext`. How each optional field
 *   narrows the result is up to the adapter — NOTE(review): confirm against
 *   the concrete adapters.
 * @returns Promise resolving to an array of knowledge items.
 */
abstract getKnowledge(params: {
id?: UUID;
agentId: UUID;
limit?: number;
query?: string;
conversationContext?: string;
}): Promise<RAGKnowledgeItem[]>;

/**
 * Searches knowledge items for an agent using a query embedding.
 *
 * Abstract: each concrete adapter supplies the actual search.
 *
 * @param params Search parameters: `agentId`, the query `embedding`,
 *   `match_threshold`, `match_count` and an optional `searchText`.
 *   NOTE(review): presumably `match_threshold` is a minimum score and
 *   `match_count` a maximum result count — confirm against the adapters.
 * @returns Promise resolving to an array of knowledge items.
 */
abstract searchKnowledge(params: {
agentId: UUID;
embedding: Float32Array;
match_threshold: number;
match_count: number;
searchText?: string;
}): Promise<RAGKnowledgeItem[]>;

/**
 * Creates a new knowledge item in the database.
 *
 * Abstract: each concrete adapter supplies the actual insert.
 *
 * @param knowledge The knowledge item to create.
 * @returns Promise resolving when creation is complete.
 */
abstract createKnowledge(knowledge: RAGKnowledgeItem): Promise<void>;

/**
 * Removes a knowledge item and its associated chunks from the database.
 *
 * Abstract: each concrete adapter supplies the actual deletion.
 *
 * @param id The ID of the knowledge item to remove.
 * @returns Promise resolving when removal is complete.
 */
abstract removeKnowledge(id: UUID): Promise<void>;

/**
 * Removes an agent's full knowledge database, including its associated
 * chunks, from the database.
 *
 * Abstract: each concrete adapter supplies the actual deletion.
 *
 * @param agentId The agent ID of the knowledge items to remove.
 * @param shared Optional flag; NOTE(review): presumably also removes shared
 *   knowledge when true — confirm against the concrete adapters.
 * @returns Promise resolving when removal is complete.
 */
abstract clearKnowledge(agentId: UUID, shared?: boolean): Promise<void>;

/**
* Executes an operation with circuit breaker protection.
* @param operation A function that returns a Promise to be executed with circuit breaker protection
Expand Down
Loading
Loading