From fa56c49b7d2067e72d819932793e5d3bb9cce179 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Wed, 6 Nov 2024 10:41:57 +0000 Subject: [PATCH] Update activities in merging-entity-worker via questdb --- .../src/activities/members.ts | 2 +- .../old/apps/entity_merging_worker/index.ts | 43 ++++++++----------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/services/apps/entity_merging_worker/src/activities/members.ts b/services/apps/entity_merging_worker/src/activities/members.ts index 24a556170f..15086731ea 100644 --- a/services/apps/entity_merging_worker/src/activities/members.ts +++ b/services/apps/entity_merging_worker/src/activities/members.ts @@ -66,7 +66,7 @@ export async function moveActivitiesWithIdentityToAnotherMember( identitiesWithActivity.some((ai) => ai.platform === i.platform && ai.username === i.value), )) { await moveIdentityActivitiesToNewMember( - svc.postgres.writer, + svc.questdbSQL, tenantId, fromId, toId, diff --git a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts index dfee5341dd..ad5fdbccaf 100644 --- a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts @@ -2,6 +2,8 @@ import { DbConnOrTx, DbStore } from '@crowd/database' import { IActivityIdentity, IMemberIdentity, MergeActionState, MergeActionStep } from '@crowd/types' import { updateActivities } from '../../../activities/update' +import { formatQuery } from '../../../queryExecutor' +import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data' import { ISegmentIds } from './types' @@ -116,44 +118,35 @@ export async function getIdentitiesWithActivity( } export async function moveIdentityActivitiesToNewMember( - db: DbStore, + db: DbConnOrTx, tenantId: string, fromId: string, toId: string, username: string, platform: string, - batchSize = 1000, ) { - let rowsUpdated - - do { - const result = await db.connection().query( + await updateActivities( + db, + async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }), + formatQuery( ` - UPDATE activities - SET "memberId" = $(toId) - WHERE id in ( - select id from activities - where "memberId" = $(fromId) - and "tenantId" = $(tenantId) - and "username" = $(username) - and "platform" = $(platform) - and "deletedAt" is null - limit $(batchSize) - ) - returning id - `, + "memberId" = $(fromId) + and "tenantId" = $(tenantId) + and "username" = $(username) + and "platform" = $(platform) + and "deletedAt" is null + `, { - toId, fromId, tenantId, username, platform, - batchSize, }, - ) - - rowsUpdated = result.length - } while (rowsUpdated === batchSize) + ), + { + memberId: fromId, + }, + ) } export async function findMemberSegments(db: DbStore, memberId: string): Promise {