Skip to content

Commit

Permalink
fix(Postgres PGVector Store Node): Release postgres connections back …
Browse files Browse the repository at this point in the history
…to the pool (n8n-io#12723)

Co-authored-by: Oleg Ivaniv <[email protected]>
  • Loading branch information
netroy and OlegIvaniv authored Jan 27, 2025
1 parent 02df25c commit 663dfb4
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { MemoryVectorStore } from 'langchain/vectorstores/memory';
import type { INodeProperties } from 'n8n-workflow';

import { createVectorStoreNode } from '../shared/createVectorStoreNode';
Expand All @@ -20,7 +21,7 @@ const insertFields: INodeProperties[] = [
},
];

export class VectorStoreInMemory extends createVectorStoreNode({
export class VectorStoreInMemory extends createVectorStoreNode<MemoryVectorStore>({
meta: {
displayName: 'In-Memory Vector Store',
name: 'vectorStoreInMemory',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class ExtendedPGVectorStore extends PGVectorStore {
}
}

export class VectorStorePGVector extends createVectorStoreNode({
export class VectorStorePGVector extends createVectorStoreNode<ExtendedPGVectorStore>({
meta: {
description: 'Work with your data in Postgresql with the PGVector extension',
icon: 'file:postgres.svg',
Expand Down Expand Up @@ -274,6 +274,7 @@ export class VectorStorePGVector extends createVectorStoreNode({

return await ExtendedPGVectorStore.initialize(embeddings, config);
},

async populateVectorStore(context, embeddings, documents, itemIndex) {
// NOTE: if you are to create the HNSW index before use, you need to consider moving the distanceStrategy field to
// shared fields, because you need that strategy when creating the index.
Expand Down Expand Up @@ -307,6 +308,11 @@ export class VectorStorePGVector extends createVectorStoreNode({
metadataColumnName: 'metadata',
}) as ColumnOptions;

await PGVectorStore.fromDocuments(documents, embeddings, config);
const vectorStore = await PGVectorStore.fromDocuments(documents, embeddings, config);
vectorStore.client?.release();
},

releaseVectorStoreClient(vectorStore) {
vectorStore.client?.release();
},
}) {}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const insertFields: INodeProperties[] = [
},
];

export class VectorStorePinecone extends createVectorStoreNode({
export class VectorStorePinecone extends createVectorStoreNode<PineconeStore>({
meta: {
displayName: 'Pinecone Vector Store',
name: 'vectorStorePinecone',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const retrieveFields: INodeProperties[] = [
},
];

export class VectorStoreQdrant extends createVectorStoreNode({
export class VectorStoreQdrant extends createVectorStoreNode<ExtendedQdrantVectorStore>({
meta: {
displayName: 'Qdrant Vector Store',
name: 'vectorStoreQdrant',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const retrieveFields: INodeProperties[] = [

const updateFields: INodeProperties[] = [...insertFields];

export class VectorStoreSupabase extends createVectorStoreNode({
export class VectorStoreSupabase extends createVectorStoreNode<SupabaseVectorStore>({
meta: {
description: 'Work with your data in Supabase Vector Store',
icon: 'file:supabase.svg',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const retrieveFields: INodeProperties[] = [
},
];

export class VectorStoreZep extends createVectorStoreNode({
export class VectorStoreZep extends createVectorStoreNode<ZepVectorStore | ZepCloudVectorStore>({
meta: {
displayName: 'Zep Vector Store',
name: 'vectorStoreZep',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ interface NodeMeta {
operationModes?: NodeOperationMode[];
}

export interface VectorStoreNodeConstructorArgs {
export interface VectorStoreNodeConstructorArgs<T extends VectorStore = VectorStore> {
meta: NodeMeta;
methods?: {
listSearch?: {
Expand Down Expand Up @@ -77,7 +77,8 @@ export interface VectorStoreNodeConstructorArgs {
filter: Record<string, never> | undefined,
embeddings: Embeddings,
itemIndex: number,
) => Promise<VectorStore>;
) => Promise<T>;
releaseVectorStoreClient?: (vectorStore: T) => void;
}

function transformDescriptionForOperationMode(
Expand All @@ -90,11 +91,15 @@ function transformDescriptionForOperationMode(
}));
}

function isUpdateSupported(args: VectorStoreNodeConstructorArgs): boolean {
function isUpdateSupported<T extends VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
): boolean {
return args.meta.operationModes?.includes('update') ?? false;
}

function getOperationModeOptions(args: VectorStoreNodeConstructorArgs): INodePropertyOptions[] {
function getOperationModeOptions<T extends VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
): INodePropertyOptions[] {
const enabledOperationModes = args.meta.operationModes ?? DEFAULT_OPERATION_MODES;

const allOptions = [
Expand Down Expand Up @@ -137,7 +142,9 @@ function getOperationModeOptions(args: VectorStoreNodeConstructorArgs): INodePro
);
}

export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
export const createVectorStoreNode = <T extends VectorStore = VectorStore>(
args: VectorStoreNodeConstructorArgs<T>,
) =>
class VectorStoreNodeType implements INodeType {
description: INodeTypeDescription = {
displayName: args.meta.displayName,
Expand Down Expand Up @@ -334,38 +341,42 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
embeddings,
itemIndex,
);
const prompt = this.getNodeParameter('prompt', itemIndex) as string;
const topK = this.getNodeParameter('topK', itemIndex, 4) as number;

const embeddedPrompt = await embeddings.embedQuery(prompt);
const docs = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);
try {
const prompt = this.getNodeParameter('prompt', itemIndex) as string;
const topK = this.getNodeParameter('topK', itemIndex, 4) as number;

const includeDocumentMetadata = this.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;

const serializedDocs = docs.map(([doc, score]) => {
const document = {
pageContent: doc.pageContent,
...(includeDocumentMetadata ? { metadata: doc.metadata } : {}),
};

return {
json: { document, score },
pairedItem: {
item: itemIndex,
},
};
});

resultData.push(...serializedDocs);
logAiEvent(this, 'ai-vector-store-searched', { query: prompt });
const embeddedPrompt = await embeddings.embedQuery(prompt);
const docs = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);

const includeDocumentMetadata = this.getNodeParameter(
'includeDocumentMetadata',
itemIndex,
true,
) as boolean;

const serializedDocs = docs.map(([doc, score]) => {
const document = {
pageContent: doc.pageContent,
...(includeDocumentMetadata ? { metadata: doc.metadata } : {}),
};

return {
json: { document, score },
pairedItem: {
item: itemIndex,
},
};
});

resultData.push(...serializedDocs);
logAiEvent(this, 'ai-vector-store-searched', { query: prompt });
} finally {
args.releaseVectorStoreClient?.(vectorStore);
}
}

return [resultData];
Expand Down Expand Up @@ -427,24 +438,28 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
itemIndex,
);

const { processedDocuments, serializedDocuments } = await processDocument(
loader,
itemData,
itemIndex,
);
try {
const { processedDocuments, serializedDocuments } = await processDocument(
loader,
itemData,
itemIndex,
);

if (processedDocuments?.length !== 1) {
throw new NodeOperationError(this.getNode(), 'Single document per item expected');
}
if (processedDocuments?.length !== 1) {
throw new NodeOperationError(this.getNode(), 'Single document per item expected');
}

resultData.push(...serializedDocuments);
resultData.push(...serializedDocuments);

// Use ids option to upsert instead of insert
await vectorStore.addDocuments(processedDocuments, {
ids: [documentId],
});
// Use ids option to upsert instead of insert
await vectorStore.addDocuments(processedDocuments, {
ids: [documentId],
});

logAiEvent(this, 'ai-vector-store-updated');
logAiEvent(this, 'ai-vector-store-updated');
} finally {
args.releaseVectorStoreClient?.(vectorStore);
}
}

return [resultData];
Expand All @@ -468,6 +483,9 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
const vectorStore = await args.getVectorStoreClient(this, filter, embeddings, itemIndex);
return {
response: logWrapper(vectorStore, this),
closeFunction: async () => {
args.releaseVectorStoreClient?.(vectorStore);
},
};
}

Expand All @@ -491,23 +509,28 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
embeddings,
itemIndex,
);
const embeddedPrompt = await embeddings.embedQuery(input);
const documents = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);
return documents
.map((document) => {
if (includeDocumentMetadata) {
return { type: 'text', text: JSON.stringify(document[0]) };
}
return {
type: 'text',
text: JSON.stringify({ pageContent: document[0].pageContent }),
};
})
.filter((document) => !!document);

try {
const embeddedPrompt = await embeddings.embedQuery(input);
const documents = await vectorStore.similaritySearchVectorWithScore(
embeddedPrompt,
topK,
filter,
);
return documents
.map((document) => {
if (includeDocumentMetadata) {
return { type: 'text', text: JSON.stringify(document[0]) };
}
return {
type: 'text',
text: JSON.stringify({ pageContent: document[0].pageContent }),
};
})
.filter((document) => !!document);
} finally {
args.releaseVectorStoreClient?.(vectorStore);
}
},
});

Expand Down

0 comments on commit 663dfb4

Please sign in to comment.