-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use a separate queue for indexing embeddings and summaries, to prevent blocking the main SB indexing thread #61
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -7,13 +7,15 @@ import type { | |||||||||||||||||||||||||||||||||||||||||||||||||
} from "./types.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { indexObjects, queryObjects } from "$sbplugs/index/plug_api.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { renderToText } from "$sb/lib/tree.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { mq } from "$sb/syscalls.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { MQMessage } from "$sb/types.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { | ||||||||||||||||||||||||||||||||||||||||||||||||||
currentEmbeddingModel, | ||||||||||||||||||||||||||||||||||||||||||||||||||
currentEmbeddingProvider, | ||||||||||||||||||||||||||||||||||||||||||||||||||
initIfNeeded, | ||||||||||||||||||||||||||||||||||||||||||||||||||
} from "../src/init.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { log, supportsServerProxyCall } from "./utils.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { editor, system } from "$sb/syscalls.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { editor, markdown, space, system } from "$sb/syscalls.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import { aiSettings, configureSelectedModel } from "./init.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import * as cache from "./cache.ts"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -68,7 +70,7 @@ export async function shouldIndexSummaries() { | |||||||||||||||||||||||||||||||||||||||||||||||||
* Generate embeddings for each paragraph in a page, and then indexes | ||||||||||||||||||||||||||||||||||||||||||||||||||
* them. | ||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||
export async function indexEmbeddings({ name: page, tree }: IndexTreeEvent) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
export async function indexEmbeddings(page: string) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if (!await shouldIndexEmbeddings()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -77,6 +79,9 @@ export async function indexEmbeddings({ name: page, tree }: IndexTreeEvent) { | |||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
const pageText = await space.readPage(page); | ||||||||||||||||||||||||||||||||||||||||||||||||||
const tree = await markdown.parseMarkdown(pageText); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if (!tree.children) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -142,7 +147,7 @@ export async function indexEmbeddings({ name: page, tree }: IndexTreeEvent) { | |||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||
* Generate a summary for a page, and then indexes it. | ||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||
export async function indexSummary({ name: page, tree }: IndexTreeEvent) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
export async function indexSummary(page: string) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if (!await shouldIndexSummaries()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -151,10 +156,14 @@ export async function indexSummary({ name: page, tree }: IndexTreeEvent) { | |||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
const text = await space.readPage(page); | ||||||||||||||||||||||||||||||||||||||||||||||||||
const tree = await markdown.parseMarkdown(text); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if (!tree.children) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
const startTime = Date.now(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
const pageText = renderToText(tree); | ||||||||||||||||||||||||||||||||||||||||||||||||||
const summaryModel = aiSettings.textModels.find((model) => | ||||||||||||||||||||||||||||||||||||||||||||||||||
model.name === aiSettings.indexSummaryModelName | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -201,12 +210,60 @@ export async function indexSummary({ name: page, tree }: IndexTreeEvent) { | |||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
await indexObjects<AISummaryObject>(page, [summaryObject]); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
const endTime = Date.now(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
const duration = (endTime - startTime) / 1000; | ||||||||||||||||||||||||||||||||||||||||||||||||||
log( | ||||||||||||||||||||||||||||||||||||||||||||||||||
"any", | ||||||||||||||||||||||||||||||||||||||||||||||||||
`AI: Indexed summary for page ${page}`, | ||||||||||||||||||||||||||||||||||||||||||||||||||
`AI: Indexed summary for page ${page} in ${duration} seconds`, | ||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// Listen for page:index events and queue up embedding and summary indexing to | ||||||||||||||||||||||||||||||||||||||||||||||||||
// prevent the main SB indexing process from being blocked. | ||||||||||||||||||||||||||||||||||||||||||||||||||
export async function queueEmbeddingGeneration( | ||||||||||||||||||||||||||||||||||||||||||||||||||
{ name: page, tree }: IndexTreeEvent, | ||||||||||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
await initIfNeeded(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if (!canIndexPage(page)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if (!tree.children) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if (await shouldIndexEmbeddings()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
await mq.send("aiEmbeddingsQueue", page); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if (await shouldIndexSummaries()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
await mq.send("aiSummaryQueue", page); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
export async function processEmbeddingsQueue(messages: MQMessage[]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
await initIfNeeded(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
for (const message of messages) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
const pageName: string = message.body; | ||||||||||||||||||||||||||||||||||||||||||||||||||
console.log(`AI: Generating and indexing embeddings for file ${pageName}`); | ||||||||||||||||||||||||||||||||||||||||||||||||||
await indexEmbeddings(pageName); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
const queueStats = await mq.getQueueStats("aiEmbeddingsQueue"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
console.log(`AI: Embeddings queue stats: ${JSON.stringify(queueStats)}`); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+245
to
+254
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for embedding generation. Consider adding error handling for the embedding generation process to ensure robustness. Use this diff to add error handling: try {
await indexEmbeddings(pageName);
} catch (error) {
console.error(`Failed to index embeddings for file ${pageName}: ${error}`);
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
export async function processSummaryQueue(messages: MQMessage[]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
await initIfNeeded(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
for (const message of messages) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
const pageName: string = message.body; | ||||||||||||||||||||||||||||||||||||||||||||||||||
console.log(`AI: Generating and indexing summary for ${pageName}`); | ||||||||||||||||||||||||||||||||||||||||||||||||||
await indexSummary(pageName); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
const queueStats = await mq.getQueueStats("aiSummaryQueue"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
console.log(`AI: Summary queue stats: ${JSON.stringify(queueStats)}`); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+256
to
+265
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for summary generation. Consider adding error handling for the summary generation process to ensure robustness. Use this diff to add error handling: try {
await indexSummary(pageName);
} catch (error) {
console.error(`Failed to index summary for ${pageName}: ${error}`);
} Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
export async function getAllEmbeddings(): Promise<EmbeddingObject[]> { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if (await supportsServerProxyCall()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return (await syscall( | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for queuing operations.
Consider adding error handling for the queuing operations to ensure robustness.
Use this diff to add error handling:
Committable suggestion