Skip to content

Commit

Permalink
feat(@xen-orchestra/xapi): read vdi from nbd when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp committed Mar 14, 2023
1 parent 9f688fa commit 7c6bb83
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 387 deletions.
17 changes: 17 additions & 0 deletions @vates/nbd-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,21 @@ module.exports = class NbdClient {
this.#readBlockResponse()
})
}

async *readBlocks(indexGenerator) {
// default : read all blocks
if (indexGenerator === undefined) {
const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024
indexGenerator = function* () {
const nbBlocks = Math.ceil(exportSize / chunkSize)
for (let index = 0; index < nbBlocks; index++) {
yield { index, size: chunkSize }
}
}
}
for (const { index, size } of indexGenerator()) {
yield this.readBlock(index, size)
}
}
}
17 changes: 16 additions & 1 deletion @vates/read-chunk/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ exports.readChunk = readChunk
* @param {number} size - The number of bytes to read.
* @returns {Promise<Buffer>} - A Promise that resolves to a Buffer of size bytes. The Promise is rejected if there is an error while reading from the stream.
*/
exports.readChunkStrict = async function readChunkStrict(stream, size) {
async function readChunkStrict(stream, size) {
const chunk = await readChunk(stream, size)
if (chunk === null) {
throw new Error('stream has ended without data')
Expand All @@ -65,3 +65,18 @@ exports.readChunkStrict = async function readChunkStrict(stream, size) {

return chunk
}
exports.readChunkStrict = readChunkStrict

exports.skipChunk = async function skip(stream, skipLength, chunkLength = 1024 * 1024) {
if (skipLength <= 0) {
return
}
let remaining = skipLength
while (remaining > chunkLength) {
await readChunkStrict(stream, chunkLength)
remaining -= chunkLength
}
if (remaining > 0) {
await readChunkStrict(stream, remaining)
}
}
3 changes: 1 addition & 2 deletions @xen-orchestra/backups/_deltaVm.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ exports.exportDeltaVm = async function exportDeltaVm(
$snapshot_of$uuid: vdi.$snapshot_of?.uuid,
$SR$uuid: vdi.$SR.uuid,
}
// @todo : use (new VhdFromStream(stream)).stream() instead of vdi.$exportContent directly
// @todo : use nbdclient if possible

streams[`${vdiRef}.vhd`] = await vdi.$exportContent({
baseRef: baseVdi?.$ref,
cancelToken,
Expand Down
32 changes: 1 addition & 31 deletions @xen-orchestra/backups/writers/DeltaBackupWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { Disposable } = require('promise-toolbox')
const NbdClient = require('@vates/nbd-client')

const { debug, warn, info } = createLogger('xo:backups:DeltaBackupWriter')
const { warn } = createLogger('xo:backups:DeltaBackupWriter')

class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
async checkBaseVdis(baseUuidToSrcVdi) {
Expand Down Expand Up @@ -200,41 +199,12 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
await checkVhd(handler, parentPath)
}

const vdiRef = vm.$xapi.getObject(vdi.uuid).$ref

let nbdClient
if (this._backup.config.useNbd && adapter.useVhdDirectory()) {
debug('useNbd is enabled', { vdi: id, path })
// get nbd if possible
try {
// this will always take the first host in the list
const [nbdInfo] = await vm.$xapi.call('VDI.get_nbd_info', vdiRef)
debug('got NBD info', { nbdInfo, vdi: id, path })
nbdClient = new NbdClient(nbdInfo)
await nbdClient.connect()

// this will inform the xapi that we don't need this anymore
// and will detach the vdi from dom0
$defer(() => nbdClient.disconnect())

info('NBD client ready', { vdi: id, path })
Task.info('NBD used')
} catch (error) {
Task.warning('NBD configured but unusable', { error })
nbdClient = undefined
warn('error connecting to NBD server', { error, vdi: id, path })
}
} else {
debug('useNbd is disabled', { vdi: id, path })
}

transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._backup.config.writeBlockConcurrency,
nbdClient,
})

if (isDelta) {
Expand Down
1 change: 1 addition & 0 deletions @xen-orchestra/xapi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
},
"dependencies": {
"@vates/decorate-with": "^2.0.0",
"@vates/nbd-client": "^1.0.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.6.0",
"d3-time-format": "^3.0.0",
Expand Down
31 changes: 29 additions & 2 deletions @xen-orchestra/xapi/vdi.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const pRetry = require('promise-toolbox/retry')
const { decorateClass } = require('@vates/decorate-with')

const extractOpaqueRef = require('./_extractOpaqueRef.js')
const NbdClient = require('@vates/nbd-client')
const { createNbdRawStream, createNbdVhdStream } = require('vhd-lib/createStreamNbd.js')

const noop = Function.prototype

Expand Down Expand Up @@ -60,19 +62,42 @@ class Vdi {
})
}

async exportContent(ref, { baseRef, cancelToken = CancelToken.none, format }) {
async exportContent(ref, { baseRef, cancelToken = CancelToken.none, format, preferNbd = true }) {
const query = {
format,
vdi: ref,
}
if (baseRef !== undefined) {
query.base = baseRef
}
let nbdClient
try {
return await this.getResource(cancelToken, '/export_raw_vdi/', {
if (preferNbd) {
const [nbdInfo] = await this.call('VDI.get_nbd_info', ref)
if (nbdInfo) {
try {
nbdClient = new NbdClient(nbdInfo)
await nbdClient.connect()
} catch (err) {
console.error(err)

nbdClient = undefined
}
}
}
// the raw nbd export does not need to peek ath the vhd source
if (nbdClient && format === 'raw') {
return createNbdRawStream(nbdClient)
}
const restourceStream = await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${await this.getField('VDI', ref, 'name_label')}`),
})
if (nbdClient && format === 'vhd') {
return createNbdVhdStream(nbdClient, restourceStream)
}
// no nbd : use the direct export
return restourceStream
} catch (error) {
// augment the error with as much relevant info as possible
const [poolMaster, vdi] = await Promise.all([
Expand All @@ -82,6 +107,8 @@ class Vdi {
error.pool_master = poolMaster
error.SR = await this.getRecord('SR', vdi.SR)
error.VDI = vdi
error.nbdClient = nbdClient
nbdClient?.disconnect()
throw error
}
}
Expand Down
2 changes: 0 additions & 2 deletions packages/vhd-lib/Vhd/VhdAbstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ exports.VhdAbstract = class VhdAbstract {
return fileSize
}

// @todo split the iterator and stream
// @todo the iterator may support concurrency, to yield multiple block in parallel
stream() {
const { footer, batSize } = this
const { ...header } = this.header // copy since we don't ant to modifiy the current header
Expand Down
193 changes: 0 additions & 193 deletions packages/vhd-lib/Vhd/VhdFromStream.js

This file was deleted.

Loading

0 comments on commit 7c6bb83

Please sign in to comment.