Skip to content

Commit

Permalink
Update activities in merging-entity-worker via questdb
Browse files Browse the repository at this point in the history
  • Loading branch information
Misha Savelyev committed Nov 6, 2024
1 parent ca43be7 commit fa56c49
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function moveActivitiesWithIdentityToAnotherMember(
identitiesWithActivity.some((ai) => ai.platform === i.platform && ai.username === i.value),
)) {
await moveIdentityActivitiesToNewMember(
svc.postgres.writer,
svc.questdbSQL,
tenantId,
fromId,
toId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { DbConnOrTx, DbStore } from '@crowd/database'
import { IActivityIdentity, IMemberIdentity, MergeActionState, MergeActionStep } from '@crowd/types'

import { updateActivities } from '../../../activities/update'
import { formatQuery } from '../../../queryExecutor'
import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'

import { ISegmentIds } from './types'

Expand Down Expand Up @@ -116,44 +118,35 @@ export async function getIdentitiesWithActivity(
}

export async function moveIdentityActivitiesToNewMember(
db: DbStore,
db: DbConnOrTx,
tenantId: string,
fromId: string,
toId: string,
username: string,
platform: string,
batchSize = 1000,
) {
let rowsUpdated

do {
const result = await db.connection().query(
await updateActivities(
db,
async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }),
formatQuery(
`
UPDATE activities
SET "memberId" = $(toId)
WHERE id in (
select id from activities
where "memberId" = $(fromId)
and "tenantId" = $(tenantId)
and "username" = $(username)
and "platform" = $(platform)
and "deletedAt" is null
limit $(batchSize)
)
returning id
`,
"memberId" = $(fromId)
and "tenantId" = $(tenantId)
and "username" = $(username)
and "platform" = $(platform)
and "deletedAt" is null
`,
{
toId,
fromId,
tenantId,
username,
platform,
batchSize,
},
)

rowsUpdated = result.length
} while (rowsUpdated === batchSize)
),
{
memberId: fromId,
},
)
}

export async function findMemberSegments(db: DbStore, memberId: string): Promise<ISegmentIds> {
Expand Down

0 comments on commit fa56c49

Please sign in to comment.