Skip to content

Commit

Permalink
refactor(xapi/VDI_exportContent): can export from NBD (#6716)
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp authored Mar 30, 2023
1 parent 4e9477f commit a4d1d41
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 156 deletions.
43 changes: 41 additions & 2 deletions @vates/nbd-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,58 @@ module.exports = class NbdClient {
buffer.writeInt32BE(size, 24)

return new Promise((resolve, reject) => {
function decoratedReject(error) {
error.index = index
error.size = size
reject(error)
}

// this will handle one block response, but it can be another block
// since server does not guaranty to handle query in order
this.#commandQueryBacklog.set(queryId, {
size,
resolve,
reject,
reject: decoratedReject,
})
// really send the command to the server
this.#write(buffer).catch(reject)
this.#write(buffer).catch(decoratedReject)

// #readBlockResponse never throws directly
// but if it fails it will reject all the promises in the backlog
this.#readBlockResponse()
})
}

async *readBlocks(indexGenerator) {
// default : read all blocks
if (indexGenerator === undefined) {
const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024
indexGenerator = function* () {
const nbBlocks = Math.ceil(exportSize / chunkSize)
for (let index = 0; index < nbBlocks; index++) {
yield { index, size: chunkSize }
}
}
}
const readAhead = []
const readAheadMaxLength = 10

// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
for (const { index, size } of indexGenerator()) {
// stack readAheadMaxLengthpromise before starting to handle the results
if (readAhead.length === readAheadMaxLength) {
// any error will stop reading blocks
yield readAhead.shift()
}

// error is handled during unshift
const promise = this.readBlock(index, size)
promise.catch(() => {})
readAhead.push(promise)
}
while (readAhead.length > 0) {
yield readAhead.shift()
}
}
}
3 changes: 1 addition & 2 deletions @xen-orchestra/backups/RemoteAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ class RemoteAdapter {
return path
}

async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency, nbdClient } = {}) {
async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
const handler = this._handler
if (this.useVhdDirectory()) {
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
Expand All @@ -677,7 +677,6 @@ class RemoteAdapter {
await input.task
return validator.apply(this, arguments)
},
nbdClient,
})
await VhdAbstract.createAlias(handler, path, dataPath)
return size
Expand Down
5 changes: 5 additions & 0 deletions @xen-orchestra/backups/_VmBackup.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ class VmBackup {
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
fullVdisRequired,
})
// since NBD is network based, if one disk use nbd , all the disk use them
// except the suspended VDI
if (Object.values(deltaExport.streams).some(({ _nbd }) => _nbd)) {
Task.info('Transfer data using NBD')
}
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
deltaExport.streams = mapValues(deltaExport.streams, this._throttleStream)

Expand Down
32 changes: 1 addition & 31 deletions @xen-orchestra/backups/writers/DeltaBackupWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { Disposable } = require('promise-toolbox')
const NbdClient = require('@vates/nbd-client')

const { debug, warn, info } = createLogger('xo:backups:DeltaBackupWriter')
const { warn } = createLogger('xo:backups:DeltaBackupWriter')

class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
async checkBaseVdis(baseUuidToSrcVdi) {
Expand Down Expand Up @@ -200,41 +199,12 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
await checkVhd(handler, parentPath)
}

const vdiRef = vm.$xapi.getObject(vdi.uuid).$ref

let nbdClient
if (this._backup.config.useNbd && adapter.useVhdDirectory()) {
debug('useNbd is enabled', { vdi: id, path })
// get nbd if possible
try {
// this will always take the first host in the list
const [nbdInfo] = await vm.$xapi.call('VDI.get_nbd_info', vdiRef)
debug('got NBD info', { nbdInfo, vdi: id, path })
nbdClient = new NbdClient(nbdInfo)
await nbdClient.connect()

// this will inform the xapi that we don't need this anymore
// and will detach the vdi from dom0
$defer(() => nbdClient.disconnect())

info('NBD client ready', { vdi: id, path })
Task.info('NBD used')
} catch (error) {
Task.warning('NBD configured but unusable', { error })
nbdClient = undefined
warn('error connecting to NBD server', { error, vdi: id, path })
}
} else {
debug('useNbd is disabled', { vdi: id, path })
}

transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._backup.config.writeBlockConcurrency,
nbdClient,
})

if (isDelta) {
Expand Down
2 changes: 2 additions & 0 deletions @xen-orchestra/xapi/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class Xapi extends Base {
constructor({
callRetryWhenTooManyPendingTasks = { delay: 5e3, tries: 10 },
maxUncoalescedVdis,
preferNbd = false,
syncHookSecret,
syncHookTimeout,
vdiDestroyRetryWhenInUse = { delay: 5e3, tries: 10 },
Expand All @@ -146,6 +147,7 @@ class Xapi extends Base {
when: { code: 'TOO_MANY_PENDING_TASKS' },
}
this._maxUncoalescedVdis = maxUncoalescedVdis
this._preferNbd = preferNbd
this._syncHookSecret = syncHookSecret
this._syncHookTimeout = syncHookTimeout
this._vdiDestroyRetryWhenInUse = {
Expand Down
1 change: 1 addition & 0 deletions @xen-orchestra/xapi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
},
"dependencies": {
"@vates/decorate-with": "^2.0.0",
"@vates/nbd-client": "1.0.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.6.0",
"d3-time-format": "^3.0.0",
Expand Down
47 changes: 43 additions & 4 deletions @xen-orchestra/xapi/vdi.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const pRetry = require('promise-toolbox/retry')
const { decorateClass } = require('@vates/decorate-with')

const extractOpaqueRef = require('./_extractOpaqueRef.js')
const NbdClient = require('@vates/nbd-client')
const { createNbdRawStream, createNbdVhdStream } = require('vhd-lib/createStreamNbd.js')
const { VDI_FORMAT_RAW, VDI_FORMAT_VHD } = require('./index.js')
const { warn } = require('@xen-orchestra/log').createLogger('xo:xapi:vdi')

const noop = Function.prototype

Expand Down Expand Up @@ -60,6 +64,25 @@ class Vdi {
})
}

async _getNbdClient(ref) {
const nbdInfos = await this.call('VDI.get_nbd_info', ref)
if (nbdInfos.length > 0) {
// a little bit of randomization to spread the load
const nbdInfo = nbdInfos[Math.floor(Math.random() * nbdInfos.length)]
try {
const nbdClient = new NbdClient(nbdInfo)
await nbdClient.connect()
return nbdClient
} catch (err) {
warn(`can't connect to nbd server `, {
err,
nbdInfo,
nbdInfos,
})
}
}
}

async exportContent(ref, { baseRef, cancelToken = CancelToken.none, format }) {
const query = {
format,
Expand All @@ -71,11 +94,25 @@ class Vdi {

query.base = baseRef
}
let nbdClient, stream
try {
return await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${await this.getField('VDI', ref, 'name_label')}`),
})
if (this._preferNbd) {
nbdClient = await this._getNbdClient(ref)
}
// the raw nbd export does not need to peek ath the vhd source
if (nbdClient !== undefined && format === VDI_FORMAT_RAW) {
stream = createNbdRawStream(nbdClient)
} else {
// raw export without nbd or vhd exports needs a resource stream
stream = await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${await this.getField('VDI', ref, 'name_label')}`),
})
if (nbdClient !== undefined && format === VDI_FORMAT_VHD) {
stream = await createNbdVhdStream(nbdClient, stream)
}
}
return stream
} catch (error) {
// augment the error with as much relevant info as possible
const [poolMaster, vdi] = await Promise.all([
Expand All @@ -85,6 +122,8 @@ class Vdi {
error.pool_master = poolMaster
error.SR = await this.getRecord('SR', vdi.SR)
error.VDI = vdi
error.nbdClient = nbdClient
nbdClient?.disconnect()
throw error
}
}
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<!--packages-start-->

- @vates/nbd-client minor
- @vates/read-chunk minor
- @xen-orchestra/backups minor
- @xen-orchestra/xapi minor
- vhd-lib minor

<!--packages-end-->
112 changes: 112 additions & 0 deletions packages/vhd-lib/createStreamNbd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
'use strict'
const { readChunkStrict, skipStrict } = require('@vates/read-chunk')
const { Readable } = require('node:stream')
const { unpackHeader } = require('./Vhd/_utils')
const {
FOOTER_SIZE,
HEADER_SIZE,
PARENT_LOCATOR_ENTRIES,
SECTOR_SIZE,
BLOCK_UNUSED,
DEFAULT_BLOCK_SIZE,
PLATFORMS,
} = require('./_constants')
const { fuHeader, checksumStruct } = require('./_structs')
const assert = require('node:assert')

exports.createNbdRawStream = async function createRawStream(nbdClient) {
const stream = Readable.from(nbdClient.readBlocks())

stream.on('error', () => nbdClient.disconnect())
stream.on('end', () => nbdClient.disconnect())
return stream
}

exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStream) {
const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE)

const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE))
// compute BAT in order
const batSize = Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE) * SECTOR_SIZE
await skipStrict(sourceStream, header.tableOffset - (FOOTER_SIZE + HEADER_SIZE))
const streamBat = await readChunkStrict(sourceStream, batSize)
let offset = FOOTER_SIZE + HEADER_SIZE + batSize
// check if parentlocator are ordered
let precLocator = 0
for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) {
header.parentLocatorEntry[i] = {
...header.parentLocatorEntry[i],
platformDataOffset: offset,
}
offset += header.parentLocatorEntry[i].platformDataSpace * SECTOR_SIZE
if (header.parentLocatorEntry[i].platformCode !== PLATFORMS.NONE) {
assert(
precLocator < offset,
`locator must be ordered. numer ${i} is at ${offset}, precedent is at ${precLocator}`
)
precLocator = offset
}
}
header.tableOffset = FOOTER_SIZE + HEADER_SIZE
const rawHeader = fuHeader.pack(header)
checksumStruct(rawHeader, fuHeader)

// BAT
const bat = Buffer.allocUnsafe(batSize)
let offsetSector = offset / SECTOR_SIZE
const blockSizeInSectors = DEFAULT_BLOCK_SIZE / SECTOR_SIZE + 1 /* bitmap */
// compute a BAT with the position that the block will have in the resulting stream
// blocks starts directly after parent locator entries
const entries = []
for (let i = 0; i < header.maxTableEntries; i++) {
const entry = streamBat.readUInt32BE(i * 4)
if (entry !== BLOCK_UNUSED) {
bat.writeUInt32BE(offsetSector, i * 4)
offsetSector += blockSizeInSectors
entries.push(i)
} else {
bat.writeUInt32BE(BLOCK_UNUSED, i * 4)
}
}

async function* iterator() {
yield bufFooter
yield rawHeader
yield bat

let precBlocOffset = FOOTER_SIZE + HEADER_SIZE + batSize
for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) {
const parentLocatorOffset = header.parentLocatorEntry[i].platformDataOffset
const space = header.parentLocatorEntry[i].platformDataSpace * SECTOR_SIZE
if (space > 0) {
await skipStrict(sourceStream, parentLocatorOffset - precBlocOffset)
const data = await readChunkStrict(sourceStream, space)
precBlocOffset = parentLocatorOffset + space
yield data
}
}

// close export stream we won't use it anymore
sourceStream.destroy()

// yield blocks from nbd
const nbdIterator = nbdClient.readBlocks(function* () {
for (const entry of entries) {
yield { index: entry, size: DEFAULT_BLOCK_SIZE }
}
})
const bitmap = Buffer.alloc(SECTOR_SIZE, 255)
for await (const block of nbdIterator) {
yield bitmap // don't forget the bitmap before the block
yield block
}
yield bufFooter
}

const stream = Readable.from(iterator())
stream.length = (offsetSector + blockSizeInSectors + 1) /* end footer */ * SECTOR_SIZE
stream._nbd = true
stream.on('error', () => nbdClient.disconnect())
stream.on('end', () => nbdClient.disconnect())
return stream
}
8 changes: 4 additions & 4 deletions packages/vhd-lib/createVhdDirectoryFromStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ const { asyncEach } = require('@vates/async-each')

const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')

const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression, nbdClient }) {
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
const vhd = yield VhdDirectory.create(handler, path, { compression })
await asyncEach(
parseVhdStream(inputStream, nbdClient),
parseVhdStream(inputStream),
async function (item) {
switch (item.type) {
case 'footer':
Expand Down Expand Up @@ -45,10 +45,10 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
handler,
path,
inputStream,
{ validator, concurrency = 16, compression, nbdClient } = {}
{ validator, concurrency = 16, compression } = {}
) {
try {
const size = await buildVhd(handler, path, inputStream, { concurrency, compression, nbdClient })
const size = await buildVhd(handler, path, inputStream, { concurrency, compression })
if (validator !== undefined) {
await validator.call(this, path)
}
Expand Down
Loading

0 comments on commit a4d1d41

Please sign in to comment.