diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts
index 85eba09f..60168f63 100644
--- a/src/utils/db/db.ts
+++ b/src/utils/db/db.ts
@@ -570,10 +570,15 @@ export class DB {
         distinctId: string
     ): Promise {
         const insertResult = await client.query(
-            'INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id) VALUES ($1, $2, $3) RETURNING *',
+            'INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING RETURNING *',
             [distinctId, person.id, person.team_id]
         )
 
+        // some other thread already added this ID
+        if (insertResult.rows.length === 0) {
+            return
+        }
+
         const personDistinctIdCreated = insertResult.rows[0] as PersonDistinctId
         if (this.kafkaProducer) {
             return {
diff --git a/src/worker/ingestion/process-event.ts b/src/worker/ingestion/process-event.ts
index 5ed715b7..e785ba18 100644
--- a/src/worker/ingestion/process-event.ts
+++ b/src/worker/ingestion/process-event.ts
@@ -291,7 +291,8 @@ export class EventsProcessor {
             try {
                 await this.db.addDistinctId(oldPerson, distinctId)
                 // Catch race case when somebody already added this distinct_id between .get and .addDistinctId
-            } catch {
+            } catch (error) {
+                Sentry.captureException(error)
                 // integrity error
                 if (retryIfFailed) {
                     // run everything again to merge the users if needed
@@ -305,7 +306,8 @@
             try {
                 await this.db.addDistinctId(newPerson, previousDistinctId)
                 // Catch race case when somebody already added this distinct_id between .get and .addDistinctId
-            } catch {
+            } catch (error) {
+                Sentry.captureException(error)
                 // integrity error
                 if (retryIfFailed) {
                     // run everything again to merge the users if needed
@@ -321,7 +323,8 @@
                     distinctId,
                     previousDistinctId,
                 ])
-            } catch {
+            } catch (error) {
+                Sentry.captureException(error)
                 // Catch race condition where in between getting and creating,
                 // another request already created this person
                 if (retryIfFailed) {
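
For reference, a minimal standalone sketch of the pattern the first hunk relies on: with ON CONFLICT DO NOTHING, a duplicate insert no longer raises a unique-violation error; instead RETURNING * yields zero rows, which the caller treats as "another writer already inserted this distinct_id". The table and column names are taken from the diff; the pg Pool setup, the PersonDistinctId shape, and the addDistinctIdIfAbsent helper name are illustrative assumptions, not part of this PR.

import { Pool } from 'pg'

interface PersonDistinctId {
    id: number
    distinct_id: string
    person_id: number
    team_id: number
}

// Connection settings are assumed to come from the usual PG* environment variables.
const pool = new Pool()

// Hypothetical helper demonstrating the same race-safe insert as DB.addDistinctId above.
async function addDistinctIdIfAbsent(
    distinctId: string,
    personId: number,
    teamId: number
): Promise<PersonDistinctId | undefined> {
    const insertResult = await pool.query(
        'INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING RETURNING *',
        [distinctId, personId, teamId]
    )
    // Zero returned rows means the row already existed: some other thread won the race.
    if (insertResult.rows.length === 0) {
        return undefined
    }
    return insertResult.rows[0] as PersonDistinctId
}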