From 5f0510120428cbaffaa4569a705a5f20644d8c83 Mon Sep 17 00:00:00 2001 From: Thomas Mauran Date: Thu, 28 Sep 2023 15:02:49 +0200 Subject: [PATCH] feat(assets): assets migrate tenant --- lib/modules/asset/AssetService.ts | 197 +++++++++++++++++++++- lib/modules/asset/AssetsController.ts | 21 +++ lib/modules/device/DeviceService.ts | 27 +++ lib/modules/device/types/DeviceEvents.ts | 38 +++++ lib/modules/shared/utils/recoveryQueue.ts | 44 +++++ 5 files changed, 326 insertions(+), 1 deletion(-) create mode 100644 lib/modules/shared/utils/recoveryQueue.ts diff --git a/lib/modules/asset/AssetService.ts b/lib/modules/asset/AssetService.ts index 8419f5d7..d9b57375 100644 --- a/lib/modules/asset/AssetService.ts +++ b/lib/modules/asset/AssetService.ts @@ -10,7 +10,12 @@ import { } from "kuzzle-sdk"; import _ from "lodash"; -import { AskDeviceUnlinkAsset } from "../device"; +import { + AskDeviceAttachEngine, + AskDeviceDetachEngine, + AskDeviceLinkAsset, + AskDeviceUnlinkAsset, +} from "../device"; import { AskModelAssetGet, AssetModelContent } from "../model"; import { AskEngineList, @@ -39,6 +44,7 @@ import { AssetHistoryContent, AssetHistoryEventMetadata, } from "./types/AssetHistoryContent"; +import { RecoveryQueue } from "../shared/utils/recoveryQueue"; export class AssetService { private context: PluginContext; @@ -268,6 +274,195 @@ export class AssetService { return result; } + public async migrateTenant( + user: User, + assetsList: string[], + engineId: string, + newEngineId: string + ) { + return lock(`engine:${engineId}:${newEngineId}`, async () => { + const recovery = new RecoveryQueue(); + + try { + // check if tenant destination of the the same group + const engine = await this.getEngine(engineId); + const newEngine = await this.getEngine(newEngineId); + + if (engine.group !== newEngine.group) { + throw new BadRequestError( + `Tenant ${newEngineId} is not in the same group as ${engineId}` + ); + } + + const assets = await this.sdk.document.mGet( + engineId, + InternalCollection.ASSETS, + assetsList + ); + + // check if the assets exists in the other engine + const existingAssets = await this.sdk.document.mGet( + newEngineId, + InternalCollection.ASSETS, + assetsList + ); + + if (existingAssets.successes.length > 0) { + throw new BadRequestError( + `Assets ${existingAssets.successes + .map((asset) => asset._id) + .join(", ")} already exists in engine ${newEngineId}` + ); + } + const assetsToMigrate = assets.successes.map((asset) => ({ + _id: asset._id, + body: asset._source, + })); + + const devices = await this.sdk.document.search( + engineId, + InternalCollection.DEVICES, + { + query: { + bool: { + filter: { + terms: { + assetId: assetsList, + }, + }, + }, + }, + } + ); + + // Map linked devices for assets. + const assetLinkedDevices = assets.successes + .filter((asset) => asset._source.linkedDevices.length > 0) + .map((asset) => ({ + assetId: asset._id, + linkedDevices: asset._source.linkedDevices, + })); + + // Extra recovery step to relink back assets to their devices in case of rollback + recovery.addRecovery(async () => { + // Link the devices to the new assets + for (const asset of assetLinkedDevices) { + const assetId = asset.assetId; + for (const device of asset.linkedDevices) { + const deviceId = device._id; + const measureNames = device.measureNames; + await ask( + "ask:device-manager:device:link-asset", + { + assetId, + deviceId, + engineId, + measureNames: measureNames, + user, + } + ); + } + } + }); + + // detach from current tenant + for (const device of devices.hits) { + await ask( + "ask:device-manager:device:detach-engine", + { deviceId: device._id, user } + ); + await ask( + "ask:device-manager:device:attach-engine", + { deviceId: device._id, engineId: newEngineId, user } + ); + } + + // recovery function to reattach devices to the old tenant + recovery.addRecovery(async () => { + for (const device of devices.hits) { + await ask( + "ask:device-manager:device:detach-engine", + { deviceId: device._id, user } + ); + await ask( + "ask:device-manager:device:attach-engine", + { deviceId: device._id, engineId, user } + ); + } + }); + + // Create the assets in the new tenant + await this.sdk.document.mCreate( + newEngineId, + InternalCollection.ASSETS, + assetsToMigrate + ); + + recovery.addRecovery(async () => { + await this.sdk.document.mDelete( + newEngineId, + InternalCollection.ASSETS, + assetsList + ); + }); + + // Delete the assets in the old tenant + await this.sdk.document.mDelete( + engineId, + InternalCollection.ASSETS, + assetsList + ); + + recovery.addRecovery(async () => { + await this.sdk.document.mCreate( + engineId, + InternalCollection.ASSETS, + assetsToMigrate + ); + }); + + // Link the devices to the new assets + for (const asset of assetLinkedDevices) { + const assetId = asset.assetId; + for (const device of asset.linkedDevices) { + const deviceId = device._id; + const measureNames = device.measureNames; + await ask( + "ask:device-manager:device:link-asset", + { + assetId, + deviceId, + engineId: newEngineId, + measureNames: measureNames, + user, + } + ); + } + } + + recovery.addRecovery(async () => { + for (const asset of assetLinkedDevices) { + for (const device of asset.linkedDevices) { + const deviceId = device._id; + await ask( + "ask:device-manager:device:unlink-asset", + { + deviceId, + user, + } + ); + } + } + }); + } catch (error) { + await recovery.rollback(); + throw new BadRequestError( + `An error occured while migrating assets: ${error}` + ); + } + }); + } + /** * Replace an asset metadata */ diff --git a/lib/modules/asset/AssetsController.ts b/lib/modules/asset/AssetsController.ts index 3ca96008..bd7fe5df 100644 --- a/lib/modules/asset/AssetsController.ts +++ b/lib/modules/asset/AssetsController.ts @@ -96,6 +96,15 @@ export class AssetsController { }, ], }, + migrateTenant: { + handler: this.migrateTenant.bind(this), + http: [ + { + path: "device-manager/:engineId/assets/_migrateTenant", + verb: "post", + }, + ], + }, }, }; /* eslint-enable sort-keys */ @@ -341,4 +350,16 @@ export class AssetsController { return { link }; } + + async migrateTenant(request: KuzzleRequest) { + const assetsList = request.getBodyArray("assetsList"); + const engineId = request.getString("engineId"); + const newEngineId = request.getBodyString("newEngineId"); + await this.assetService.migrateTenant( + request.getUser(), + assetsList, + engineId, + newEngineId + ); + } } diff --git a/lib/modules/device/DeviceService.ts b/lib/modules/device/DeviceService.ts index 1764fc8a..03ee6c04 100644 --- a/lib/modules/device/DeviceService.ts +++ b/lib/modules/device/DeviceService.ts @@ -21,8 +21,11 @@ import { DeviceContent } from "./types/DeviceContent"; import { DeviceSerializer } from "./model/DeviceSerializer"; import { AskDeviceUnlinkAsset, + AskDeviceDetachEngine, EventDeviceUpdateAfter, EventDeviceUpdateBefore, + AskDeviceAttachEngine, + AskDeviceLinkAsset, } from "./types/DeviceEvents"; import { ApiDeviceLinkAssetRequest } from "./types/DeviceApi"; import { AskPayloadReceiveFormated } from "../decoder/types/PayloadEvents"; @@ -54,6 +57,16 @@ export class DeviceService { constructor(plugin: Plugin) { this.config = plugin.config as any; this.context = plugin.context; + this.registerAskEvents(); + } + + registerAskEvents() { + onAsk( + "ask:device-manager:device:link-asset", + async ({ deviceId, engineId, user, assetId, measureNames }) => { + await this.linkAsset(user, engineId, deviceId, assetId, measureNames); + } + ); onAsk( "ask:device-manager:device:unlink-asset", @@ -61,6 +74,20 @@ export class DeviceService { await this.unlinkAsset(user, deviceId); } ); + + onAsk( + "ask:device-manager:device:detach-engine", + async ({ deviceId, user }) => { + await this.detachEngine(user, deviceId); + } + ); + + onAsk( + "ask:device-manager:device:attach-engine", + async ({ deviceId, engineId, user }) => { + await this.attachEngine(user, engineId, deviceId); + } + ); } /** diff --git a/lib/modules/device/types/DeviceEvents.ts b/lib/modules/device/types/DeviceEvents.ts index b08c9e39..1c1749a6 100644 --- a/lib/modules/device/types/DeviceEvents.ts +++ b/lib/modules/device/types/DeviceEvents.ts @@ -4,6 +4,7 @@ import { KDocument } from "kuzzle-sdk"; import { Metadata } from "../../../modules/shared"; import { DeviceContent } from "./DeviceContent"; +import { ApiDeviceLinkAssetRequest } from "./DeviceApi"; export type EventDeviceUpdateBefore = { name: "device-manager:device:update:before"; @@ -20,6 +21,20 @@ export type EventDeviceUpdateAfter = { /** * @internal */ +export type AskDeviceLinkAsset = { + name: "ask:device-manager:device:link-asset"; + + payload: { + engineId: string; + assetId: string; + deviceId: string; + user: User; + measureNames: ApiDeviceLinkAssetRequest["body"]["measureNames"]; + }; + + result: void; +}; + export type AskDeviceUnlinkAsset = { name: "ask:device-manager:device:unlink-asset"; @@ -30,3 +45,26 @@ export type AskDeviceUnlinkAsset = { result: void; }; + +export type AskDeviceDetachEngine = { + name: "ask:device-manager:device:detach-engine"; + + payload: { + deviceId: string; + user: User; + }; + + result: void; +}; + +export type AskDeviceAttachEngine = { + name: "ask:device-manager:device:attach-engine"; + + payload: { + engineId: string; + deviceId: string; + user: User; + }; + + result: void; +}; diff --git a/lib/modules/shared/utils/recoveryQueue.ts b/lib/modules/shared/utils/recoveryQueue.ts new file mode 100644 index 00000000..ec367df9 --- /dev/null +++ b/lib/modules/shared/utils/recoveryQueue.ts @@ -0,0 +1,44 @@ +/** + * This recovery queue has been made by the paas team here https://github.com/kuzzleio/paas-console/blob/master/apps/api/lib/api/RecoveryQueue.ts + * The RecoveryQueue allows you to store action to perform later and trigger + * all of it at once. + */ +export class RecoveryQueue { + // eslint-disable-next-line @typescript-eslint/ban-types + private actions: Function[]; + + constructor() { + this.actions = []; + } + + /** + * The addRecovery function adds a recovery function to an array of actions. + * @param {any} func - The `func` parameter is a function that you want to add to the `actions` array. + */ + addRecovery(func: any) { + this.actions.push(func); + } + + /** + * The "reset" function clears the "actions" array. + */ + reset() { + this.actions = []; + } + + /** + * The `rollback` function iterates through a list of actions in reverse order and attempts to execute + * each action, logging any errors encountered. + */ + async rollback() { + for (const action of this.actions.reverse()) { + try { + await action(); + } catch (error) { + global.app.log.error(` + Rollback process failed: ${error} + `); + } + } + } +}