Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prepare embedder for Production #9517

Merged
merged 44 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d86eb2d
feat: support ad-hoc metadata inserts
mattkrick Mar 7, 2024
8628e3e
feat: triggers from metadata to queue
mattkrick Mar 8, 2024
285f69b
feat: move job queue to pg
mattkrick Mar 8, 2024
0b024dd
begin retry
mattkrick Mar 13, 2024
ae6126f
feat: support retries after queue goes dry
mattkrick Mar 13, 2024
9fb9c4f
chore: separate abstract models into their own files
mattkrick Mar 13, 2024
20f5d96
feat: support recursive text splitting for chunks
mattkrick Mar 14, 2024
eff8eb9
validate discussions to only include ended meetings
mattkrick Mar 18, 2024
7ec3d6a
support historical info
mattkrick Mar 18, 2024
49c3eae
Merge branch 'master' into feat/embedder1
mattkrick Mar 18, 2024
46dafac
feat: support listening to jobs from app
mattkrick Mar 19, 2024
47e83d9
feat: support calls from app
mattkrick Mar 19, 2024
7cee59e
merge master
mattkrick Mar 19, 2024
5c96c41
fix migration conflict
mattkrick Mar 19, 2024
49b349b
fix: import history
mattkrick Mar 19, 2024
bf9a379
fix: dataloader mem leak
mattkrick Mar 19, 2024
f7ccdc6
feat: handle stalled jobs
mattkrick Mar 19, 2024
b573083
self-review
mattkrick Mar 19, 2024
ba906e0
remove redlock for non-historical jobs
mattkrick Mar 19, 2024
10953b5
feat: clean comments
mattkrick Mar 19, 2024
7953939
remove pubsub
mattkrick Mar 19, 2024
c2b6753
fix lint errors
mattkrick Mar 19, 2024
8852ac2
fix lint
mattkrick Mar 19, 2024
2ba07c1
cast to any for CI
mattkrick Mar 19, 2024
7bcd083
build embeddings table in CI for the typings
mattkrick Mar 19, 2024
368b34c
codecheck after server starts
mattkrick Mar 19, 2024
1158523
use pgvector in CI
mattkrick Mar 19, 2024
ddfc151
POSTGRES_USE_PGVECTOR='true'
mattkrick Mar 19, 2024
07e0d12
lazy Kysely Codegen
mattkrick Mar 20, 2024
8360b95
fix: release-please and CI pgvector image bump
mattkrick Mar 20, 2024
e74754d
chore: rename files for clarity
mattkrick Mar 20, 2024
e7991d3
rename yaml to yml
mattkrick Mar 21, 2024
ef1d8c4
feat: support priority
mattkrick Mar 26, 2024
59b0647
feat: custom text splitter
mattkrick Mar 27, 2024
3faa442
feat: add language to metadata table
mattkrick Mar 27, 2024
2ab9f61
feat: support multiple workers
mattkrick Mar 27, 2024
7fc4136
feat: set workers via env var
mattkrick Mar 27, 2024
ea32be7
fix: handle shutdown and stalled jobs
mattkrick Mar 27, 2024
bc1cf74
update readme for parabol-ubi
mattkrick Mar 28, 2024
7d82b28
remove unused trigger logic
mattkrick Mar 28, 2024
b609f3e
turn on dev servers
mattkrick Mar 28, 2024
8679d2c
Merge branch 'master' into feat/embedder1
mattkrick Mar 28, 2024
0c80dd6
fix: rename migration
mattkrick Mar 28, 2024
826a044
Merge branch 'master' into feat/embedder1
mattkrick Mar 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ SOCKET_PORT='3001'
# AI MODELS
AI_EMBEDDING_MODELS='[{"model": "text-embeddings-inference:llmrails/ember-v1", "url": "http://localhost:3040/"}]'
AI_GENERATION_MODELS='[{"model": "text-generation-inference:TheBloke/zephyr-7b-beta", "url": "http://localhost:3050/"}]'
AI_EMBEDDER_ENABLED='true'
AI_EMBEDDER_WORKERS='1'

# APPLICATION
# AMPLITUDE_WRITE_KEY='key_AMPLITUDE_WRITE_KEY'
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
id-token: "write"
services:
postgres:
image: pgvector/pgvector:pg15
image: pgvector/pgvector:0.6.2-pg15
# This env variables must be the same in the file PARABOL_BUILD_ENV_PATH
env:
POSTGRES_PASSWORD: "temppassword"
Expand Down Expand Up @@ -143,6 +143,6 @@ jobs:
uses: ravsamhq/notify-slack-action@v2
with:
status: ${{ job.status }}
notify_when: 'failure'
notify_when: "failure"
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_GH_ACTIONS_NOTIFICATIONS }}
14 changes: 8 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
id-token: "write"
services:
postgres:
image: postgres:15.4
image: pgvector/pgvector:0.6.2-pg15
# This env variables must be the same in the file PARABOL_BUILD_ENV_PATH
env:
POSTGRES_PASSWORD: "temppassword"
Expand Down Expand Up @@ -78,17 +78,13 @@ jobs:
yarn db:migrate
yarn pg:migrate up
yarn pg:build
yarn pg:generate

- name: Build for testing
run: yarn build

- name: Verify source is clean
run: git diff --quiet HEAD || (echo "Changes in generated files detected"; git diff; exit 1)

- name: Check Code Quality
run: yarn codecheck

- name: Run Predeploy for Testing
run: yarn predeploy

Expand All @@ -100,6 +96,12 @@ jobs:
wait-on: |
http://localhost:3000/graphql

- name: Kysely Codegen
run: yarn pg:generate

- name: Check Code Quality
run: yarn codecheck
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i added type safety on the embedding model tables so we can't code check until the embedding service starts up


- name: Run server tests
run: yarn test:server -- --reporters=default --reporters=jest-junit
env:
Expand Down Expand Up @@ -139,6 +141,6 @@ jobs:
uses: ravsamhq/notify-slack-action@v2
with:
status: ${{ job.status }}
notify_when: 'failure'
notify_when: "failure"
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_GH_ACTIONS_NOTIFICATIONS }}
24 changes: 12 additions & 12 deletions docker/images/parabol-ubi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ Recommended:

## Variables

| Name | Description | Possible values | Recommended value |
| -------------------- | ----------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------- | ------------------------------------------------------------------- |
| `postgresql_tag` | PostgreSQL version from the [Docker image](https://hub.docker.com/_/postgres) | `Any tag` | `15.4` |
| `rethinkdb_tag` | RethinkDB version from the [Docker image](https://hub.docker.com/_/rethinkdb) | `Any tag` | `2.4.2` |
| `redis_tag` | Redis version from the [Docker image](https://hub.docker.com/_/redis) | `Any tag` | `7.0-alpine` |
| `_BUILD_ENV_PATH` | File `.env` used by the application during the build process | `Relative path from the root level of the repository` | `docker/parabol-ubi/environments/basic-env` |
| `_NODE_VERSION` | Node version, used by Docker to use the Docker image node:\_NODE_VERSION as base image to build | `Same as in root package.json` | |
| `_DOCKERFILE` | Dockerfile used to build the image | `Relative path from the root level of the repository` | `./docker/parabol-ubi/dockerfiles/basic.dockerfile` |
| `_DOCKER_REPOSITORY` | The destination repository | `String` | `parabol` |
| `_DOCKER_TAG` | Tag for the produced image | `String` | |
| Name | Description | Possible values | Recommended value |
| -------------------- | ----------------------------------------------------------------------------------------------- | ----------------------------------------------------- | --------------------------------------------------- |
| `postgresql_tag` | PostgreSQL version from the [Docker image](https://hub.docker.com/r/pgvector/pgvector) | `Any tag` | `0.6.2-pg15` |
| `rethinkdb_tag` | RethinkDB version from the [Docker image](https://hub.docker.com/_/rethinkdb) | `Any tag` | `2.4.2` |
| `redis_tag` | Redis version from the [Docker image](https://hub.docker.com/_/redis) | `Any tag` | `7.0-alpine` |
| `_BUILD_ENV_PATH` | File `.env` used by the application during the build process | `Relative path from the root level of the repository` | `docker/parabol-ubi/environments/basic-env` |
| `_NODE_VERSION` | Node version, used by Docker to use the Docker image node:\_NODE_VERSION as base image to build | `Same as in root package.json` | |
| `_DOCKERFILE` | Dockerfile used to build the image | `Relative path from the root level of the repository` | `./docker/parabol-ubi/dockerfiles/basic.dockerfile` |
| `_DOCKER_REPOSITORY` | The destination repository | `String` | `parabol` |
| `_DOCKER_TAG` | Tag for the produced image | `String` | |

Example of variables:

```commandLine
export postgresql_tag=15.4; \
export postgresql_tag=0.6.2-pg15; \
export rethinkdb_tag=2.4.2; \
export redis_tag=7.0-alpine; \
export _BUILD_ENV_PATH=docker/parabol-ubi/environments/basic-env; \
Expand Down Expand Up @@ -61,7 +61,7 @@ cp $_BUILD_ENV_PATH ./.env
> :warning: Stop all database containers you might have running before executing the following command. If other database containers are running, some ports might be already taken.

```commandLine
docker run --name temp-postgres -e POSTGRES_PASSWORD=temppassword -e POSTGRES_USER=tempuser -e POSTGRES_DB=tempdb -d -p 5432:5432 postgres:$postgresql_tag && \
docker run --name temp-postgres -e POSTGRES_PASSWORD=temppassword -e POSTGRES_USER=tempuser -e POSTGRES_DB=tempdb -d -p 5432:5432 pgvector/pgvector:$postgresql_tag && \
docker run --name temp-rethinkdb -d -p 28015:28015 -p 29015:29015 -p 8080:8080 rethinkdb:$rethinkdb_tag && \
docker run --name temp-redis -d -p 6379:6379 redis:$redis_tag
```
Expand Down
4 changes: 4 additions & 0 deletions docker/images/parabol-ubi/environments/pipeline
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,7 @@ STRIPE_PUBLISHABLE_KEY='pk_test_MNoKbCzQX0lhktuxxI7M14wd'
STRIPE_SECRET_KEY=''
STRIPE_WEBHOOK_SECRET=''
HUBSPOT_API_KEY=''
AI_EMBEDDING_MODELS='[{"model": "text-embeddings-inference:llmrails/ember-v1", "url": "http://localhost:3040/"}]'
AI_GENERATION_MODELS='[{"model": "text-generation-inference:TheBloke/zephyr-7b-beta", "url": "http://localhost:3050/"}]'
mattkrick marked this conversation as resolved.
Show resolved Hide resolved
AI_EMBEDDER_WORKERS='1'
POSTGRES_USE_PGVECTOR='true'
2 changes: 1 addition & 1 deletion docker/stacks/development/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ services:
networks:
parabol-network:
text-embeddings-inference:
image: ghcr.io/huggingface/text-embeddings-inference:cpu-0.6
image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.2
command:
- "--model-id=llmrails/ember-v1"
platform: linux/x86_64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
postgres:
container_name: postgres
profiles: ["databases"]
image: postgres:15.4
image: pgvector/pgvector:0.6.2-pg15
restart: always
env_file: .env
environment:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
"html-webpack-plugin": "^5.5.0",
"husky": "^7.0.4",
"jscodeshift": "^0.14.0",
"kysely": "^0.27.2",
"kysely": "^0.27.3",
"kysely-codegen": "^0.11.0",
"lerna": "^6.4.1",
"mini-css-extract-plugin": "^2.7.2",
Expand Down
9 changes: 9 additions & 0 deletions packages/client/shared/gqlIds/EmbedderChannelId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export const EmbedderChannelId = {
join: (serverId: string) => `embedder:${serverId}`,
split: (id: string) => {
const [, serverId] = id.split(':')
return serverId
}
}

export default EmbedderChannelId
3 changes: 3 additions & 0 deletions packages/client/types/generics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ export type WithFieldsAsType<TObj, NType, F> = {
: TObj[K]
}

export type Tuple<T, N, R extends T[] = []> = R['length'] extends N ? R : Tuple<T, N, [...R, T]>
export type ParseInt<T extends string> = T extends `${infer Digit extends number}` ? Digit : never

declare global {
interface Array<T> {
findLastIndex(predicate: (value: T, index: number, obj: T[]) => unknown, thisArg?: any): number
Expand Down
6 changes: 6 additions & 0 deletions packages/embedder/EMBEDDER_JOB_PRIORITY.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export const EMBEDDER_JOB_PRIORITY = {
MEETING: 40,
DEFAULT: 50,
TOPIC_HISTORY: 80,
NEW_MODEL: 90
} as const
72 changes: 72 additions & 0 deletions packages/embedder/EmbeddingsJobQueueStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {Selectable, sql} from 'kysely'
import ms from 'ms'
import sleep from 'parabol-client/utils/sleep'
import 'parabol-server/initSentry'
import getKysely from 'parabol-server/postgres/getKysely'
import {DB} from 'parabol-server/postgres/pg'
import RootDataLoader from '../server/dataloader/RootDataLoader'
import {processJob} from './processJob'
import {Logger} from '../server/utils/Logger'

export type DBJob = Selectable<DB['EmbeddingsJobQueue']>
export type EmbedJob = DBJob & {
jobType: 'embed'
jobData: {
embeddingsMetadataId: number
model: string
}
}
export type RerankJob = DBJob & {jobType: 'rerank'; jobData: {discussionIds: string[]}}
export type Job = EmbedJob | RerankJob

export class EmbeddingsJobQueueStream implements AsyncIterableIterator<Job> {
[Symbol.asyncIterator]() {
return this
}
dataLoader = new RootDataLoader({maxBatchSize: 1000})
async next(): Promise<IteratorResult<Job>> {
const pg = getKysely()
const getJob = (isFailed: boolean) => {
return pg
.with(
(cte) => cte('ids').materialized(),
(db) =>
db
.selectFrom('EmbeddingsJobQueue')
.select('id')
.orderBy(['priority'])
.$if(!isFailed, (db) => db.where('state', '=', 'queued'))
.$if(isFailed, (db) =>
db.where('state', '=', 'failed').where('retryAfter', '<', new Date())
)
.limit(1)
.forUpdate()
.skipLocked()
)
.updateTable('EmbeddingsJobQueue')
.set({state: 'running', startAt: new Date()})
.where('id', '=', sql<number>`ANY(SELECT id FROM ids)`)
.returningAll()
.executeTakeFirst()
}
const job = (await getJob(false)) || (await getJob(true))
if (!job) {
Logger.log('JobQueueStream: no jobs found')
// queue is empty, so sleep for a while
await sleep(ms('1m'))
return this.next()
}

const isSuccessful = await processJob(job as Job, this.dataLoader)
if (isSuccessful) {
await pg.deleteFrom('EmbeddingsJobQueue').where('id', '=', job.id).executeTakeFirstOrThrow()
}
return {done: false, value: job as Job}
}
return() {
return Promise.resolve({done: true as const, value: undefined})
}
throw(error: any) {
return Promise.resolve({done: true, value: error})
}
}
41 changes: 17 additions & 24 deletions packages/embedder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,14 @@
This service builds embedding vectors for semantic search and for other AI/ML
use cases. It does so by:

1. Updating a list of all possible items to create embedding vectors for and
storing that list in the `EmbeddingsMetadata` table
2. Adding these items in batches to the `EmbeddingsJobQueue` table and a redis
priority queue called `embedder:queue`
3. Allowing one or more parallel embedding services to calculate embedding
vectors (EmbeddingJobQueue states transistion from `queued` -> `embedding`,
then `embedding` -> [deleting the `EmbeddingJobQueue` row]

In addition to deleteing the `EmbeddingJobQueue` row, when a job completes
successfully:

- A row is added to the model table with the embedding vector; the
`EmbeddingMetadataId` field on this row points the appropriate
metadata row on `EmbeddingsMetadata`
- The `EmbeddingsMetadata.models` array is updated with the name of the
table that the embedding has been generated for

4. This process repeats forever using a silly polling loop

In the future, it would be wonderful to enhance this service such that it were
event driven.
1. Homogenizes different types of data into a single `EmbeddingsMetadata` table
2. Each new row in `EmbeddingsMetadata` creates a new row in `EmbeddingsJobQueue` for each model
3. Uses PG to pick a job from the queue and sets the job from `queued` -> `embedding`,
then `embedding` -> [deleting the `EmbeddingJobQueue` row]
4. Embedding involves creating a `fullText` from the work item and then a vector from that `fullText`
5. New jobs to add metadata are sent via redis streams from the GQL Executor
6. If embedding fails, the application increments the `retryCount` and increases the `retryAfter` if a retry is desired
7. If a job gets stalled, a process that runs every 5 minutes will look for jobs older than 5 minutes and reset them to `queued`

## Prerequisites

Expand All @@ -37,10 +24,9 @@ The predeploy script checks for an environment variable
The Embedder service takes no arguments and is controlled by the following
environment variables, here given with example configuration:

- `AI_EMBEDDER_ENABLE`: enable/disable the embedder service from
performing work, or sleeping indefinitely
- `AI_EMBEDDER_WORKERS`: How many workers should simultaneously pick jobs from the queue. If less than 1, disabled.

`AI_EMBEDDER_ENABLED='true'`
`AI_EMBEDDER_WORKERS='1'`

- `AI_EMBEDDING_MODELS`: JSON configuration for which embedding models
are enabled. Each model in the array will be instantiated by
Expand Down Expand Up @@ -69,3 +55,10 @@ environment variables, here given with example configuration:
The Embedder service is stateless and takes no arguments. Multiple instances
of the service may be started in order to match embedding load, or to
catch up on history more quickly.

## Resources

### PG as a Job Queue

- https://leontrolski.github.io/postgres-as-queue.html
- https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/
15 changes: 15 additions & 0 deletions packages/embedder/addEmbeddingsMetadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRetrospectiveDiscussionTopic'
import {MessageToEmbedder} from './custom'

export const addEmbeddingsMetadata = async ({objectTypes, ...options}: MessageToEmbedder) => {
return Promise.all(
objectTypes.map((type) => {
switch (type) {
case 'retrospectiveDiscussionTopic':
return addEmbeddingsMetadataForRetrospectiveDiscussionTopic(options)
default:
throw new Error(`Invalid object type: ${type}`)
}
})
)
}
Loading
Loading