refactor(vhd-lib): read vhd from stream and nbd, create vhd directory
fbeauchamp committed Mar 9, 2023
1 parent c5ae0dc commit 9f688fa
Showing 4 changed files with 242 additions and 1 deletion.
3 changes: 2 additions & 1 deletion @xen-orchestra/backups/_deltaVm.js
@@ -85,7 +85,8 @@ exports.exportDeltaVm = async function exportDeltaVm(
$snapshot_of$uuid: vdi.$snapshot_of?.uuid,
$SR$uuid: vdi.$SR.uuid,
}

// @todo : use (new VhdFromStream(stream)).stream() instead of vdi.$exportContent directly
// @todo : use nbdclient if possible
streams[`${vdiRef}.vhd`] = await vdi.$exportContent({
baseRef: baseVdi?.$ref,
cancelToken,
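The first @todo above hints at re-emitting the export through the new class; a minimal sketch of how this snippet could look, assuming VhdFromStream (added below) ends up exported from vhd-lib — the import path is illustrative, not part of the commit:

// hypothetical rewrite of the snippet above
const { VhdFromStream } = require('vhd-lib')

const rawStream = await vdi.$exportContent({
  baseRef: baseVdi?.$ref,
  cancelToken,
  // …existing options, truncated in this diff
})
// stream() re-validates the header and footer and re-emits the VHD parts in order
streams[`${vdiRef}.vhd`] = await new VhdFromStream(rawStream).stream()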
2 changes: 2 additions & 0 deletions packages/vhd-lib/Vhd/VhdAbstract.js
@@ -259,6 +259,8 @@ exports.VhdAbstract = class VhdAbstract {
return fileSize
}

// @todo split the iterator and stream
// @todo the iterator may support concurrency, to yield multiple blocks in parallel
stream() {
const { footer, batSize } = this
const { ...header } = this.header // copy since we don't want to modify the current header
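The concurrency @todo above could be served by a small helper that keeps a bounded number of reads in flight while still yielding results in order; this is a sketch, not part of the commit, and mapConcurrently is a hypothetical name:

// map an async iterator, keeping up to `concurrency` calls to fn in flight
// and yielding the results in source order
async function* mapConcurrently(iterator, fn, concurrency = 2) {
  const inFlight = []
  for await (const item of iterator) {
    inFlight.push(fn(item))
    if (inFlight.length >= concurrency) {
      yield await inFlight.shift()
    }
  }
  // drain the remaining calls
  while (inFlight.length > 0) {
    yield await inFlight.shift()
  }
}

A block iterator could then yield mapConcurrently(blocks, block => this._readBlockData(block), readConcurrency) without changing its callers.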
193 changes: 193 additions & 0 deletions packages/vhd-lib/Vhd/VhdFromStream.js
@@ -0,0 +1,193 @@
'use strict'

const assert = require('node:assert')
const { readChunkStrict } = require('@vates/read-chunk')
const { VhdFile } = require('./VhdFile')
const { fuFooter, fuHeader } = require('../_structs')
const asyncIteratorToStream = require('async-iterator-to-stream')
const { computeFullBlockSize } = require('./_utils')
const { SECTOR_SIZE, BLOCK_UNUSED } = require('../_constants')

const cappedBufferConcat = (buffers, maxSize) => {
let buffer = Buffer.concat(buffers)
if (buffer.length > maxSize) {
buffer = buffer.slice(buffer.length - maxSize)
}
return buffer
}

function readLastSector(stream) {
return new Promise((resolve, reject) => {
let bufFooterEnd = Buffer.alloc(0)
stream.on('data', chunk => {
if (chunk.length > 0) {
bufFooterEnd = cappedBufferConcat([bufFooterEnd, chunk], SECTOR_SIZE)
}
})

stream.on('end', () => resolve(bufFooterEnd))
stream.on('error', reject)
})
}

exports.VhdFromStream = class VhdFromStream extends VhdFile {
#bitmapSize
#bytesRead = 0
#index = []
#stream

constructor(stream) {
super()
this.#stream = stream
}

async _read(offset, size) {
assert(this.#bytesRead <= offset, `offset is ${offset} but we already read ${this.#bytesRead} bytes`)
if (this.#bytesRead < offset) {
// skip the empty space between the last read position and the requested offset
// @fixme : should read chunks of a reasonable max size
await readChunkStrict(this.#stream, offset - this.#bytesRead)
this.#bytesRead = offset
}
const buf = await readChunkStrict(this.#stream, size)
this.#bytesRead += size
return buf
}

readHeaderAndFooter(checkSecondFooter = true) {
// a forward-only stream can't seek to the end here: the second footer is
// checked after the last block, in vhdBlocksIterator()
assert(checkSecondFooter, 'the second footer of a stream is always checked')
return super.readHeaderAndFooter(false)
}

async rawContent(streamIsSorted = false) {
if (!streamIsSorted) {
throw new Error("We can't generate a raw stream from an unsorted source stream")
}
return super.rawContent()
}

// this is reimplemented to ensure we read the stream in order
async *vhdHeaderIterator() {
assert.strictEqual(this.#bytesRead, 0)
await this.readHeaderAndFooter()
const { header, footer } = this
const blockSize = header.blockSize
const fullBlockSize = computeFullBlockSize(blockSize)
assert.strictEqual(blockSize % SECTOR_SIZE, 0)
this.#bitmapSize = fullBlockSize - blockSize
yield { type: 'footer', footer: fuFooter.pack(footer), offset: 0 }
yield { type: 'header', header: fuHeader.pack(header), offset: SECTOR_SIZE }

let batFound = false

for (const parentLocatorId in header.parentLocatorEntry) {
const parentLocatorEntry = header.parentLocatorEntry[parentLocatorId]
// empty parent locator entry, does not exist in the content
if (parentLocatorEntry.platformDataSpace === 0) {
continue
}
this.#index.push({
...parentLocatorEntry,
type: 'parentLocator',
offset: parentLocatorEntry.platformDataOffset,
size: parentLocatorEntry.platformDataLength,
id: parentLocatorId,
})
}

const batOffset = header.tableOffset
const batSize = Math.max(1, Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE)) * SECTOR_SIZE

this.#index.push({
type: 'bat',
offset: batOffset,
size: batSize,
})

// some parent locators may be located before the BAT
this.#index.sort((a, b) => a.offset - b.offset)

while (!batFound) {
const item = this.#index.shift()
const buffer = await this._read(item.offset, item.size)
item.buffer = buffer

const { type } = item
if (type === 'bat') {
// found the BAT: read it and add its allocated blocks to the index

let blockCount = 0
for (let blockCounter = 0; blockCounter < header.maxTableEntries; blockCounter++) {
const batEntrySector = buffer.readUInt32BE(blockCounter * 4)
// unallocated block, no need to export it
if (batEntrySector !== BLOCK_UNUSED) {
const batEntryBytes = batEntrySector * SECTOR_SIZE
// ensure the block is not located before the BAT
assert.ok(batEntryBytes >= batOffset + batSize)
this.#index.push({
type: 'block',
id: blockCounter,
offset: batEntryBytes,
size: fullBlockSize,
})
blockCount++
}
}
// sort the index again to ensure blocks and parent locators are in the right order
this.#index.sort((a, b) => a.offset - b.offset)
item.blockCount = blockCount
batFound = true
}
yield item
}
}
// read the data of one parent locator or block from the source stream
// this is the method overridden by the NBD implementation
async _readBlockData(item) {
const buffer = await this._read(item.offset, item.size)
return {
...item,
bitmap: buffer.slice(0, this.#bitmapSize),
data: buffer.slice(this.#bitmapSize),
buffer,
}
}

// yields parent locators and blocks, in stream order
async *vhdBlocksIterator() {
while (this.#index.length > 0) {
const item = this.#index.shift()
yield await this._readBlockData(item)
}
/**
 * the second footer is at fileSize - 512, there can be empty space between the last
 * block and the start of that footer
 *
 * we read to the end of the stream and use the last 512 bytes as the footer
 */
if (!this.#stream.destroyed) {
const bufFooterEnd = await readLastSector(this.#stream)
assert(fuFooter.pack(this.footer).equals(bufFooterEnd), 'footer1 !== footer2')
}
}

async *vhdIterator() {
yield* this.vhdHeaderIterator()
yield* this.vhdBlocksIterator()
}

// used by implementations that read the block data from another source (e.g. NBD)
// once the header and BAT have been consumed
destroyStream() {
this.#stream.destroy()
}

async stream(readConcurrency) {
assert.strictEqual(this.#bytesRead, 0)
if (readConcurrency === 1) {
return this.#stream
}

const stream = asyncIteratorToStream(this.vhdIterator())
// @todo compute the real length, and flatten the yielded items into buffers
stream.length = 0
return stream
}
async ensureBatSize() {
throw new Error('not implemented')
}
async writeEntireBlock() {
throw new Error('not implemented')
}
async writeFooter() {
throw new Error('not implemented')
}
async writeHeader() {
throw new Error('not implemented')
}
async _writeParentLocatorData() {
throw new Error('not implemented')
}
}
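A usage sketch for the class above; the require path and the inspectVhdStream helper are illustrative, not part of the commit:

'use strict'

const { VhdFromStream } = require('vhd-lib/Vhd/VhdFromStream')

// walk a VHD export stream part by part, without writing anything to disk
async function inspectVhdStream(sourceStream) {
  const vhd = new VhdFromStream(sourceStream)
  for await (const item of vhd.vhdIterator()) {
    // each item carries a type (footer, header, parentLocator, bat or block)
    // and the offset it would occupy in the reconstructed file
    console.log(item.type, 'at offset', item.offset)
  }
}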
45 changes: 45 additions & 0 deletions packages/vhd-lib/Vhd/VhdFromStreamNbd.js
@@ -0,0 +1,45 @@
'use strict'

const { VhdFromStream } = require('./VhdFromStream')

const SECTOR_BITMAP = Buffer.alloc(512, 255)
exports.VhdFromStreamNbd = class VhdFromStreamNbd extends VhdFromStream {
#nbdClient
constructor(stream, nbdClient) {
super(stream)
this.#nbdClient = nbdClient
}
async _readBlockData(item) {
const client = this.#nbdClient
// we read from a raw disk through NBD, so the block position is id × data length
// and has nothing to do with the offset in the VHD stream
const rawDataLength = item.size - SECTOR_BITMAP.length
const data = await client.readBlock(item.id, rawDataLength)

// end of file: the last VHD block may not be aligned
const buffer = Buffer.concat([SECTOR_BITMAP, data])
const block = {
...item,
size: rawDataLength,
bitmap: SECTOR_BITMAP,
data,
buffer,
}
return block
}

// vhdBlocksIterator() is inherited: it walks the index built by vhdHeaderIterator()
// and reads each block through the overridden _readBlockData() above
// @todo : read multiple blocks concurrently
async *vhdIterator() {
yield* this.vhdHeaderIterator()

// the VHD stream is no longer necessary since the block data comes from NBD, destroy it
//
// - not destroying it would leave the other writers stuck
// - resuming it would download the whole stream unnecessarily if there are no other writers
this.destroyStream()

yield* this.vhdBlocksIterator()
}
}
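And the NBD variant, assuming an already-connected client such as @vates/nbd-client; the wiring below is a sketch:

'use strict'

const { VhdFromStreamNbd } = require('vhd-lib/Vhd/VhdFromStreamNbd')

// the header and BAT come from the XAPI export stream; the stream is then
// destroyed and the block data is fetched through NBD
async function* exportThroughNbd(exportStream, nbdClient) {
  const vhd = new VhdFromStreamNbd(exportStream, nbdClient)
  yield* vhd.vhdIterator()
}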
