Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UBERF-7854: Fix live query $lookup update #6304

Merged
merged 1 commit into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions packages/query/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1111,15 +1111,21 @@ export class LiveQuery implements WithTx, Client {
for (const resDoc of docs) {
const obj = getObjectValue(objWay, resDoc)
if (obj === undefined) continue
const value = getObjectValue('$lookup.' + key, obj)
let value = getObjectValue('$lookup.' + key, obj)
const reverseCheck = reverseLookupKey !== undefined && (doc as any)[reverseLookupKey] === obj._id
if (value == null && reverseCheck) {
value = []
obj.$lookup[key] = value
}
if (Array.isArray(value)) {
if (this.client.getHierarchy().isDerived(doc._class, core.class.AttachedDoc)) {
if (reverseLookupKey !== undefined && (doc as any)[reverseLookupKey] === obj._id) {
if ((value as Doc[]).find((p) => p._id === doc._id) === undefined) {
value.push(doc)
needCallback = true
}
if (this.client.getHierarchy().isDerived(doc._class, core.class.AttachedDoc) && reverseCheck) {
const idx = (value as Doc[]).findIndex((p) => p._id === doc._id)
if (idx === -1) {
value.push(doc)
} else {
value[idx] = doc
}
needCallback = true
}
} else {
if (obj[key] === doc._id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
labelIntl: getEmbeddedLabel('Github Repositories')
}
]
let selectedTab: string = tabs[0].id
let selectedTab = tabs[0].id

$: loading = $ticker - (auth?.authRequestTime ?? 0) < 5000
</script>
Expand Down
78 changes: 10 additions & 68 deletions services/github/pod-github/src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import core, {
Ref,
TxOperations
} from '@hcengineering/core'
import github, { GithubAuthentication, GithubIntegration, makeQuery } from '@hcengineering/github'
import github, { GithubAuthentication, makeQuery } from '@hcengineering/github'
import { MongoClientReference, getMongoClient } from '@hcengineering/mongo'
import { setMetadata } from '@hcengineering/platform'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
Expand Down Expand Up @@ -104,43 +104,18 @@ export class PlatformWorker {
this.integrations = await this.integrationCollection.find({}).toArray()
await this.queryInstallations(ctx)

const workspacesToCheck = new Set<string>()
// We need to delete local integrations not retrieved by queryInstallations()
for (const intValue of this.integrations) {
workspacesToCheck.add(intValue.workspace)
}
for (const integr of [...this.integrations]) {
// We need to check and remove integrations without a real integration's
if (!this.installations.has(integr.installationId)) {
ctx.warn('Installation was deleted during service shutdown', {
installationId: integr.installationId,
workspace: integr.workspace
})
await this.integrationCollection.deleteOne({ installationId: integr.installationId })
this.integrations = this.integrations.filter((it) => it.installationId !== integr.installationId)
}
}

const checkClean = async (): Promise<void> => {
const rateLimit = new RateLimiter(10)
for (const workspace of workspacesToCheck) {
// We need to connect to workspace and verify all installations and clean if required
try {
await rateLimit.add(async () => {
ctx.info('check clean', { workspace })
try {
await this.cleanWorkspaceInstallations(ctx, workspace)
} catch (err: any) {
ctx.error('failed to check clean', { workspace })
}
})
} catch (err: any) {
ctx.error('failed to clean workspace', { err, workspace })
}
}
await rateLimit.waitProcessing()
}
void checkClean()

void this.doSyncWorkspaces().catch((err) => {
ctx.error('error during sync workspaces', { err })
process.exit(1)
Expand Down Expand Up @@ -181,6 +156,7 @@ export class PlatformWorker {
}
await new Promise<void>((resolve) => {
this.triggerCheckWorkspaces = resolve
this.ctx.info('Workspaces check triggered')
if (errors) {
setTimeout(resolve, 5000)
}
Expand Down Expand Up @@ -217,44 +193,6 @@ export class PlatformWorker {
return (await this.usersCollection.find<GithubUserRecord>({ _id: login }).toArray()).shift()
}

async cleanWorkspaceInstallations (ctx: MeasureContext, workspace: string, installId?: number): Promise<void> {
// TODO: Do not remove record from $github if we failed to clean github installations inside workspace.
const token = generateToken(
config.SystemEmail,
{
name: workspace,
productId: config.ProductID
},
{ mode: 'github' }
)
let workspaceInfo: ClientWorkspaceInfo
try {
workspaceInfo = await getWorkspaceInfo(token)
} catch (err: any) {
ctx.error('Workspace not found:', { workspace })
return
}
if (workspaceInfo === undefined) {
ctx.error('No workspace found', { workspace })
return
}
let client: Client | undefined
try {
client = await createPlatformClient(workspace, config.ProductID, 10000)
const ops = new TxOperations(client, core.account.System)

const wsIntegerations = await client.findAll(github.class.GithubIntegration, {})

for (const intValue of wsIntegerations) {
if (!this.installations.has(intValue.installationId) || intValue.installationId === installId) {
await ops.remove<GithubIntegration>(intValue)
}
}
} finally {
await client?.close()
}
}

async mapInstallation (
ctx: MeasureContext,
workspace: string,
Expand Down Expand Up @@ -297,8 +235,6 @@ export class PlatformWorker {
installation_id: installationId
})
}
// Clean workspace
await this.cleanWorkspaceInstallations(ctx, workspace, installationId)
this.triggerCheckWorkspaces()
}

Expand Down Expand Up @@ -679,6 +615,7 @@ export class PlatformWorker {
index: widx,
total: workspaces.length
})

const worker = await GithubWorker.create(
this,
workerCtx,
Expand Down Expand Up @@ -708,6 +645,12 @@ export class PlatformWorker {
// No if no integration, we will try connect one more time in a time period
this.clients.set(workspace, worker)
} else {
workerCtx.info('Failed Register worker, timeout or integrations removed', {
workspaceId: workspaceInfo.workspaceId,
workspace: workspaceInfo.workspace,
index: widx,
total: workspaces.length
})
errors++
}
} catch (e: any) {
Expand All @@ -729,7 +672,6 @@ export class PlatformWorker {
const ws = this.clients.get(deleted)
if (ws !== undefined) {
try {
await ws.ctx.logger.close()
this.ctx.info('workspace removed from tracking list', { workspace: deleted })
this.clients.delete(deleted)
await ws.close()
Expand Down
13 changes: 13 additions & 0 deletions services/github/pod-github/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,8 @@ export class GithubWorker implements IntegrationManager {
reconnect(workspace.name, event)
})

await GithubWorker.checkIntegrations(client, installations)

const worker = new GithubWorker(
ctx,
platformWorker,
Expand All @@ -1510,6 +1512,17 @@ export class GithubWorker implements IntegrationManager {
await client?.close()
}
}

static async checkIntegrations (client: Client, installations: Map<number, InstallationRecord>): Promise<void> {
const wsIntegerations = await client.findAll(github.class.GithubIntegration, {})

for (const intValue of wsIntegerations) {
if (!installations.has(intValue.installationId)) {
const ops = new TxOperations(client, core.account.System)
await ops.remove<GithubIntegration>(intValue)
}
}
}
}

export async function syncUser (
Expand Down