refactor(@xen-orchestra/backups): remove old nbd code
fbeauchamp committed Mar 15, 2023
1 parent 8478fa9 commit f19d155
Showing 4 changed files with 11 additions and 148 deletions.
3 changes: 1 addition & 2 deletions @xen-orchestra/backups/RemoteAdapter.js
@@ -658,7 +658,7 @@ class RemoteAdapter {
     return path
   }

-  async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency, nbdClient } = {}) {
+  async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
     const handler = this._handler
     if (this.useVhdDirectory()) {
       const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
@@ -669,7 +669,6 @@ class RemoteAdapter {
           await input.task
           return validator.apply(this, arguments)
         },
-        nbdClient,
       })
       await VhdAbstract.createAlias(handler, path, dataPath)
       return size
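Note: with nbdClient gone from the options bag, a caller of writeVhd now looks like the sketch below. The adapter instance, stream, and concurrency value are illustrative assumptions, not part of this commit.

'use strict'

// Hypothetical caller: `adapter` is assumed to be a RemoteAdapter instance
// and `vhdStream` a readable VHD export stream.
async function storeVhd(adapter, path, vhdStream) {
  return adapter.writeVhd(path, vhdStream, {
    checksum: false, // pointless for VHDs that later merges and chainings would invalidate
    validator: tmpPath => {
      // e.g. re-open and verify the VHD written at tmpPath before it is aliased
    },
    writeBlockConcurrency: 16, // illustrative value
  })
}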
32 changes: 1 addition & 31 deletions @xen-orchestra/backups/writers/DeltaBackupWriter.js
@@ -20,9 +20,8 @@ const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
 const { checkVhd } = require('./_checkVhd.js')
 const { packUuid } = require('./_packUuid.js')
 const { Disposable } = require('promise-toolbox')
-const NbdClient = require('@vates/nbd-client')

-const { debug, warn, info } = createLogger('xo:backups:DeltaBackupWriter')
+const { warn } = createLogger('xo:backups:DeltaBackupWriter')

 class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
   async checkBaseVdis(baseUuidToSrcVdi) {
@@ -200,41 +199,12 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
           await checkVhd(handler, parentPath)
         }

-        const vdiRef = vm.$xapi.getObject(vdi.uuid).$ref
-
-        let nbdClient
-        if (this._backup.config.useNbd && adapter.useVhdDirectory()) {
-          debug('useNbd is enabled', { vdi: id, path })
-          // get nbd if possible
-          try {
-            // this will always take the first host in the list
-            const [nbdInfo] = await vm.$xapi.call('VDI.get_nbd_info', vdiRef)
-            debug('got NBD info', { nbdInfo, vdi: id, path })
-            nbdClient = new NbdClient(nbdInfo)
-            await nbdClient.connect()
-
-            // this will inform the xapi that we don't need this anymore
-            // and will detach the vdi from dom0
-            $defer(() => nbdClient.disconnect())
-
-            info('NBD client ready', { vdi: id, path })
-            Task.info('NBD used')
-          } catch (error) {
-            Task.warning('NBD configured but unusable', { error })
-            nbdClient = undefined
-            warn('error connecting to NBD server', { error, vdi: id, path })
-          }
-        } else {
-          debug('useNbd is disabled', { vdi: id, path })
-        }
-
         transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
           // no checksum for VHDs, because they will be invalidated by
           // merges and chainings
           checksum: false,
           validator: tmpPath => checkVhd(handler, tmpPath),
           writeBlockConcurrency: this._backup.config.writeBlockConcurrency,
-          nbdClient,
         })

         if (isDelta) {
8 changes: 4 additions & 4 deletions packages/vhd-lib/createVhdDirectoryFromStream.js
@@ -8,10 +8,10 @@ const { asyncEach } = require('@vates/async-each')

 const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')

-const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression, nbdClient }) {
+const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
   const vhd = yield VhdDirectory.create(handler, path, { compression })
   await asyncEach(
-    parseVhdStream(inputStream, nbdClient),
+    parseVhdStream(inputStream),
     async function (item) {
       switch (item.type) {
         case 'footer':
@@ -45,10 +45,10 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
   handler,
   path,
   inputStream,
-  { validator, concurrency = 16, compression, nbdClient } = {}
+  { validator, concurrency = 16, compression } = {}
 ) {
   try {
-    const size = await buildVhd(handler, path, inputStream, { concurrency, compression, nbdClient })
+    const size = await buildVhd(handler, path, inputStream, { concurrency, compression })
     if (validator !== undefined) {
       await validator.call(this, path)
     }
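Note: a sketch of how the simplified entry point is called after this commit. The import path, handler, and compression value are assumptions for illustration only.

'use strict'

// Assumed import; the function lives in packages/vhd-lib in this repository.
const { createVhdDirectoryFromStream } = require('vhd-lib')

async function importVhd(handler, path, inputStream) {
  return createVhdDirectoryFromStream(handler, path, inputStream, {
    concurrency: 16, // default from the diff: number of parallel block writes
    compression: 'brotli', // assumed compression identifier
    validator: async vhdPath => {
      // invoked with the written path once the stream has been fully consumed
    },
  })
}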
116 changes: 5 additions & 111 deletions packages/vhd-lib/parseVhdStream.js
@@ -1,10 +1,9 @@
 'use strict'

 const { BLOCK_UNUSED, FOOTER_SIZE, HEADER_SIZE, SECTOR_SIZE } = require('./_constants')
-const { readChunk } = require('@vates/read-chunk')
+const { readChunkStrict, skipChunk } = require('@vates/read-chunk')
 const assert = require('assert')
 const { unpackFooter, unpackHeader, computeFullBlockSize } = require('./Vhd/_utils')
-const { asyncEach } = require('@vates/async-each')

 const cappedBufferConcat = (buffers, maxSize) => {
   let buffer = Buffer.concat(buffers)
@@ -42,10 +41,9 @@ class StreamParser {
     assert(this._bytesRead <= offset, `offset is ${offset} but we already read ${this._bytesRead} bytes`)
     if (this._bytesRead < offset) {
       // empty spaces
-      await this._read(this._bytesRead, offset - this._bytesRead)
+      await skipChunk(this._stream, offset - this._bytesRead)
     }
-    const buf = await readChunk(this._stream, size)
-    assert.strictEqual(buf.length, size, `read ${buf.length} instead of ${size}`)
+    const buf = await readChunkStrict(this._stream, size)
     this._bytesRead += size
     return buf
   }
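Note: the two new helpers fold in behaviour the parser previously hand-rolled: readChunkStrict reads exactly size bytes or throws (replacing the explicit assert), and skipChunk discards bytes (replacing the recursive this._read call). A rough functional equivalent, written in terms of the old readChunk and assuming readChunk resolves with whatever is available (or null at end of stream):

'use strict'

const assert = require('assert')
const { readChunk } = require('@vates/read-chunk')

// Sketch of the assumed semantics, not the library's actual implementation.
async function readChunkStrictLike(stream, size) {
  const buf = await readChunk(stream, size)
  // a strict read turns a short read or an ended stream into an error
  assert(buf !== null && buf.length === size, `read ${buf === null ? 0 : buf.length} instead of ${size}`)
  return buf
}

async function skipChunkLike(stream, size) {
  // a real skip would discard the bytes instead of buffering them
  await readChunkStrictLike(stream, size)
}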
@@ -154,111 +152,7 @@ class StreamParser {
     yield* this.blocks()
   }
 }
-
-// hybrid mode: read the headers from the vhd stream, and read the blocks from nbd
-class StreamNbdParser extends StreamParser {
-  #nbdClient = null
-  #concurrency = 16
-
-  constructor(stream, nbdClient = {}) {
-    super(stream)
-    this.#nbdClient = nbdClient
-  }
-
-  async _readBlockData(item) {
-    const SECTOR_BITMAP = Buffer.alloc(512, 255)
-    const client = this.#nbdClient
-    // we read in a raw file, so the block position is id x length, and has nothing to do with the offset
-    // in the vhd stream
-    const rawDataLength = item.size - SECTOR_BITMAP.length
-    const data = await client.readBlock(item.id, rawDataLength)
-
-    // end of file, non-aligned vhd block
-    const buffer = Buffer.concat([SECTOR_BITMAP, data])
-    const block = {
-      ...item,
-      size: rawDataLength,
-      bitmap: SECTOR_BITMAP,
-      data,
-      buffer,
-    }
-    return block
-  }
-
-  async *blocks() {
-    // at most this array will be this.#concurrency long
-    const blocksReady = []
-    let waitingForBlock
-    let done = false
-    let error
-
-    function waitForYield(block) {
-      return new Promise(resolve => {
-        blocksReady.push({
-          block,
-          yielded: resolve,
-        })
-        if (waitingForBlock !== undefined) {
-          const resolver = waitingForBlock
-          waitingForBlock = undefined
-          resolver()
-        }
-      })
-    }
-
-    asyncEach(
-      this._index,
-      async blockId => {
-        const block = await this._readBlockData(blockId)
-        await waitForYield(block)
-      },
-      { concurrency: this.#concurrency }
-    )
-      .then(() => {
-        done = true
-        waitingForBlock?.()
-      })
-      .catch(err => {
-        // will keep only the last error if several are thrown
-        error = err
-        waitingForBlock?.()
-      })
-    // eslint-disable-next-line no-unmodified-loop-condition
-    while (!done) {
-      if (error) {
-        throw error
-      }
-      if (blocksReady.length > 0) {
-        const { block, yielded } = blocksReady.shift()
-        yielded()
-        yield block
-      } else {
-        await new Promise(resolve => {
-          waitingForBlock = resolve
-        })
-      }
-    }
-  }
-
-  async *parse() {
-    yield* this.headers()
-
-    // the VHD stream is no longer necessary, destroy it
-    //
-    // - not destroying it would leave other writers stuck
-    // - resuming it would download the whole stream unnecessarily if there are no other writers
-    this._stream.destroy()
-
-    yield* this.blocks()
-  }
-}
-
-exports.parseVhdStream = async function* parseVhdStream(stream, nbdClient) {
-  let parser
-  if (nbdClient) {
-    parser = new StreamNbdParser(stream, nbdClient)
-  } else {
-    parser = new StreamParser(stream)
-  }
+exports.parseVhdStream = async function* parseVhdStream(stream) {
+  const parser = new StreamParser(stream)
   yield* parser.parse()
 }
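Note: the deleted StreamNbdParser was the bulk of this commit. It read VHD headers from the export stream but fetched block data over NBD, rebuilding each VHD block by prefixing the raw data with an all-ones 512-byte sector bitmap. Its blocks() generator bridged asyncEach's bounded-concurrency producer into a sequential async iterator; a distilled, standalone sketch of that handoff pattern follows (ids and readBlock are hypothetical stand-ins):

'use strict'

const { asyncEach } = require('@vates/async-each')

// Fetch items with bounded concurrency, yield them one at a time.
// Each producer slot blocks until its item has been yielded, so at most
// `concurrency` items sit in `ready`, mirroring the removed blocks() method.
async function* concurrentBlocks(ids, readBlock, concurrency = 16) {
  const ready = []
  let wake // the consumer's resolver, set while it waits for the producer
  let done = false
  let error

  const notify = () => {
    const resolve = wake
    wake = undefined
    resolve?.()
  }

  asyncEach(
    ids,
    async id => {
      const block = await readBlock(id)
      await new Promise(resolve => {
        ready.push({ block, yielded: resolve })
        notify()
      })
    },
    { concurrency }
  )
    .then(() => {
      done = true
      notify()
    })
    .catch(err => {
      error = err // like the original, only the last error survives
      notify()
    })

  while (!done) {
    if (error !== undefined) {
      throw error
    }
    if (ready.length > 0) {
      const { block, yielded } = ready.shift()
      yielded()
      yield block
    } else {
      await new Promise(resolve => {
        wake = resolve
      })
    }
  }
}

Consuming it would look like: for await (const block of concurrentBlocks(index, id => nbdClient.readBlock(id, blockSize))) { … }, with nbdClient an @vates/nbd-client instance as in the removed code.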
