Skip to content

Commit

Permalink
feat: Create embeddings for meeting templates (#9776)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dschoordsch authored Jun 13, 2024
1 parent 6cf4098 commit 095cf71
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 1 deletion.
54 changes: 54 additions & 0 deletions packages/embedder/addEmbeddingsMetadataForMeetingTemplate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {ExpressionOrFactory, SqlBool, sql} from 'kysely'
import {DB} from 'parabol-server/postgres/pg'
import {Logger} from 'parabol-server/utils/Logger'
import getKysely from '../server/postgres/getKysely'
import {AddEmbeddingsMetadataParams} from './addEmbeddingsMetadata'
import {insertMeetingTemplatesIntoMetadataAndQueue} from './insertMeetingTemplatesIntoMetadataAndQueue'

/**
 * Pages through `MeetingTemplate` rows from the newest to the oldest `updatedAt`
 * and hands every page to `insertMeetingTemplatesIntoMetadataAndQueue`.
 *
 * `startAt` is an exclusive lower bound on `updatedAt` (falls back to the Unix epoch).
 * `endAt` is an inclusive upper bound on `updatedAt` (falls back to the year 4000).
 */
export const addEmbeddingsMetadataForMeetingTemplate = async ({
startAt,
endAt
}: AddEmbeddingsMetadataParams) => {
const pg = getKysely()
// PG only accepts 65K parameters (inserted columns * number of rows + query params). Make the batches as big as possible
const PG_MAX_PARAMS = 65535
const QUERY_PARAMS = 10
const METADATA_COLS_PER_ROW = 4
// Largest row count whose inserted columns still fit once QUERY_PARAMS are set aside
const BATCH_SIZE = Math.floor((PG_MAX_PARAMS - QUERY_PARAMS) / METADATA_COLS_PER_ROW)
const pgStartAt = startAt || new Date(0)
// Seconds since the epoch; fed to to_timestamp() in the query below
const pgEndAt = (endAt || new Date('4000')).getTime() / 1000

// Pagination cursor: the timestamp (and, when needed, the id) the previous page stopped at
let curEndAt = pgEndAt
let curEndId = ''
// Hard cap on the number of pages; the loop normally exits through the `break` below
for (let i = 0; i < 1e6; i++) {
// preserve microsecond resolution to keep timestamps equal
// so we can use the ID as a tiebreaker when count(createdAt) > BATCH_SIZE
const pgTime = sql<Date>`to_timestamp(${curEndAt})`
// With an id cursor: rows strictly older than the cursor time, or at the cursor time with a greater id.
// Without one: every row at or before the cursor time.
const lessThanTimeOrId: ExpressionOrFactory<DB, 'MeetingTemplate', SqlBool> = curEndId
? ({eb}) =>
eb('updatedAt', '<', pgTime).or(eb('updatedAt', '=', pgTime).and('id', '>', curEndId))
: ({eb}) => eb('updatedAt', '<=', pgTime)
const templates = await pg
.selectFrom('MeetingTemplate')
.select([
'id',
'teamId',
'updatedAt',
sql<number>`extract(epoch from "updatedAt")`.as('updatedAtEpoch')
])
.where('updatedAt', '>', pgStartAt)
.where(lessThanTimeOrId)
// Newest first, ids ascending within a timestamp; this is the order the cursor predicate relies on
.orderBy('updatedAt', 'desc')
.orderBy('id')
.limit(BATCH_SIZE)
.execute()
// Because of the descending order, the last row is the oldest one in the page
const earliestInBatch = templates.at(-1)
if (!earliestInBatch) break
const {updatedAtEpoch, id} = earliestInBatch
// Compare against the PREVIOUS cursor time, so this must run before curEndAt is overwritten.
// If the page ended on the timestamp the cursor was already at, resume after this id;
// otherwise clear the id so the next page uses `<=` on the new timestamp.
curEndId = curEndAt === updatedAtEpoch ? id : ''
curEndAt = updatedAtEpoch
// The second argument is the callee's `maxDelayInDays`
await insertMeetingTemplatesIntoMetadataAndQueue(templates, 5)
const jsTime = new Date(updatedAtEpoch * 1000)
Logger.log(`Inserted ${templates.length} meetingtemplates in metadata ending at ${jsTime}`)
}
}
36 changes: 36 additions & 0 deletions packages/embedder/importHistoricalMeetingTemplates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from 'parabol-server/utils/Logger'
import {addEmbeddingsMetadataForMeetingTemplate} from './addEmbeddingsMetadataForMeetingTemplate'

/**
 * Backfills embeddings metadata for meeting templates that have not been imported yet.
 *
 * Looks up the earliest meeting template (ordered by `updatedAt`, then `id`). If a
 * `meetingTemplate` metadata row already references it, nothing is done. Otherwise
 * the import covers everything up to the `refUpdatedAt` of the earliest
 * `meetingTemplate` metadata row, or has no upper bound when no such row exists.
 *
 * @returns the promise of `addEmbeddingsMetadataForMeetingTemplate`, or `undefined`
 *   when the earliest template is already imported
 */
export const importHistoricalMeetingTemplates = async () => {
  const pg = getKysely()
  const isEarliestMetadataImported = await pg
    .selectFrom('EmbeddingsMetadata')
    .select('id')
    // refId is compared by value only, so restrict the match to meeting template rows;
    // a row of another object type with the same refId must not count as "imported"
    .where('EmbeddingsMetadata.objectType', '=', 'meetingTemplate')
    .where(({eb, selectFrom}) =>
      eb(
        'EmbeddingsMetadata.refId',
        '=',
        selectFrom('MeetingTemplate')
          .select('MeetingTemplate.id')
          .orderBy(['updatedAt', 'id'])
          .limit(1)
      )
    )
    .limit(1)
    .executeTakeFirst()

  if (isEarliestMetadataImported) return

  // Earliest meeting template that already has metadata; its timestamp bounds the import
  const earliestImportedTemplate = await pg
    .selectFrom('EmbeddingsMetadata')
    .select(['id', 'refUpdatedAt', 'refId'])
    .where('objectType', '=', 'meetingTemplate')
    .orderBy('refUpdatedAt')
    .limit(1)
    .executeTakeFirst()
  const endAt = earliestImportedTemplate?.refUpdatedAt ?? undefined
  Logger.log(`Importing meeting template history up to ${endAt || 'now'}`)
  return addEmbeddingsMetadataForMeetingTemplate({endAt})
}
5 changes: 4 additions & 1 deletion packages/embedder/importHistoricalMetadata.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import {EmbeddingObjectType} from './custom'
import {importHistoricalMeetingTemplates} from './importHistoricalMeetingTemplates'
import {importHistoricalRetrospectiveDiscussionTopic} from './importHistoricalRetrospectiveDiscussionTopic'

export const importHistoricalMetadata = async () => {
const OBJECT_TYPES: EmbeddingObjectType[] = ['retrospectiveDiscussionTopic']
const OBJECT_TYPES: EmbeddingObjectType[] = ['retrospectiveDiscussionTopic', 'meetingTemplate']
return Promise.all(
OBJECT_TYPES.map(async (objectType) => {
switch (objectType) {
case 'retrospectiveDiscussionTopic':
return importHistoricalRetrospectiveDiscussionTopic()
case 'meetingTemplate':
return importHistoricalMeetingTemplates()
default:
throw new Error(`Invalid object type: ${objectType}`)
}
Expand Down
3 changes: 3 additions & 0 deletions packages/embedder/indexing/createEmbeddingTextFrom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Selectable} from 'kysely'
import {DB} from 'parabol-server/postgres/pg'

import {DataLoaderInstance} from '../../server/dataloader/RootDataLoader'
import {createTextFromMeetingTemplate} from './meetingTemplate'
import {createTextFromRetrospectiveDiscussionTopic} from './retrospectiveDiscussionTopic'

export const createEmbeddingTextFrom = async (
Expand All @@ -16,6 +17,8 @@ export const createEmbeddingTextFrom = async (
dataLoader,
isRerank
)
case 'meetingTemplate':
return createTextFromMeetingTemplate(embeddingsMetadata.refId, dataLoader)
default:
throw new Error(`Unexcepted objectType: ${embeddingsMetadata.objectType}`)
}
Expand Down
67 changes: 67 additions & 0 deletions packages/embedder/indexing/meetingTemplate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import {DataLoaderInstance} from 'parabol-server/dataloader/RootDataLoader'
import MeetingTemplate from '../../server/database/types/MeetingTemplate'
import PokerTemplate from '../../server/database/types/PokerTemplate'
import ReflectTemplate from '../../server/database/types/ReflectTemplate'
import {inferLanguage} from '../inferLanguage'

/**
 * Builds the embedding text for a retrospective template: the template name,
 * the word "Retrospective", then each prompt's question and description,
 * all separated by newlines.
 */
const createTextFromRetrospectiveMeetingTemplate = async (
  template: ReflectTemplate,
  dataLoader: DataLoaderInstance
) => {
  const promptLoader = dataLoader.get('reflectPromptsByTemplateId')
  const prompts = await promptLoader.load(template.id)
  const promptLines: string[] = []
  for (const prompt of prompts) {
    const {question, description} = prompt
    promptLines.push(`${question}\n${description}`)
  }
  const promptText = promptLines.join('\n')
  return `${template.name}\nRetrospective\n${promptText}`
}

/**
 * Builds the embedding text for a team prompt template: the template name
 * followed by a fixed line of related keywords.
 */
const createTextFromTeamPromptMeetingTemplate = async (template: MeetingTemplate) => {
  const {name} = template
  const keywords = 'team prompt, daily standup, status update'
  return `${name}\n${keywords}`
}

/**
 * Builds the embedding text for an action (check-in) template: the template
 * name followed by a fixed line of related keywords.
 */
const createTextFromActionMeetingTemplate = async (template: MeetingTemplate) => {
  const {name} = template
  const keywords = 'check-in, action, task, todo, follow-up'
  return `${name}\n${keywords}`
}

/**
 * Builds the embedding text for a poker template: the template name, a fixed
 * keyword line, then for every dimension its name, description, scale name and
 * the comma-separated labels of the scale values.
 */
const createTextFromPokerMeetingTemplate = async (
  template: PokerTemplate,
  dataLoader: DataLoaderInstance
) => {
  const dimensions = await dataLoader.get('templateDimensionsByTemplateId').load(template.id)
  // Start every scale lookup up front, then wait for all of them together
  const pendingTexts = dimensions.map(async (dimension) => {
    const {name, description, scaleId} = dimension
    const scale = await dataLoader.get('templateScales').load(scaleId)
    const labels = scale.values.map((value) => value.label)
    return `${name}\n${description}\n${scale.name}\n${labels.join(', ')}`
  })
  const dimensionTexts = await Promise.all(pendingTexts)
  const dimensionsText = dimensionTexts.join('\n')
  return `${template.name}\nplanning poker, sprint poker, estimation\n${dimensionsText}`
}

/**
 * Loads a meeting template by id and produces the text to embed for it.
 *
 * The body is built by the helper matching the template's `type`; a missing
 * template or an unhandled type yields an empty body. The language is inferred
 * from the resulting body.
 *
 * @returns the embedding text as `body` and its inferred `language`
 */
export const createTextFromMeetingTemplate = async (
  templateId: string,
  dataLoader: DataLoaderInstance
) => {
  const template = await dataLoader.get('meetingTemplates').load(templateId)

  let body = ''
  switch (template?.type) {
    case 'retrospective':
      body = await createTextFromRetrospectiveMeetingTemplate(template, dataLoader)
      break
    case 'teamPrompt':
      body = await createTextFromTeamPromptMeetingTemplate(template)
      break
    case 'action':
      body = await createTextFromActionMeetingTemplate(template)
      break
    case 'poker':
      body = await createTextFromPokerMeetingTemplate(template, dataLoader)
      break
    default:
      break
  }

  return {body, language: inferLanguage(body)}
}
57 changes: 57 additions & 0 deletions packages/embedder/insertMeetingTemplatesIntoMetadataAndQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import {sql} from 'kysely'
import getKysely from 'parabol-server/postgres/getKysely'
import getModelManager from './ai_models/ModelManager'
import {getEmbedderPriority} from './getEmbedderPriority'

/** The meeting template fields needed to create an embeddings metadata row. */
export interface MeetingTemplateMeta {
// Template id; written to the metadata row as `refId`
id: string
// Id of the team, copied onto the metadata row as-is
teamId: string
// Last update time of the template; written to the metadata row as `refUpdatedAt`
updatedAt: Date
}

/**
 * Inserts one `EmbeddingsMetadata` row per meeting template and, in the same
 * statement, queues an `embed:start` job for every newly inserted row and every
 * embedding model known to the model manager.
 *
 * Templates that already have a metadata row are skipped (`ON CONFLICT DO NOTHING`)
 * and get no new jobs.
 *
 * @param meetingTemplates - templates to register; an empty list is a no-op
 * @param maxDelayInDays - passed to `getEmbedderPriority` to derive the job priority
 * @returns `undefined` for an empty list, otherwise the result of the queue insert
 */
export const insertMeetingTemplatesIntoMetadataAndQueue = async (
  meetingTemplates: MeetingTemplateMeta[],
  maxDelayInDays: number
) => {
  const pg = getKysely()
  const metadataRows = meetingTemplates.map(({id, teamId, updatedAt}) => ({
    refId: id,
    objectType: 'meetingTemplate' as const,
    teamId,
    refUpdatedAt: updatedAt
  }))
  if (!metadataRows[0]) return

  const modelManager = getModelManager()
  const tableNames = [...modelManager.embeddingModels.keys()]
  const priority = getEmbedderPriority(maxDelayInDays)
  // This is ugly but it runs fast, which is what we need for historical data
  return pg
    .with('Insert', (qc) =>
      qc
        .insertInto('EmbeddingsMetadata')
        .values(metadataRows)
        .onConflict((oc) => oc.doNothing())
        .returning('id')
    )
    .with('Metadata', (qc) =>
      qc
        .selectFrom('Insert')
        // Pair every inserted id with every model. This must be an inner join: a FULL JOIN
        // would still emit one row per model with a NULL id when every metadata row already
        // existed and the insert returned nothing, queueing jobs that reference no metadata.
        .innerJoin(
          sql<{model: string}>`UNNEST(ARRAY[${sql.join(tableNames)}])`.as('model'),
          (join) => join.onTrue()
        )
        .select(['id', 'model'])
    )
    .insertInto('EmbeddingsJobQueue')
    .columns(['jobType', 'priority', 'embeddingsMetadataId', 'model'])
    .expression(({selectFrom}) =>
      selectFrom('Metadata').select(({lit, ref}) => [
        sql.lit('embed:start').as('jobType'),
        lit(priority).as('priority'),
        ref('Metadata.id').as('embeddingsMetadataId'),
        ref('Metadata.model').as('model')
      ])
    )
    .execute()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {Client} from 'pg'
import getPgConfig from '../getPgConfig'

/**
 * Adds the `meetingTemplate` value to the `EmbeddingsObjectTypeEnum` Postgres enum.
 * `IF NOT EXISTS` makes the statement a no-op when the value is already present.
 */
export async function up() {
  const client = new Client(getPgConfig())
  await client.connect()
  try {
    await client.query(`
DO $$
BEGIN
ALTER TYPE "EmbeddingsObjectTypeEnum" ADD VALUE IF NOT EXISTS 'meetingTemplate';
END $$;
`)
  } finally {
    // Close the connection even when the query throws
    await client.end()
  }
}

/**
 * Reverts the migration: deletes all `meetingTemplate` rows from `EmbeddingsMetadata`,
 * then rebuilds `EmbeddingsObjectTypeEnum` with only `retrospectiveDiscussionTopic`
 * by renaming the old type, creating the new one, converting the `objectType`
 * column through text, and dropping the renamed type.
 */
export async function down() {
  const client = new Client(getPgConfig())
  await client.connect()
  try {
    await client.query(`
DO $$
BEGIN
DELETE FROM "EmbeddingsMetadata" WHERE "objectType" = 'meetingTemplate';
ALTER TYPE "EmbeddingsObjectTypeEnum" RENAME TO "EmbeddingsObjectTypeEnum_delete";
CREATE TYPE "EmbeddingsObjectTypeEnum" AS ENUM (
'retrospectiveDiscussionTopic'
);
ALTER TABLE "EmbeddingsMetadata"
ALTER COLUMN "objectType" TYPE "EmbeddingsObjectTypeEnum" USING "objectType"::text::"EmbeddingsObjectTypeEnum";
DROP TYPE "EmbeddingsObjectTypeEnum_delete";
END $$;
`)
  } finally {
    // Close the connection even when the query throws
    await client.end()
  }
}

0 comments on commit 095cf71

Please sign in to comment.