Use a separate queue for indexing embeddings and summaries, to prevent blocking the main SB indexing thread #61
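The idea in the title can be illustrated with a small, self-contained sketch. This is not code from the PR: `SimpleQueue`, `onPageIndexed`, and `drainQueue` are hypothetical names, the in-memory queue only stands in for SilverBullet's `mq` syscalls, and everything is kept synchronous for brevity. The handler for the index event only records the page name, and the slow work runs later when the queue is drained.

```typescript
// Hypothetical in-memory stand-in for a message queue (not SilverBullet's mq API).
class SimpleQueue<T> {
  private items: T[] = [];

  // Enqueueing is cheap: it only stores the item.
  send(item: T): void {
    this.items.push(item);
  }

  // Remove and return up to `batchSize` items, oldest first.
  poll(batchSize: number): T[] {
    return this.items.splice(0, batchSize);
  }

  size(): number {
    return this.items.length;
  }
}

const pendingPages = new SimpleQueue<string>();
const processedPages: string[] = [];

// Producer: called from the indexing path, so it must return quickly.
function onPageIndexed(page: string): void {
  pendingPages.send(page);
}

// Consumer: called separately (for example on a timer) to do the slow work.
function drainQueue(
  batchSize: number,
  slowWork: (page: string) => void,
): number {
  const batch = pendingPages.poll(batchSize);
  for (const page of batch) {
    slowWork(page);
  }
  return batch.length;
}

onPageIndexed("Notes/A");
onPageIndexed("Notes/B");

// Both pages are queued, but no slow work has run until the queue is drained.
const handledCount = drainQueue(1, (page) => {
  processedPages.push(page);
});
```

With a batch size of 1, one page is processed per drain and the other stays queued, which mirrors the `batchSize: 1` setting in the manifest change.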

Merged 1 commit on Aug 21, 2024
23 changes: 16 additions & 7 deletions silverbullet-ai.plug.yaml
@@ -81,16 +81,25 @@ functions:
   searchEmbeddings:
     path: src/embeddings.ts:searchEmbeddings
     env: server
-  indexEmbeddings:
-    path: src/embeddings.ts:indexEmbeddings
-    env: server
-    events:
-      - page:index
-  indexSummaryEmbeddings:
-    path: src/embeddings.ts:indexSummary
+  queueEmbeddingGeneration:
+    path: src/embeddings.ts:queueEmbeddingGeneration
     env: server
     events:
       - page:index
+  processEmbeddingsQueue:
+    path: src/embeddings.ts:processEmbeddingsQueue
+    mqSubscriptions:
+      - queue: aiEmbeddingsQueue
+        batchSize: 1
+        autoAck: true
+        pollInterval: 600000
+  processSummaryQueue:
+    path: src/embeddings.ts:processSummaryQueue
+    mqSubscriptions:
+      - queue: aiSummaryQueue
+        batchSize: 1
+        autoAck: true
+        pollInterval: 600000
   generateEmbeddings:
     path: src/embeddings.ts:generateEmbeddings
   generateEmbeddingsOnServer:
65 changes: 61 additions & 4 deletions src/embeddings.ts
@@ -7,13 +7,15 @@ import type {
 } from "./types.ts";
 import { indexObjects, queryObjects } from "$sbplugs/index/plug_api.ts";
 import { renderToText } from "$sb/lib/tree.ts";
+import { mq } from "$sb/syscalls.ts";
+import { MQMessage } from "$sb/types.ts";
 import {
   currentEmbeddingModel,
   currentEmbeddingProvider,
   initIfNeeded,
 } from "../src/init.ts";
 import { log, supportsServerProxyCall } from "./utils.ts";
-import { editor, system } from "$sb/syscalls.ts";
+import { editor, markdown, space, system } from "$sb/syscalls.ts";
 import { aiSettings, configureSelectedModel } from "./init.ts";
 import * as cache from "./cache.ts";

@@ -68,7 +70,7 @@ export async function shouldIndexSummaries() {
  * Generate embeddings for each paragraph in a page, and then indexes
  * them.
  */
-export async function indexEmbeddings({ name: page, tree }: IndexTreeEvent) {
+export async function indexEmbeddings(page: string) {
   if (!await shouldIndexEmbeddings()) {
     return;
   }
@@ -77,6 +79,9 @@ export async function indexEmbeddings({ name: page, tree }: IndexTreeEvent) {
     return;
   }

+  const pageText = await space.readPage(page);
+  const tree = await markdown.parseMarkdown(pageText);
+
   if (!tree.children) {
     return;
   }
@@ -142,7 +147,7 @@ export async function indexEmbeddings({ name: page, tree }: IndexTreeEvent) {
 /**
  * Generate a summary for a page, and then indexes it.
  */
-export async function indexSummary({ name: page, tree }: IndexTreeEvent) {
+export async function indexSummary(page: string) {
   if (!await shouldIndexSummaries()) {
     return;
   }
@@ -151,10 +156,14 @@ export async function indexSummary({ name: page, tree }: IndexTreeEvent) {
     return;
   }

+  const text = await space.readPage(page);
+  const tree = await markdown.parseMarkdown(text);
+
   if (!tree.children) {
     return;
   }

+  const startTime = Date.now();
   const pageText = renderToText(tree);
   const summaryModel = aiSettings.textModels.find((model) =>
     model.name === aiSettings.indexSummaryModelName
@@ -201,12 +210,60 @@ export async function indexSummary({ name: page, tree }: IndexTreeEvent) {

   await indexObjects<AISummaryObject>(page, [summaryObject]);

+  const endTime = Date.now();
+  const duration = (endTime - startTime) / 1000;
   log(
     "any",
-    `AI: Indexed summary for page ${page}`,
+    `AI: Indexed summary for page ${page} in ${duration} seconds`,
   );
 }

+// Listen for page:index events and queue up embedding and summary indexing to
+// prevent the main SB indexing process from being blocked.
+export async function queueEmbeddingGeneration(
+  { name: page, tree }: IndexTreeEvent,
+) {
+  await initIfNeeded();
+
+  if (!canIndexPage(page)) {
+    return;
+  }
+
+  if (!tree.children) {
+    return;
+  }
+
+  if (await shouldIndexEmbeddings()) {
+    await mq.send("aiEmbeddingsQueue", page);
+  }
+
+  if (await shouldIndexSummaries()) {
+    await mq.send("aiSummaryQueue", page);
+  }
+}
Comment on lines +223 to +243

Add error handling for queuing operations.

Consider adding error handling for the queuing operations to ensure robustness.

Use this diff to add error handling:

    try {
      await mq.send("aiEmbeddingsQueue", page);
    } catch (error) {
      log("error", `Failed to queue embedding task for page: ${error}`);
    }

    try {
      await mq.send("aiSummaryQueue", page);
    } catch (error) {
      log("error", `Failed to queue summary task for page: ${error}`);
    }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    export async function queueEmbeddingGeneration(
      { name: page, tree }: IndexTreeEvent,
    ) {
      await initIfNeeded();
      if (!canIndexPage(page)) {
        return;
      }
      if (!tree.children) {
        return;
      }
      if (await shouldIndexEmbeddings()) {
        try {
          await mq.send("aiEmbeddingsQueue", page);
        } catch (error) {
          log("error", `Failed to queue embedding task for page: ${error}`);
        }
      }
      if (await shouldIndexSummaries()) {
        try {
          await mq.send("aiSummaryQueue", page);
        } catch (error) {
          log("error", `Failed to queue summary task for page: ${error}`);
        }
      }
    }
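To show what the separate `try`/`catch` blocks buy, here is a minimal standalone sketch. It assumes a hypothetical synchronous `SendFn` and a fake sender (`flakySend`); the real `mq.send` is asynchronous and would need to be awaited inside the `try`. The point is only that a failure on one queue does not stop the page from reaching the other.

```typescript
// Signature of a hypothetical synchronous send function; the real mq.send is async.
type SendFn = (queue: string, body: string) => void;

// Try every queue in turn and report the ones that failed instead of throwing.
function sendToQueues(send: SendFn, queues: string[], body: string): string[] {
  const failed: string[] = [];
  for (const queue of queues) {
    try {
      send(queue, body);
    } catch (error) {
      // Record the failure and keep going so later queues still get the message.
      failed.push(queue);
    }
  }
  return failed;
}

// Fake sender whose embeddings queue always fails (an assumption for the demo).
const delivered: string[] = [];
const flakySend: SendFn = (queue, body) => {
  if (queue === "aiEmbeddingsQueue") {
    throw new Error("queue unavailable");
  }
  delivered.push(`${queue}:${body}`);
};

const notQueued = sendToQueues(
  flakySend,
  ["aiEmbeddingsQueue", "aiSummaryQueue"],
  "Notes/A",
);
```

Without the per-queue `try`/`catch`, the error from the first send would propagate and the summary queue would never receive the page.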


+export async function processEmbeddingsQueue(messages: MQMessage[]) {
+  await initIfNeeded();
+  for (const message of messages) {
+    const pageName: string = message.body;
+    console.log(`AI: Generating and indexing embeddings for file ${pageName}`);
+    await indexEmbeddings(pageName);
+  }
+  const queueStats = await mq.getQueueStats("aiEmbeddingsQueue");
+  console.log(`AI: Embeddings queue stats: ${JSON.stringify(queueStats)}`);
+}
Comment on lines +245 to +254

Add error handling for embedding generation.

Consider adding error handling for the embedding generation process to ensure robustness.

Use this diff to add error handling:

    try {
      await indexEmbeddings(pageName);
    } catch (error) {
      console.error(`Failed to index embeddings for file ${pageName}: ${error}`);
    }
Committable suggestion

Suggested change
    export async function processEmbeddingsQueue(messages: MQMessage[]) {
      await initIfNeeded();
      for (const message of messages) {
        const pageName: string = message.body;
        console.log(`AI: Generating and indexing embeddings for file ${pageName}`);
        try {
          await indexEmbeddings(pageName);
        } catch (error) {
          console.error(`Failed to index embeddings for file ${pageName}: ${error}`);
        }
      }
      const queueStats = await mq.getQueueStats("aiEmbeddingsQueue");
      console.log(`AI: Embeddings queue stats: ${JSON.stringify(queueStats)}`);
    }
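The effect of catching errors per message can be sketched outside the plug as follows. This is an illustration under assumptions, not the PR's code: `QueuedPage`, `BatchResult`, and `processBatch` are hypothetical names, and the handler is synchronous here, whereas `indexEmbeddings` is awaited in the real consumer.

```typescript
// Hypothetical message shape; only `body` (the page name) matters for this sketch.
interface QueuedPage {
  id: string;
  body: string;
}

interface BatchResult {
  indexed: string[];
  failed: { page: string; reason: string }[];
}

// Handle every message in the batch, recording failures instead of aborting.
function processBatch(
  messages: QueuedPage[],
  indexPage: (page: string) => void,
): BatchResult {
  const result: BatchResult = { indexed: [], failed: [] };
  for (const message of messages) {
    try {
      indexPage(message.body);
      result.indexed.push(message.body);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      result.failed.push({ page: message.body, reason });
    }
  }
  return result;
}

// Demo handler that fails for one page only (an assumption for the example).
const batchResult = processBatch(
  [
    { id: "1", body: "Notes/A" },
    { id: "2", body: "Broken" },
    { id: "3", body: "Notes/B" },
  ],
  (page) => {
    if (page === "Broken") {
      throw new Error("embedding provider timed out");
    }
  },
);
```

Returning the failures, rather than only logging them, is a design choice for the sketch; it would let a caller decide whether to re-queue the affected pages.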


+export async function processSummaryQueue(messages: MQMessage[]) {
+  await initIfNeeded();
+  for (const message of messages) {
+    const pageName: string = message.body;
+    console.log(`AI: Generating and indexing summary for ${pageName}`);
+    await indexSummary(pageName);
+  }
+  const queueStats = await mq.getQueueStats("aiSummaryQueue");
+  console.log(`AI: Summary queue stats: ${JSON.stringify(queueStats)}`);
+}
Comment on lines +256 to +265

Add error handling for summary generation.

Consider adding error handling for the summary generation process to ensure robustness.

Use this diff to add error handling:

    try {
      await indexSummary(pageName);
    } catch (error) {
      console.error(`Failed to index summary for ${pageName}: ${error}`);
    }
Committable suggestion

Suggested change
    export async function processSummaryQueue(messages: MQMessage[]) {
      await initIfNeeded();
      for (const message of messages) {
        const pageName: string = message.body;
        console.log(`AI: Generating and indexing summary for ${pageName}`);
        try {
          await indexSummary(pageName);
        } catch (error) {
          console.error(`Failed to index summary for ${pageName}: ${error}`);
        }
      }
      const queueStats = await mq.getQueueStats("aiSummaryQueue");
      console.log(`AI: Summary queue stats: ${JSON.stringify(queueStats)}`);
    }


 export async function getAllEmbeddings(): Promise<EmbeddingObject[]> {
   if (await supportsServerProxyCall()) {
     return (await syscall(