chore(rethinkdb): TeamMember: Phase 2 #9993

Merged (17 commits) on Jul 23, 2024
1 change: 1 addition & 0 deletions packages/server/__tests__/globalTeardown.ts
@@ -6,6 +6,7 @@ async function teardown() {
const r = await getRethink()
await r.getPoolMaster()?.drain()
await getKysely().destroy()
console.log('global teardown destroy')
await getRedis().quit()
}

1 change: 1 addition & 0 deletions packages/server/dataloader/__tests__/isOrgVerified.test.ts
@@ -68,6 +68,7 @@ afterEach(async () => {

afterAll(async () => {
await getKysely().destroy()
console.log('org verified destroy')
})

test('Founder is billing lead', async () => {
@@ -5,10 +5,7 @@ import isValid from '../../graphql/isValid'
import getPg from '../../postgres/getPg'

afterAll(async () => {
const pg = getPg()
const dataloader = getDataLoader()

await pg.end()
dataloader.dispose(true)
// TODO shutdown redis to properly end test
})
36 changes: 22 additions & 14 deletions packages/server/graphql/private/mutations/checkRethinkPgEquality.ts
@@ -2,7 +2,13 @@ import getRethink from '../../../database/rethinkDriver'
import getFileStoreManager from '../../../fileStorage/getFileStoreManager'
import getKysely from '../../../postgres/getKysely'
import {checkRowCount, checkTableEq} from '../../../postgres/utils/checkEqBase'
import {compareRValUndefinedAsNull, defaultEqFn} from '../../../postgres/utils/rethinkEqualityFns'
import {
compareDateAlmostEqual,
compareRValUndefinedAsFalse,
compareRValUndefinedAsNull,
compareRValUndefinedAsNullAndTruncateRVal,
defaultEqFn
} from '../../../postgres/utils/rethinkEqualityFns'
import {MutationResolvers} from '../resolverTypes'

const handleResult = async (
@@ -27,35 +33,37 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = async
) => {
const r = await getRethink()

if (tableName === 'OrganizationUser') {
if (tableName === 'TeamMember') {
const rowCountResult = await checkRowCount(tableName)
const rethinkQuery = (joinedAt: Date, id: string | number) => {
return r
.table('OrganizationUser' as any)
.table('TeamMember' as any)
.between([joinedAt, id], [r.maxval, r.maxval], {
index: 'joinedAtId',
index: 'updatedAtId',
leftBound: 'open',
rightBound: 'closed'
})
.orderBy({index: 'joinedAtId'}) as any
.orderBy({index: 'updatedAtId'}) as any
}
const pgQuery = async (ids: string[]) => {
return getKysely().selectFrom('OrganizationUser').selectAll().where('id', 'in', ids).execute()
return getKysely().selectFrom('TeamMember').selectAll().where('id', 'in', ids).execute()
}
const errors = await checkTableEq(
rethinkQuery,
pgQuery,
{
id: defaultEqFn,
suggestedTier: compareRValUndefinedAsNull,
inactive: defaultEqFn,
joinedAt: defaultEqFn,
orgId: defaultEqFn,
removedAt: defaultEqFn,
role: compareRValUndefinedAsNull,
isNotRemoved: compareRValUndefinedAsFalse,
isLead: compareRValUndefinedAsFalse,
isSpectatingPoker: compareRValUndefinedAsFalse,
email: defaultEqFn,
openDrawer: compareRValUndefinedAsNull,
picture: defaultEqFn,
preferredName: compareRValUndefinedAsNullAndTruncateRVal(100),
teamId: defaultEqFn,
userId: defaultEqFn,
tier: defaultEqFn,
trialStartDate: compareRValUndefinedAsNull
createdAt: compareDateAlmostEqual,
updatedAt: compareDateAlmostEqual
},
maxErrors
)
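The rethinkQuery above walks TeamMember with keyset pagination on the new updatedAtId compound index: each call resumes just past the last (updatedAt, id) pair already processed, and checkTableEq advances that cursor between batches. A condensed sketch of one page fetch under those assumptions (the helper name and cursor shape are illustrative, not part of the PR):

// Illustrative sketch: fetch the next page of TeamMember rows after a cursor.
// Assumes `r` is the RethinkDB driver handle, as in the resolver above.
const nextPage = (cursor: {updatedAt: Date; id: string | number}, limit: number) =>
  r
    .table('TeamMember' as any)
    .between([cursor.updatedAt, cursor.id], [r.maxval, r.maxval], {
      index: 'updatedAtId',
      leftBound: 'open', // skip the cursor row itself
      rightBound: 'closed'
    })
    .orderBy({index: 'updatedAtId'})
    .limit(limit)
    .run()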
144 changes: 144 additions & 0 deletions packages/server/postgres/migrations/1721356124871_TeamMember-phase2.ts
@@ -0,0 +1,144 @@
import {Kysely, PostgresDialect} from 'kysely'
import {r} from 'rethinkdb-ts'
import connectRethinkDB from '../../database/connectRethinkDB'
import getPg from '../getPg'

export async function up() {
await connectRethinkDB()
Comment on lines +6 to +7
Ensure proper error handling for database connections.

The connection to RethinkDB should include error handling to manage potential connection issues.

- await connectRethinkDB()
+ try {
+   await connectRethinkDB()
+ } catch (error) {
+   console.error('Failed to connect to RethinkDB:', error)
+   throw error
+ }

const pg = new Kysely<any>({
dialect: new PostgresDialect({
pool: getPg()
})
})
Comment on lines +8 to +12
Ensure proper error handling for PostgreSQL connection.

The connection to PostgreSQL should include error handling to manage potential connection issues.

- const pg = new Kysely<any>({
-   dialect: new PostgresDialect({
-     pool: getPg()
-   })
- })
+ let pg
+ try {
+   pg = new Kysely<any>({
+     dialect: new PostgresDialect({
+       pool: getPg()
+     })
+   })
+ } catch (error) {
+   console.error('Failed to connect to PostgreSQL:', error)
+   throw error
+ }


// add a dummy date for nulls
const parabolEpoch = new Date('2016-06-01')
await r
.table('TeamMember')
.update((row) => ({
updatedAt: row('updatedAt').default(parabolEpoch),
createdAt: row('createdAt').default(parabolEpoch)
}))
.run()
Comment on lines +14 to +22
Handle potential errors during date updates in RethinkDB.

Updating dates in RethinkDB should include error handling to manage potential issues.

- await r
-   .table('TeamMember')
-   .update((row) => ({
-     updatedAt: row('updatedAt').default(parabolEpoch),
-     createdAt: row('createdAt').default(parabolEpoch)
-   }))
-   .run()
+ try {
+   await r
+     .table('TeamMember')
+     .update((row) => ({
+       updatedAt: row('updatedAt').default(parabolEpoch),
+       createdAt: row('createdAt').default(parabolEpoch)
+     }))
+     .run()
+ } catch (error) {
+   console.error('Failed to update dates in RethinkDB:', error)
+   throw error
+ }

const strDates = await r
.table('TeamMember')
.filter((row) => row('updatedAt').typeOf().eq('STRING'))
.pluck('updatedAt', 'id', 'createdAt')
.run()
Comment on lines +23 to +27
Handle potential errors during date filtering in RethinkDB.

Filtering dates in RethinkDB should include error handling to manage potential issues.

- const strDates = await r
-   .table('TeamMember')
-   .filter((row) => row('updatedAt').typeOf().eq('STRING'))
-   .pluck('updatedAt', 'id', 'createdAt')
-   .run()
+ let strDates
+ try {
+   strDates = await r
+     .table('TeamMember')
+     .filter((row) => row('updatedAt').typeOf().eq('STRING'))
+     .pluck('updatedAt', 'id', 'createdAt')
+     .run()
+ } catch (error) {
+   console.error('Failed to filter dates in RethinkDB:', error)
+   throw error
+ }

const dateDates = strDates.map((d) => ({
id: d.id,
updatedAt: new Date(d.updatedAt),
createdAt: new Date(d.createdAt)
}))
// some dates are stored as strings; convert them back to Date objects
await r(dateDates)
.forEach((row: any) => {
return r
.table('TeamMember')
.get(row('id'))
.update({updatedAt: row('updatedAt')})
})
.run()
Comment on lines +33 to +41
Handle potential errors during date conversion in RethinkDB.

Converting dates in RethinkDB should include error handling to manage potential issues.

- await r(dateDates)
-   .forEach((row: any) => {
-     return r
-       .table('TeamMember')
-       .get(row('id'))
-       .update({updatedAt: row('updatedAt')})
-   })
-   .run()
+ try {
+   await r(dateDates)
+     .forEach((row: any) => {
+       return r
+         .table('TeamMember')
+         .get(row('id'))
+         .update({updatedAt: row('updatedAt')})
+     })
+     .run()
+ } catch (error) {
+   console.error('Failed to convert dates in RethinkDB:', error)
+   throw error
+ }


try {
console.log('Adding index')
await r
.table('TeamMember')
.indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')])
.run()
await r.table('TeamMember').indexWait().run()
} catch {
// index already exists
}
Comment on lines +43 to +52
Avoid catching errors just to log and rethrow them.

Let the errors propagate naturally to be handled by the calling function or global error handler.

- try {
-   console.log('Adding index')
-   await r
-     .table('TeamMember')
-     .indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')])
-     .run()
-   await r.table('TeamMember').indexWait().run()
- } catch {
-   // index already exists
- }
+ console.log('Adding index')
+ await r
+   .table('TeamMember')
+   .indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')])
+   .run()
+ await r.table('TeamMember').indexWait().run()


await console.log('Adding index complete')
const MAX_PG_PARAMS = 65545
const PG_COLS = [
'id',
'isNotRemoved',
'isLead',
'isSpectatingPoker',
'email',
'openDrawer',
'picture',
'preferredName',
'teamId',
'userId',
'createdAt',
'updatedAt'
] as const
type TeamMember = {
[K in (typeof PG_COLS)[number]]: any
}
const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length)
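// With the 12 columns above, BATCH_SIZE works out to Math.trunc(65545 / 12) = 5462
// rows per batch, since each inserted row binds one parameter per column.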

let curUpdatedAt = r.minval
let curId = r.minval
for (let i = 0; i < 1e6; i++) {
console.log('inserting row', i * BATCH_SIZE, String(curUpdatedAt), String(curId))
const rawRowsToInsert = (await r
.table('TeamMember')
.between([curUpdatedAt, curId], [r.maxval, r.maxval], {
index: 'updatedAtId',
leftBound: 'open',
rightBound: 'closed'
})
.orderBy({index: 'updatedAtId'})
.limit(BATCH_SIZE)
.pluck(...PG_COLS)
.run()) as TeamMember[]

const rowsToInsert = rawRowsToInsert.map((row) => {
const {preferredName, picture, ...rest} = row as any
return {
...rest,
preferredName: preferredName.slice(0, 100),
picture: picture.slice(0, 2056)
}
})
if (rowsToInsert.length === 0) break
const lastRow = rowsToInsert[rowsToInsert.length - 1]
curUpdatedAt = lastRow.updatedAt
curId = lastRow.id
try {
await pg
.insertInto('TeamMember')
.values(rowsToInsert)
.onConflict((oc) => oc.doNothing())
.execute()
} catch (e) {
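// The batch insert failed (e.g. a foreign-key violation); retry row by row below
// so valid rows still land and rows missing a user/team are skipped.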
await Promise.all(
rowsToInsert.map(async (row) => {
try {
await pg
.insertInto('TeamMember')
.values(row)
.onConflict((oc) => oc.doNothing())
.execute()
} catch (e) {
if (e.constraint === 'fk_userId' || e.constraint === 'fk_teamId') {
console.log(`Skipping ${row.id} because it has no user/team`)
return
}
console.log(e, row)
}
})
)
}
}
Comment on lines +75 to +128
Optimise data migration loop.

The data migration loop can be optimised by reducing the number of console.log statements and improving error handling.

- for (let i = 0; i < 1e6; i++) {
-   console.log('inserting row', i * BATCH_SIZE, String(curUpdatedAt), String(curId))
+ for (let i = 0; ; i++) {
+   if (i % 100 === 0) {
+     console.log('inserting row', i * BATCH_SIZE, String(curUpdatedAt), String(curId))
+   }
    const rawRowsToInsert = (await r
      .table('TeamMember')
      .between([curUpdatedAt, curId], [r.maxval, r.maxval], {
        index: 'updatedAtId',
        leftBound: 'open',
        rightBound: 'closed'
      })
      .orderBy({index: 'updatedAtId'})
      .limit(BATCH_SIZE)
      .pluck(...PG_COLS)
      .run()) as TeamMember[]

    const rowsToInsert = rawRowsToInsert.map((row) => {
      const {preferredName, picture, ...rest} = row as any
      return {
        ...rest,
        preferredName: preferredName.slice(0, 100),
        picture: picture.slice(0, 2056)
      }
    })
    if (rowsToInsert.length === 0) break
    const lastRow = rowsToInsert[rowsToInsert.length - 1]
    curUpdatedAt = lastRow.updatedAt
    curId = lastRow.id
    try {
      await pg
        .insertInto('TeamMember')
        .values(rowsToInsert)
        .onConflict((oc) => oc.doNothing())
        .execute()
    } catch (e) {
      await Promise.all(
        rowsToInsert.map(async (row) => {
          try {
            await pg
              .insertInto('TeamMember')
              .values(row)
              .onConflict((oc) => oc.doNothing())
              .execute()
          } catch (e) {
            if (e.constraint === 'fk_userId' || e.constraint === 'fk_teamId') {
              console.log(`Skipping ${row.id} because it has no user/team`)
              return
            }
            console.log(e, row)
          }
        })
      )
    }
  }

}

export async function down() {
await connectRethinkDB()
try {
await r.table('TeamMember').indexDrop('updatedAtId').run()
} catch {
// index already dropped
}
Comment on lines +131 to +137
Avoid catching errors just to log and rethrow them.

Let the errors propagate naturally to be handled by the calling function or global error handler.

- try {
-   await r.table('TeamMember').indexDrop('updatedAtId').run()
- } catch {
-   // index already dropped
- }
+ await r.table('TeamMember').indexDrop('updatedAtId').run()

const pg = new Kysely<any>({
dialect: new PostgresDialect({
pool: getPg()
})
})
await pg.deleteFrom('TeamMember').execute()
Comment on lines +138 to +143
Ensure proper error handling for PostgreSQL connection.

The connection to PostgreSQL should include error handling to manage potential connection issues.

- const pg = new Kysely<any>({
-   dialect: new PostgresDialect({
-     pool: getPg()
-   })
- })
+ let pg
+ try {
+   pg = new Kysely<any>({
+     dialect: new PostgresDialect({
+       pool: getPg()
+     })
+   })
+ } catch (error) {
+   console.error('Failed to connect to PostgreSQL:', error)
+   throw error
+ }

}
4 changes: 2 additions & 2 deletions packages/server/postgres/utils/checkEqBase.ts
@@ -33,7 +33,7 @@ export const checkRowCount = async (tableName: string) => {
}

export async function checkTableEq(
rethinkQuery: (joinedAt: Date, id: string | number) => RSelection,
rethinkQuery: (updatedAt: Date, id: string | number) => RSelection,
pgQuery: (ids: string[]) => Promise<PGDoc[] | null>,
equalityMap: Record<string, (a: unknown, b: unknown) => boolean>,
maxErrors: number | null | undefined
@@ -51,7 +51,7 @@ export async function checkTableEq(
.run()) as RethinkDoc[]
if (rethinkRows.length === 0) break
const lastRow = rethinkRows[rethinkRows.length - 1]!
curUpdatedDate = lastRow.joinedAt
curUpdatedDate = lastRow.updatedAt
curId = lastRow.id
const ids = rethinkRows.map((t) => t.id)
const pgRows = (await pgQuery(ids)) ?? []
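For context on how the equality map from checkRethinkPgEquality is consumed: checkTableEq pages through RethinkDB rows with the query above, fetches the matching Postgres rows by id, and presumably applies each configured comparator field by field. A minimal illustrative sketch of that per-row step (names and shape are assumptions, not the actual implementation):

// Illustrative sketch only: apply a per-field equality map to one row pair and
// return the fields whose RethinkDB and Postgres values are not considered equal.
const compareRow = (
  rethinkRow: Record<string, unknown>,
  pgRow: Record<string, unknown>,
  equalityMap: Record<string, (rVal: unknown, pgVal: unknown) => boolean>
): string[] =>
  Object.entries(equalityMap)
    .filter(([field, eqFn]) => !eqFn(rethinkRow[field], pgRow[field]))
    .map(([field]) => field)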
5 changes: 5 additions & 0 deletions packages/server/postgres/utils/rethinkEqualityFns.ts
@@ -22,6 +22,11 @@ export const compareRealNumber = (rVal: unknown, pgVal: unknown) => {
return answer
}

export const compareRValUndefinedAs =
(as: string | number | boolean | null | undefined) => (rVal: unknown, pgVal: unknown) => {
const normalizedRVal = rVal === undefined ? as : rVal
return defaultEqFn(normalizedRVal, pgVal)
}
export const compareRValUndefinedAsNull = (rVal: unknown, pgVal: unknown) => {
const normalizedRVal = rVal === undefined ? null : rVal
return defaultEqFn(normalizedRVal, pgVal)
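The new compareRValUndefinedAs factory generalizes the single-purpose comparators imported in checkRethinkPgEquality. Assuming the other variants follow the same pattern as compareRValUndefinedAsNull above, they could be expressed through the factory; a sketch for illustration only (not necessarily how the file defines them):

// Illustrative only: the specific comparators re-expressed via the factory.
// A missing RethinkDB value is normalized to the Postgres default before
// delegating to defaultEqFn.
const undefinedAsNull = compareRValUndefinedAs(null) // e.g. openDrawer
const undefinedAsFalse = compareRValUndefinedAs(false) // e.g. isLead, isNotRemoved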
@@ -72,6 +72,7 @@ afterEach(async () => {
afterAll(async () => {
await getKysely().destroy()
getRedis().quit()
console.log('request to join destroy')
})

test('Only the biggest org with verified emails qualify', async () => {