feat: prepare embedder for Production #9517
Merged
44 commits
d86eb2d feat: support ad-hoc metadata inserts (mattkrick)
8628e3e feat: triggers from metadata to queue (mattkrick)
285f69b feat: move job queue to pg (mattkrick)
0b024dd begin retry (mattkrick)
ae6126f feat: support retries after queue goes dry (mattkrick)
9fb9c4f chore: separate abstract models into their own files (mattkrick)
20f5d96 feat: support recursive text splitting for chunks (mattkrick)
eff8eb9 validate discussions to only include ended meetings (mattkrick)
7ec3d6a support historical info (mattkrick)
49c3eae Merge branch 'master' into feat/embedder1 (mattkrick)
46dafac feat: support listening to jobs from app (mattkrick)
47e83d9 feat: support calls from app (mattkrick)
7cee59e merge master (mattkrick)
5c96c41 fix migration conflict (mattkrick)
49b349b fix: import history (mattkrick)
bf9a379 fix: dataloader mem leak (mattkrick)
f7ccdc6 feat: handle stalled jobs (mattkrick)
b573083 self-review (mattkrick)
ba906e0 remove redlock for non-historical jobs (mattkrick)
10953b5 feat: clean comments (mattkrick)
7953939 remove pubsub (mattkrick)
c2b6753 fix lint errors (mattkrick)
8852ac2 fix lint (mattkrick)
2ba07c1 cast to any for CI (mattkrick)
7bcd083 build embeddings table in CI for the typings (mattkrick)
368b34c codecheck after server starts (mattkrick)
1158523 use pgvector in CI (mattkrick)
ddfc151 POSTGRES_USE_PGVECTOR='true' (mattkrick)
07e0d12 lazy Kysely Codegen (mattkrick)
8360b95 fix: release-please and CI pgvector image bump (mattkrick)
e74754d chore: rename files for clarity (mattkrick)
e7991d3 rename yaml to yml (mattkrick)
ef1d8c4 feat: support priority (mattkrick)
59b0647 feat: custom text splitter (mattkrick)
3faa442 feat: add language to metadata table (mattkrick)
2ab9f61 feat: support multiple workers (mattkrick)
7fc4136 feat: set workers via env var (mattkrick)
ea32be7 fix: handle shutdown and stalled jobs (mattkrick)
bc1cf74 update readme for parabol-ubi (mattkrick)
7d82b28 remove unused trigger logic (mattkrick)
b609f3e turn on dev servers (mattkrick)
8679d2c Merge branch 'master' into feat/embedder1 (mattkrick)
0c80dd6 fix: rename migration (mattkrick)
826a044 Merge branch 'master' into feat/embedder1 (mattkrick)
@@ -0,0 +1,9 @@
export const EmbedderChannelId = {
  join: (serverId: string) => `embedder:${serverId}`,
  split: (id: string) => {
    const [, serverId] = id.split(':')
    return serverId
  }
}

export default EmbedderChannelId
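A quick round-trip sketch of the helper, for reviewers who want to see it in use. The object is re-declared so the snippet runs on its own, and the server IDs (`server-1`, `host:3000`) are made up for illustration. It also shows one thing worth keeping in mind: because `split` keeps only the second `:`-separated segment, a server ID that itself contains a colon would not round-trip.

```typescript
// Re-declared here so the snippet is self-contained; it mirrors the diff above.
const EmbedderChannelId = {
  join: (serverId: string) => `embedder:${serverId}`,
  split: (id: string) => {
    const [, serverId] = id.split(':')
    return serverId
  }
}

// 'server-1' is a hypothetical server ID.
const channel = EmbedderChannelId.join('server-1')
const serverId = EmbedderChannelId.split(channel)
console.log(channel, serverId)

// Caveat: only the second segment survives, so an ID containing ':' is truncated.
const truncated = EmbedderChannelId.split(EmbedderChannelId.join('host:3000'))
console.log(truncated)
```

If server IDs can never contain a colon, the current implementation is enough as-is.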
@@ -0,0 +1,6 @@
export const EMBEDDER_JOB_PRIORITY = {
  MEETING: 40,
  DEFAULT: 50,
  TOPIC_HISTORY: 80,
  NEW_MODEL: 90
} as const
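A small sketch of how these values appear to be used. The job queue stream in this PR selects with `.orderBy(['priority'])`, and assuming that resolves to the SQL default of ascending order, a lower number is claimed first. The in-memory `pending` array below is hypothetical; the real queue is a Postgres table.

```typescript
// Re-declared so the snippet runs on its own; values copied from the diff above.
const EMBEDDER_JOB_PRIORITY = {
  MEETING: 40,
  DEFAULT: 50,
  TOPIC_HISTORY: 80,
  NEW_MODEL: 90
} as const

type PriorityName = keyof typeof EMBEDDER_JOB_PRIORITY

// Hypothetical pending jobs, in arbitrary insertion order.
const pending: {id: number; kind: PriorityName}[] = [
  {id: 1, kind: 'NEW_MODEL'},
  {id: 2, kind: 'MEETING'},
  {id: 3, kind: 'TOPIC_HISTORY'},
  {id: 4, kind: 'DEFAULT'}
]

// Ascending sort: the smallest priority number is served first.
const claimOrder = [...pending]
  .sort((a, b) => EMBEDDER_JOB_PRIORITY[a.kind] - EMBEDDER_JOB_PRIORITY[b.kind])
  .map((job) => job.kind)

console.log(claimOrder)
```

Under that assumption, fresh meeting content would be embedded before historical backfills and new-model re-embeds.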
@@ -0,0 +1,72 @@
import {Selectable, sql} from 'kysely'
import ms from 'ms'
import sleep from 'parabol-client/utils/sleep'
import 'parabol-server/initSentry'
import getKysely from 'parabol-server/postgres/getKysely'
import {DB} from 'parabol-server/postgres/pg'
import RootDataLoader from '../server/dataloader/RootDataLoader'
import {processJob} from './processJob'
import {Logger} from '../server/utils/Logger'

export type DBJob = Selectable<DB['EmbeddingsJobQueue']>
export type EmbedJob = DBJob & {
  jobType: 'embed'
  jobData: {
    embeddingsMetadataId: number
    model: string
  }
}
export type RerankJob = DBJob & {jobType: 'rerank'; jobData: {discussionIds: string[]}}
export type Job = EmbedJob | RerankJob

export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
  [Symbol.asyncIterator]() {
    return this
  }
  dataLoader = new RootDataLoader({maxBatchSize: 1000})
  async next(): Promise<IteratorResult<Job>> {
    const pg = getKysely()
    const getJob = (isFailed: boolean) => {
      return pg
        .with(
          (cte) => cte('ids').materialized(),
          (db) =>
            db
              .selectFrom('EmbeddingsJobQueue')
              .select('id')
              .orderBy(['priority'])
              .$if(!isFailed, (db) => db.where('state', '=', 'queued'))
              .$if(isFailed, (db) =>
                db.where('state', '=', 'failed').where('retryAfter', '<', new Date())
              )
              .limit(1)
              .forUpdate()
              .skipLocked()
        )
        .updateTable('EmbeddingsJobQueue')
        .set({state: 'running', startAt: new Date()})
        .where('id', '=', sql<number>`ANY(SELECT id FROM ids)`)
        .returningAll()
        .executeTakeFirst()
    }
    const job = (await getJob(false)) || (await getJob(true))
    if (!job) {
      Logger.log('JobQueueStream: no jobs found')
      // queue is empty, so sleep for a while
      await sleep(ms('1m'))
      return this.next()
    }

    const isSuccessful = await processJob(job as Job, this.dataLoader)
    if (isSuccessful) {
      await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', job.id).executeTakeFirstOrThrow()
    }
    return {done: false, value: job as Job}
  }
  return() {
    return Promise.resolve({done: true as const, value: undefined})
  }
  throw(error: any) {
    return Promise.resolve({done: true, value: error})
  }
}
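To make the claim order in `next()` easier to reason about, here is a minimal in-memory sketch. It is not the PR's implementation: there is no Postgres, no row locking (`FOR UPDATE SKIP LOCKED` is what lets multiple workers avoid claiming the same row), and no `processJob`. The names `MemJob`, `claim`, `claimNext`, and `memoryJobStream` are hypothetical. It only models the selection rule: take the lowest-priority-number `queued` job, and fall back to a `failed` job whose `retryAfter` has passed only when nothing is queued.

```typescript
type State = 'queued' | 'running' | 'failed'
interface MemJob {
  id: number
  priority: number
  state: State
  retryAfter: Date | null
  startAt: Date | null
}

// Stand-in for the CTE + UPDATE: pick one matching job and mark it running.
const claim = (jobs: MemJob[], isFailed: boolean, now: Date): MemJob | undefined => {
  const candidates = jobs
    .filter((job) =>
      isFailed
        ? job.state === 'failed' && job.retryAfter !== null && job.retryAfter < now
        : job.state === 'queued'
    )
    .sort((a, b) => a.priority - b.priority)
  const job = candidates[0]
  if (!job) return undefined
  job.state = 'running'
  job.startAt = now
  return job
}

// Queued jobs win; failed jobs are retried only when nothing is queued.
const claimNext = (jobs: MemJob[], now: Date) => claim(jobs, false, now) ?? claim(jobs, true, now)

// Unlike the real stream, this generator ends when the queue is dry instead of sleeping.
async function* memoryJobStream(jobs: MemJob[], now: Date): AsyncGenerator<MemJob> {
  while (true) {
    const job = claimNext(jobs, now)
    if (!job) return
    // Stand-in for processJob succeeding and the row being deleted.
    jobs.splice(jobs.indexOf(job), 1)
    yield job
  }
}

const now = new Date('2024-01-01T00:00:00Z')
const earlier = new Date(now.getTime() - 60000)
const later = new Date(now.getTime() + 60000)
const jobs: MemJob[] = [
  {id: 1, priority: 50, state: 'failed', retryAfter: earlier, startAt: null},
  {id: 2, priority: 80, state: 'queued', retryAfter: null, startAt: null},
  {id: 3, priority: 40, state: 'queued', retryAfter: null, startAt: null},
  {id: 4, priority: 40, state: 'failed', retryAfter: later, startAt: null}
]

const drain = async () => {
  const seen: number[] = []
  for await (const job of memoryJobStream(jobs, now)) seen.push(job.id)
  return seen
}
drain().then((seen) => console.log(seen))
```

In this sketch job 4 is never claimed because its `retryAfter` is still in the future, which matches the `retryAfter < new Date()` filter in the diff.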
@@ -0,0 +1,15 @@
import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic'
import {MessageToEmbedder} from './custom'

export const addEmbeddingsMetadata = async ({objectTypes, ...options}: MessageToEmbedder) => {
  return Promise.all(
    objectTypes.map((type) => {
      switch (type) {
        case 'retrospectiveDiscussionTopic':
          return addEmbeddingsMetadataForRetrospectiveDiscussionTopic(options)
        default:
          throw new Error(`Invalid object type: ${type}`)
      }
    })
  )
}
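A sketch of how this dispatcher behaves for known and unknown object types. `MessageToEmbedder` and the retrospective handler are not shown in this part of the diff, so the snippet substitutes a hypothetical `MessageToEmbedderSketch` type (its `startAt`/`endAt` fields are assumptions) and a stub handler. The point it illustrates: because the `throw` happens inside the `map` callback of an `async` function, one unrecognized entry in `objectTypes` rejects the whole call.

```typescript
// Assumed shape; the real MessageToEmbedder lives in './custom' and is not shown here.
interface MessageToEmbedderSketch {
  objectTypes: string[]
  startAt?: Date // hypothetical option
  endAt?: Date // hypothetical option
}

const handled: string[] = []

// Stub standing in for addEmbeddingsMetadataForRetrospectiveDiscussionTopic.
const addForRetroTopicStub = async (_options: {startAt?: Date; endAt?: Date}) => {
  handled.push('retrospectiveDiscussionTopic')
}

const addEmbeddingsMetadataSketch = async ({objectTypes, ...options}: MessageToEmbedderSketch) => {
  return Promise.all(
    objectTypes.map((type) => {
      switch (type) {
        case 'retrospectiveDiscussionTopic':
          return addForRetroTopicStub(options)
        default:
          // A synchronous throw here rejects the promise returned by the async function.
          throw new Error(`Invalid object type: ${type}`)
      }
    })
  )
}

addEmbeddingsMetadataSketch({objectTypes: ['retrospectiveDiscussionTopic']}).then((results) =>
  console.log('handlers run:', results.length)
)
addEmbeddingsMetadataSketch({objectTypes: ['unknownType']}).catch((err) =>
  console.log('rejected:', err.message)
)
```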
I added type safety on the embedding model tables, so we can't run codecheck until the embedding service starts up.