Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(xapi/VDI_exportContent): can export from NBD #6716

Merged
merged 13 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions @vates/nbd-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,58 @@ module.exports = class NbdClient {
buffer.writeInt32BE(size, 24)

return new Promise((resolve, reject) => {
function decoratedReject(error) {
error.index = index
error.size = size
reject(error)
}

// this will handle one block response, but it may be for another block,
// since the server does not guarantee to answer queries in order
this.#commandQueryBacklog.set(queryId, {
size,
resolve,
reject,
reject: decoratedReject,
})
// really send the command to the server
this.#write(buffer).catch(reject)
this.#write(buffer).catch(decoratedReject)

// #readBlockResponse never throws directly
// but if it fails it will reject all the promises in the backlog
this.#readBlockResponse()
})
}

async *readBlocks(indexGenerator) {
// default : read all blocks
if (indexGenerator === undefined) {
const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024
indexGenerator = function* () {
const nbBlocks = Math.ceil(exportSize / chunkSize)
for (let index = 0; index < nbBlocks; index++) {
yield { index, size: chunkSize }
}
}
}
const readAhead = []
const readAheadMaxLength = 10

// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
for (const { index, size } of indexGenerator()) {
// stack readAheadMaxLengthpromise before starting to handle the results
if (readAhead.length === readAheadMaxLength) {
// any error will stop reading blocks
yield readAhead.shift()
}

// error is handled during unshift
const promise = this.readBlock(index, size)
promise.catch(() => {})
readAhead.push(promise)
}
while (readAhead.length > 0) {
yield readAhead.shift()
}
}
}
3 changes: 1 addition & 2 deletions @xen-orchestra/backups/RemoteAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ class RemoteAdapter {
return path
}

async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency, nbdClient } = {}) {
async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
const handler = this._handler
if (this.useVhdDirectory()) {
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
Expand All @@ -677,7 +677,6 @@ class RemoteAdapter {
await input.task
return validator.apply(this, arguments)
},
nbdClient,
})
await VhdAbstract.createAlias(handler, path, dataPath)
return size
Expand Down
5 changes: 5 additions & 0 deletions @xen-orchestra/backups/_VmBackup.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ class VmBackup {
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
fullVdisRequired,
})
// since NBD is network based, if one disk uses NBD, all the disks use it,
// except the suspended VDI
if (Object.values(deltaExport.streams).some(({ _nbd }) => _nbd)) {
Task.info('Transfer data using NBD')
}
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
deltaExport.streams = mapValues(deltaExport.streams, this._throttleStream)

Expand Down
32 changes: 1 addition & 31 deletions @xen-orchestra/backups/writers/DeltaBackupWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { Disposable } = require('promise-toolbox')
const NbdClient = require('@vates/nbd-client')

const { debug, warn, info } = createLogger('xo:backups:DeltaBackupWriter')
const { warn } = createLogger('xo:backups:DeltaBackupWriter')

class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
async checkBaseVdis(baseUuidToSrcVdi) {
Expand Down Expand Up @@ -200,41 +199,12 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
await checkVhd(handler, parentPath)
}

const vdiRef = vm.$xapi.getObject(vdi.uuid).$ref

let nbdClient
if (this._backup.config.useNbd && adapter.useVhdDirectory()) {
debug('useNbd is enabled', { vdi: id, path })
// get nbd if possible
try {
// this will always take the first host in the list
const [nbdInfo] = await vm.$xapi.call('VDI.get_nbd_info', vdiRef)
debug('got NBD info', { nbdInfo, vdi: id, path })
nbdClient = new NbdClient(nbdInfo)
await nbdClient.connect()

// this will inform the xapi that we don't need this anymore
// and will detach the vdi from dom0
$defer(() => nbdClient.disconnect())

info('NBD client ready', { vdi: id, path })
Task.info('NBD used')
} catch (error) {
Task.warning('NBD configured but unusable', { error })
nbdClient = undefined
warn('error connecting to NBD server', { error, vdi: id, path })
}
} else {
debug('useNbd is disabled', { vdi: id, path })
}

transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._backup.config.writeBlockConcurrency,
nbdClient,
})

if (isDelta) {
Expand Down
2 changes: 2 additions & 0 deletions @xen-orchestra/xapi/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class Xapi extends Base {
constructor({
callRetryWhenTooManyPendingTasks = { delay: 5e3, tries: 10 },
maxUncoalescedVdis,
preferNbd = false,
syncHookSecret,
syncHookTimeout,
vdiDestroyRetryWhenInUse = { delay: 5e3, tries: 10 },
Expand All @@ -146,6 +147,7 @@ class Xapi extends Base {
when: { code: 'TOO_MANY_PENDING_TASKS' },
}
this._maxUncoalescedVdis = maxUncoalescedVdis
this._preferNbd = preferNbd
this._syncHookSecret = syncHookSecret
this._syncHookTimeout = syncHookTimeout
this._vdiDestroyRetryWhenInUse = {
Expand Down
1 change: 1 addition & 0 deletions @xen-orchestra/xapi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
},
"dependencies": {
"@vates/decorate-with": "^2.0.0",
"@vates/nbd-client": "1.0.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.6.0",
"d3-time-format": "^3.0.0",
Expand Down
47 changes: 43 additions & 4 deletions @xen-orchestra/xapi/vdi.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const pRetry = require('promise-toolbox/retry')
const { decorateClass } = require('@vates/decorate-with')

const extractOpaqueRef = require('./_extractOpaqueRef.js')
const NbdClient = require('@vates/nbd-client')
const { createNbdRawStream, createNbdVhdStream } = require('vhd-lib/createStreamNbd.js')
const { VDI_FORMAT_RAW, VDI_FORMAT_VHD } = require('./index.js')
const { warn } = require('@xen-orchestra/log').createLogger('xo:xapi:vdi')

const noop = Function.prototype

Expand Down Expand Up @@ -60,6 +64,25 @@ class Vdi {
})
}

async _getNbdClient(ref) {
const nbdInfos = await this.call('VDI.get_nbd_info', ref)
if (nbdInfos.length > 0) {
// a little bit of randomization to spread the load
const nbdInfo = nbdInfos[Math.floor(Math.random() * nbdInfos.length)]
try {
const nbdClient = new NbdClient(nbdInfo)
await nbdClient.connect()
return nbdClient
} catch (err) {
warn(`can't connect to nbd server `, {
err,
nbdInfo,
nbdInfos,
})
}
}
}

async exportContent(ref, { baseRef, cancelToken = CancelToken.none, format }) {
const query = {
format,
Expand All @@ -71,11 +94,25 @@ class Vdi {

query.base = baseRef
}
let nbdClient, stream
try {
return await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${await this.getField('VDI', ref, 'name_label')}`),
})
if (this._preferNbd) {
nbdClient = await this._getNbdClient(ref)
}
// the raw NBD export does not need to peek at the VHD source
if (nbdClient !== undefined && format === VDI_FORMAT_RAW) {
stream = createNbdRawStream(nbdClient)
} else {
// raw export without nbd or vhd exports needs a resource stream
stream = await this.getResource(cancelToken, '/export_raw_vdi/', {
query,
task: await this.task_create(`Exporting content of VDI ${await this.getField('VDI', ref, 'name_label')}`),
})
if (nbdClient !== undefined && format === VDI_FORMAT_VHD) {
stream = await createNbdVhdStream(nbdClient, stream)
}
}
return stream
} catch (error) {
// augment the error with as much relevant info as possible
const [poolMaster, vdi] = await Promise.all([
Expand All @@ -85,6 +122,8 @@ class Vdi {
error.pool_master = poolMaster
error.SR = await this.getRecord('SR', vdi.SR)
error.VDI = vdi
error.nbdClient = nbdClient
nbdClient?.disconnect()
throw error
}
}
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

<!--packages-start-->

- @vates/nbd-client minor
- @vates/read-chunk minor
- @xen-orchestra/backups minor
- @xen-orchestra/xapi minor
- vhd-lib minor

<!--packages-end-->
112 changes: 112 additions & 0 deletions packages/vhd-lib/createStreamNbd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
'use strict'
const { readChunkStrict, skipStrict } = require('@vates/read-chunk')
const { Readable } = require('node:stream')
const { unpackHeader } = require('./Vhd/_utils')
const {
FOOTER_SIZE,
HEADER_SIZE,
PARENT_LOCATOR_ENTRIES,
SECTOR_SIZE,
BLOCK_UNUSED,
DEFAULT_BLOCK_SIZE,
PLATFORMS,
} = require('./_constants')
const { fuHeader, checksumStruct } = require('./_structs')
const assert = require('node:assert')

exports.createNbdRawStream = async function createRawStream(nbdClient) {
const stream = Readable.from(nbdClient.readBlocks())

stream.on('error', () => nbdClient.disconnect())
stream.on('end', () => nbdClient.disconnect())
return stream
}

/**
 * Builds a dynamic-VHD-formatted readable stream whose block data is fetched
 * through NBD instead of the XAPI export stream.
 *
 * The footer, header, BAT and parent locators are read from `sourceStream`
 * (a regular VHD export); the source stream is then destroyed and the actual
 * data blocks are streamed from `nbdClient.readBlocks()`.
 *
 * @param {NbdClient} nbdClient - a connected NBD client for this VDI
 * @param {Readable} sourceStream - VHD export stream, consumed only up to the
 *   parent locators, then destroyed
 * @returns {Promise<Readable>} VHD stream; `_nbd` is set to true so callers
 *   can detect the NBD path, `length` is the predicted byte size
 */
exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStream) {
  const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE)

  const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE))
  // the BAT is padded to a whole number of sectors (4 bytes per entry)
  const batSize = Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE) * SECTOR_SIZE
  // skip whatever lies between the header and the BAT in the source layout
  await skipStrict(sourceStream, header.tableOffset - (FOOTER_SIZE + HEADER_SIZE))
  const streamBat = await readChunkStrict(sourceStream, batSize)
  let offset = FOOTER_SIZE + HEADER_SIZE + batSize
  // relocate the parent locators right after the BAT, and check that the
  // source emits them in ascending offset order (required to stream them)
  let precLocator = 0
  for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) {
    header.parentLocatorEntry[i] = {
      ...header.parentLocatorEntry[i],
      platformDataOffset: offset,
    }
    offset += header.parentLocatorEntry[i].platformDataSpace * SECTOR_SIZE
    if (header.parentLocatorEntry[i].platformCode !== PLATFORMS.NONE) {
      assert(
        precLocator < offset,
        `locator must be ordered. numer ${i} is at ${offset}, precedent is at ${precLocator}`
      )
      precLocator = offset
    }
  }
  // in the output layout the BAT immediately follows footer + header
  header.tableOffset = FOOTER_SIZE + HEADER_SIZE
  const rawHeader = fuHeader.pack(header)
  // recompute the header checksum since tableOffset and locators changed
  checksumStruct(rawHeader, fuHeader)

  // rebuild the BAT with the positions the blocks will have in the output:
  // used blocks are packed contiguously, starting right after the locators
  const bat = Buffer.allocUnsafe(batSize)
  let offsetSector = offset / SECTOR_SIZE
  const blockSizeInSectors = DEFAULT_BLOCK_SIZE / SECTOR_SIZE + 1 /* bitmap */
  // `entries` remembers which block indexes are allocated, in output order
  const entries = []
  for (let i = 0; i < header.maxTableEntries; i++) {
    const entry = streamBat.readUInt32BE(i * 4)
    if (entry !== BLOCK_UNUSED) {
      bat.writeUInt32BE(offsetSector, i * 4)
      offsetSector += blockSizeInSectors
      entries.push(i)
    } else {
      bat.writeUInt32BE(BLOCK_UNUSED, i * 4)
    }
  }

  // emits the rebuilt VHD: footer, header, BAT, parent locators (copied from
  // the source), then every allocated block fetched over NBD, then the footer
  async function* iterator() {
    yield bufFooter
    yield rawHeader
    yield bat

    // copy the parent locator payloads from the source stream
    let precBlocOffset = FOOTER_SIZE + HEADER_SIZE + batSize
    for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) {
      const parentLocatorOffset = header.parentLocatorEntry[i].platformDataOffset
      const space = header.parentLocatorEntry[i].platformDataSpace * SECTOR_SIZE
      if (space > 0) {
        await skipStrict(sourceStream, parentLocatorOffset - precBlocOffset)
        const data = await readChunkStrict(sourceStream, space)
        precBlocOffset = parentLocatorOffset + space
        yield data
      }
    }

    // close export stream we won't use it anymore
    sourceStream.destroy()

    // stream the allocated blocks from NBD, in BAT order
    const nbdIterator = nbdClient.readBlocks(function* () {
      for (const entry of entries) {
        yield { index: entry, size: DEFAULT_BLOCK_SIZE }
      }
    })
    // dynamic VHD blocks are prefixed by a sector bitmap; all-ones marks
    // every sector of the block as present
    const bitmap = Buffer.alloc(SECTOR_SIZE, 255)
    for await (const block of nbdIterator) {
      yield bitmap // don't forget the bitmap before the block
      yield block
    }
    yield bufFooter
  }

  const stream = Readable.from(iterator())
  // NOTE(review): `offsetSector` already points past the last block, so adding
  // `blockSizeInSectors` again looks like it overestimates by one block —
  // confirm whether consumers rely on `length` being exact
  stream.length = (offsetSector + blockSizeInSectors + 1) /* end footer */ * SECTOR_SIZE
  // marker used by _VmBackup to report that NBD is being used for the transfer
  stream._nbd = true
  stream.on('error', () => nbdClient.disconnect())
  stream.on('end', () => nbdClient.disconnect())
  return stream
}
8 changes: 4 additions & 4 deletions packages/vhd-lib/createVhdDirectoryFromStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ const { asyncEach } = require('@vates/async-each')

const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')

const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression, nbdClient }) {
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
const vhd = yield VhdDirectory.create(handler, path, { compression })
await asyncEach(
parseVhdStream(inputStream, nbdClient),
parseVhdStream(inputStream),
async function (item) {
switch (item.type) {
case 'footer':
Expand Down Expand Up @@ -45,10 +45,10 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
handler,
path,
inputStream,
{ validator, concurrency = 16, compression, nbdClient } = {}
{ validator, concurrency = 16, compression } = {}
) {
try {
const size = await buildVhd(handler, path, inputStream, { concurrency, compression, nbdClient })
const size = await buildVhd(handler, path, inputStream, { concurrency, compression })
if (validator !== undefined) {
await validator.call(this, path)
}
Expand Down
Loading