diff --git a/lib/modules/asset/AssetService.ts b/lib/modules/asset/AssetService.ts index 08751130..6acbbc71 100644 --- a/lib/modules/asset/AssetService.ts +++ b/lib/modules/asset/AssetService.ts @@ -15,6 +15,7 @@ import { AskDeviceDetachEngine, AskDeviceLinkAsset, AskDeviceUnlinkAsset, + DeviceContent, } from "../device"; import { AskModelAssetGet, AssetModelContent } from "../model"; import { @@ -44,7 +45,6 @@ import { AssetHistoryContent, AssetHistoryEventMetadata, } from "./types/AssetHistoryContent"; -import { RecoveryQueue } from "../shared/utils/recoveryQueue"; export class AssetService { private context: PluginContext; @@ -280,8 +280,9 @@ export class AssetService { engineId: string, newEngineId: string ): Promise { - return lock(`engine:${engineId}:${newEngineId}`, async () => { - const recovery = new RecoveryQueue(); + let migrated = 0; + + await lock(`engine:${engineId}:${newEngineId}`, async () => { if (!user.profileIds.includes("admin")) { throw new BadRequestError( @@ -304,213 +305,93 @@ export class AssetService { throw new BadRequestError("No assets to migrate"); } + //Get all assets to migrate const assets = await this.sdk.document.mGet( engineId, InternalCollection.ASSETS, assetsList ); - // check if the assets exists in the other engine - const existingAssets = await this.sdk.document.mGet( - newEngineId, - InternalCollection.ASSETS, - assetsList - ); - - if (existingAssets.successes.length > 0) { - throw new BadRequestError( - `Assets ${existingAssets.successes - .map((asset) => asset._id) - .join(", ")} already exists in engine ${newEngineId}` - ); - } - const assetsToMigrate = assets.successes.map((asset) => ({ - _id: asset._id, - body: asset._source, - })); + //Iterate over all asset, and migrate each one + for (const asset of assets.successes) { - const devices = await this.sdk.document.search( - engineId, - InternalCollection.DEVICES, - { - query: { - bool: { - filter: { - terms: { - assetId: assetsList, - }, - }, - }, - }, + //Check if an existing asset reference already exists in the new tenant + if (await this.sdk.document.exists(newEngineId, InternalCollection.ASSETS, asset._id)) { + continue; } - ); - // Map linked devices for assets. - const assetLinkedDevices = assets.successes - .filter((asset) => asset._source.linkedDevices.length > 0) - .map((asset) => ({ - assetId: asset._id, - linkedDevices: asset._source.linkedDevices, + // Create the assets in the new tenant, with empty linkedDevices and groups + let assetContent = Object.assign({}, asset._source); + assetContent.linkedDevices = []; + assetContent.groups = []; + await this.sdk.document.create( + newEngineId, + InternalCollection.ASSETS, + assetContent, + asset._id, + ); + + // get linked devices to this asset, if any + const linkedDevices = asset._source.linkedDevices + .map((d) => ({ + id: d._id, + measureNames: d.measureNames, })); - // Extra recovery step to relink back assets to their devices in case of rollback - recovery.addRecovery(async () => { - // Link the devices to the new assets - const promises: Promise[] = []; - - for (const asset of assetLinkedDevices) { - const assetId = asset.assetId; - for (const device of asset.linkedDevices) { - const deviceId = device._id; - const measureNames = device.measureNames; - promises.push( - ask( - "ask:device-manager:device:link-asset", - { - assetId, - deviceId, - engineId, - measureNames: measureNames, - user, - } - ) - ); - } - } - await Promise.all(promises); - }); + // ... ant iterate over this list + for (const device of linkedDevices) { - // detach from current tenant - await Promise.all( - devices.hits.map((device) => { - return ask( + // detach linked devices from current tenant (it also unkinks asset) + await ask( "ask:device-manager:device:detach-engine", - { deviceId: device._id, user } + { deviceId: device.id, user } ); - }) - ); - // Attach to new tenant - await Promise.all( - devices.hits.map((device) => { - return ask( + // ... and attach to new tenant + await ask( "ask:device-manager:device:attach-engine", - { deviceId: device._id, engineId: newEngineId, user } + { deviceId: device.id, engineId: newEngineId, user } ); - }) - ); - - // recovery function to reattach devices to the old tenant - recovery.addRecovery(async () => { - await Promise.all( - devices.hits.map((device) => { - return ask( - "ask:device-manager:device:detach-engine", - { deviceId: device._id, user } - ); - }) - ); - - await Promise.all( - devices.hits.map((device) => { - return ask( - "ask:device-manager:device:attach-engine", - { deviceId: device._id, engineId, user } - ); - }) - ); - }); - - // Create the assets in the new tenant - await this.sdk.document.mCreate( - newEngineId, - InternalCollection.ASSETS, - assetsToMigrate - ); - - recovery.addRecovery(async () => { - await this.sdk.document.mDelete( - newEngineId, - InternalCollection.ASSETS, - assetsList - ); - }); - - // Delete the assets in the old tenant - await this.sdk.document.mDelete( - engineId, - InternalCollection.ASSETS, - assetsList - ); - - recovery.addRecovery(async () => { - await this.sdk.document.mCreate( - engineId, - InternalCollection.ASSETS, - assetsToMigrate - ); - }); - - // Link the devices to the new assets - const promises: Promise[] = []; - - for (const asset of assetLinkedDevices) { - const assetId = asset.assetId; - for (const device of asset.linkedDevices) { - const deviceId = device._id; - const measureNames = device.measureNames; - promises.push( - ask("ask:device-manager:device:link-asset", { - assetId, - deviceId, + + // ... and link this device to the asset in the new tenant + await ask( + "ask:device-manager:device:link-asset", + { engineId: newEngineId, - measureNames: measureNames, + assetId: asset._id, + deviceId: device.id, + measureNames: device.measureNames, user, - }) + } ); } - } - await Promise.all(promises); - - recovery.addRecovery(async () => { - const promiseRecoveries: Promise[] = []; - - for (const asset of assetLinkedDevices) { - for (const device of asset.linkedDevices) { - const deviceId = device._id; - promiseRecoveries.push( - ask( - "ask:device-manager:device:unlink-asset", - { - deviceId, - user, - } - ) - ); - } - } - - await Promise.all(promiseRecoveries); - }); + // Finally here, we can delete the asset in the source engine ! + await this.sdk.document.delete( + engineId, + InternalCollection.ASSETS, + asset._id, + ); - // clear the groups - await this.sdk.document.mUpdate( - newEngineId, - InternalCollection.ASSETS, - assetsList.map((assetId) => ({ - _id: assetId, - body: { - groups: [], - }, - })) - ); + migrated ++; + } } catch (error) { - await recovery.rollback(); throw new BadRequestError( `An error occured while migrating assets: ${error}` ); } + + if(migrated == 0) { + throw new BadRequestError( + `Error occured while migrating all the assets !` + ); + } else { + await this.sdk.collection.refresh(engineId, InternalCollection.ASSETS); + await this.sdk.collection.refresh(engineId, InternalCollection.DEVICES); + await this.sdk.collection.refresh(newEngineId, InternalCollection.ASSETS); + await this.sdk.collection.refresh(newEngineId, InternalCollection.DEVICES); + await this.sdk.collection.refresh(this.config.adminIndex, InternalCollection.DEVICES); + } }); }