diff --git a/packages/server/__tests__/globalTeardown.ts b/packages/server/__tests__/globalTeardown.ts index b5a3cd1633f..03744b6ffa4 100644 --- a/packages/server/__tests__/globalTeardown.ts +++ b/packages/server/__tests__/globalTeardown.ts @@ -6,6 +6,7 @@ async function teardown() { const r = await getRethink() await r.getPoolMaster()?.drain() await getKysely().destroy() + console.log('global teardown destroy') await getRedis().quit() } diff --git a/packages/server/dataloader/__tests__/isOrgVerified.test.ts b/packages/server/dataloader/__tests__/isOrgVerified.test.ts index ed37f906642..2c6957b83f4 100644 --- a/packages/server/dataloader/__tests__/isOrgVerified.test.ts +++ b/packages/server/dataloader/__tests__/isOrgVerified.test.ts @@ -68,6 +68,7 @@ afterEach(async () => { afterAll(async () => { await getKysely().destroy() + console.log('org verified destroy') }) test('Founder is billing lead', async () => { diff --git a/packages/server/dataloader/__tests__/usersCustomRedisQueries.test.ts b/packages/server/dataloader/__tests__/usersCustomRedisQueries.test.ts index 50fcc7fa77f..3d1dbb8c44e 100644 --- a/packages/server/dataloader/__tests__/usersCustomRedisQueries.test.ts +++ b/packages/server/dataloader/__tests__/usersCustomRedisQueries.test.ts @@ -5,10 +5,7 @@ import isValid from '../../graphql/isValid' import getPg from '../../postgres/getPg' afterAll(async () => { - const pg = getPg() const dataloader = getDataLoader() - - await pg.end() dataloader.dispose(true) // TODO shutdown redis to properly end test }) diff --git a/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts b/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts index 47c8215ed43..927cf3685fe 100644 --- a/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts +++ b/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts @@ -2,7 +2,13 @@ import getRethink from '../../../database/rethinkDriver' import getFileStoreManager from '../../../fileStorage/getFileStoreManager' import getKysely from '../../../postgres/getKysely' import {checkRowCount, checkTableEq} from '../../../postgres/utils/checkEqBase' -import {compareRValUndefinedAsNull, defaultEqFn} from '../../../postgres/utils/rethinkEqualityFns' +import { + compareDateAlmostEqual, + compareRValUndefinedAsFalse, + compareRValUndefinedAsNull, + compareRValUndefinedAsNullAndTruncateRVal, + defaultEqFn +} from '../../../postgres/utils/rethinkEqualityFns' import {MutationResolvers} from '../resolverTypes' const handleResult = async ( @@ -27,35 +33,37 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn ) => { const r = await getRethink() - if (tableName === 'OrganizationUser') { + if (tableName === 'TeamMember') { const rowCountResult = await checkRowCount(tableName) const rethinkQuery = (joinedAt: Date, id: string | number) => { return r - .table('OrganizationUser' as any) + .table('TeamMember' as any) .between([joinedAt, id], [r.maxval, r.maxval], { - index: 'joinedAtId', + index: 'updatedAtId', leftBound: 'open', rightBound: 'closed' }) - .orderBy({index: 'joinedAtId'}) as any + .orderBy({index: 'updatedAtId'}) as any } const pgQuery = async (ids: string[]) => { - return getKysely().selectFrom('OrganizationUser').selectAll().where('id', 'in', ids).execute() + return getKysely().selectFrom('TeamMember').selectAll().where('id', 'in', ids).execute() } const errors = await checkTableEq( rethinkQuery, pgQuery, { id: defaultEqFn, - suggestedTier: compareRValUndefinedAsNull, - inactive: defaultEqFn, - joinedAt: defaultEqFn, - orgId: defaultEqFn, - removedAt: defaultEqFn, - role: compareRValUndefinedAsNull, + isNotRemoved: compareRValUndefinedAsFalse, + isLead: compareRValUndefinedAsFalse, + isSpectatingPoker: compareRValUndefinedAsFalse, + email: defaultEqFn, + openDrawer: compareRValUndefinedAsNull, + picture: defaultEqFn, + preferredName: compareRValUndefinedAsNullAndTruncateRVal(100), + teamId: defaultEqFn, userId: defaultEqFn, - tier: defaultEqFn, - trialStartDate: compareRValUndefinedAsNull + createdAt: compareDateAlmostEqual, + updatedAt: compareDateAlmostEqual }, maxErrors ) diff --git a/packages/server/postgres/migrations/1721356124871_TeamMember-phase2.ts b/packages/server/postgres/migrations/1721356124871_TeamMember-phase2.ts new file mode 100644 index 00000000000..f88c3c6a97d --- /dev/null +++ b/packages/server/postgres/migrations/1721356124871_TeamMember-phase2.ts @@ -0,0 +1,144 @@ +import {Kysely, PostgresDialect} from 'kysely' +import {r} from 'rethinkdb-ts' +import connectRethinkDB from '../../database/connectRethinkDB' +import getPg from '../getPg' + +export async function up() { + await connectRethinkDB() + const pg = new Kysely({ + dialect: new PostgresDialect({ + pool: getPg() + }) + }) + + // add a dummy date for nulls + const parabolEpoch = new Date('2016-06-01') + await r + .table('TeamMember') + .update((row) => ({ + updatedAt: row('updatedAt').default(parabolEpoch), + createdAt: row('createdAt').default(parabolEpoch) + })) + .run() + const strDates = await r + .table('TeamMember') + .filter((row) => row('updatedAt').typeOf().eq('STRING')) + .pluck('updatedAt', 'id', 'createdAt') + .run() + const dateDates = strDates.map((d) => ({ + id: d.id, + updatedAt: new Date(d.updatedAt), + createdAt: new Date(d.createdAt) + })) + // some dates are + await r(dateDates) + .forEach((row: any) => { + return r + .table('TeamMember') + .get(row('id')) + .update({updatedAt: row('updatedAt')}) + }) + .run() + + try { + console.log('Adding index') + await r + .table('TeamMember') + .indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')]) + .run() + await r.table('TeamMember').indexWait().run() + } catch { + // index already exists + } + + await console.log('Adding index complete') + const MAX_PG_PARAMS = 65545 + const PG_COLS = [ + 'id', + 'isNotRemoved', + 'isLead', + 'isSpectatingPoker', + 'email', + 'openDrawer', + 'picture', + 'preferredName', + 'teamId', + 'userId', + 'createdAt', + 'updatedAt' + ] as const + type TeamMember = { + [K in (typeof PG_COLS)[number]]: any + } + const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length) + + let curUpdatedAt = r.minval + let curId = r.minval + for (let i = 0; i < 1e6; i++) { + console.log('inserting row', i * BATCH_SIZE, String(curUpdatedAt), String(curId)) + const rawRowsToInsert = (await r + .table('TeamMember') + .between([curUpdatedAt, curId], [r.maxval, r.maxval], { + index: 'updatedAtId', + leftBound: 'open', + rightBound: 'closed' + }) + .orderBy({index: 'updatedAtId'}) + .limit(BATCH_SIZE) + .pluck(...PG_COLS) + .run()) as TeamMember[] + + const rowsToInsert = rawRowsToInsert.map((row) => { + const {preferredName, picture, ...rest} = row as any + return { + ...rest, + preferredName: preferredName.slice(0, 100), + picture: picture.slice(0, 2056) + } + }) + if (rowsToInsert.length === 0) break + const lastRow = rowsToInsert[rowsToInsert.length - 1] + curUpdatedAt = lastRow.updatedAt + curId = lastRow.id + try { + await pg + .insertInto('TeamMember') + .values(rowsToInsert) + .onConflict((oc) => oc.doNothing()) + .execute() + } catch (e) { + await Promise.all( + rowsToInsert.map(async (row) => { + try { + await pg + .insertInto('TeamMember') + .values(row) + .onConflict((oc) => oc.doNothing()) + .execute() + } catch (e) { + if (e.constraint === 'fk_userId' || e.constraint === 'fk_teamId') { + console.log(`Skipping ${row.id} because it has no user/team`) + return + } + console.log(e, row) + } + }) + ) + } + } +} + +export async function down() { + await connectRethinkDB() + try { + await r.table('TeamMember').indexDrop('updatedAtId').run() + } catch { + // index already dropped + } + const pg = new Kysely({ + dialect: new PostgresDialect({ + pool: getPg() + }) + }) + await pg.deleteFrom('TeamMember').execute() +} diff --git a/packages/server/postgres/utils/checkEqBase.ts b/packages/server/postgres/utils/checkEqBase.ts index bbe31117b3f..175d20a57a5 100644 --- a/packages/server/postgres/utils/checkEqBase.ts +++ b/packages/server/postgres/utils/checkEqBase.ts @@ -33,7 +33,7 @@ export const checkRowCount = async (tableName: string) => { } export async function checkTableEq( - rethinkQuery: (joinedAt: Date, id: string | number) => RSelection, + rethinkQuery: (updatedAt: Date, id: string | number) => RSelection, pgQuery: (ids: string[]) => Promise, equalityMap: Record boolean>, maxErrors: number | null | undefined @@ -51,7 +51,7 @@ export async function checkTableEq( .run()) as RethinkDoc[] if (rethinkRows.length === 0) break const lastRow = rethinkRows[rethinkRows.length - 1]! - curUpdatedDate = lastRow.joinedAt + curUpdatedDate = lastRow.updatedAt curId = lastRow.id const ids = rethinkRows.map((t) => t.id) const pgRows = (await pgQuery(ids)) ?? [] diff --git a/packages/server/postgres/utils/rethinkEqualityFns.ts b/packages/server/postgres/utils/rethinkEqualityFns.ts index 3d63e8381df..201afa28e9c 100644 --- a/packages/server/postgres/utils/rethinkEqualityFns.ts +++ b/packages/server/postgres/utils/rethinkEqualityFns.ts @@ -22,6 +22,11 @@ export const compareRealNumber = (rVal: unknown, pgVal: unknown) => { return answer } +export const compareRValUndefinedAs = + (as: string | number | boolean | null | undefined) => (rVal: unknown, pgVal: unknown) => { + const normalizedRVal = rVal === undefined ? as : rVal + return defaultEqFn(normalizedRVal, pgVal) + } export const compareRValUndefinedAsNull = (rVal: unknown, pgVal: unknown) => { const normalizedRVal = rVal === undefined ? null : rVal return defaultEqFn(normalizedRVal, pgVal) diff --git a/packages/server/utils/__tests__/isRequestToJoinDomainAllowed.test.ts b/packages/server/utils/__tests__/isRequestToJoinDomainAllowed.test.ts index 52b67b82600..dbe8589d6d8 100644 --- a/packages/server/utils/__tests__/isRequestToJoinDomainAllowed.test.ts +++ b/packages/server/utils/__tests__/isRequestToJoinDomainAllowed.test.ts @@ -72,6 +72,7 @@ afterEach(async () => { afterAll(async () => { await getKysely().destroy() getRedis().quit() + console.log('request to join destroy') }) test('Only the biggest org with verified emails qualify', async () => {