tart pull: try to re-use local VM image layers to speed-up the pulling (#825)

* Remove unused pullFromRegistry() method with "reference" argument

* tart pull: try to deduplicate disk layers to speed-up the pulling
edigaryev authored May 16, 2024
1 parent dbbd716 commit 7c386e3
Showing 6 changed files with 93 additions and 37 deletions.
32 changes: 32 additions & 0 deletions Sources/tart/LocalLayerCache.swift
@@ -0,0 +1,32 @@
import Foundation

struct LocalLayerCache {
  private let mappedDisk: Data
  private var digestToRange: [String : Range<Data.Index>] = [:]

  init?(_ diskURL: URL, _ manifest: OCIManifest) throws {
    // mmap(2) the disk that contains the layers from the manifest
    self.mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])

    // Record the ranges of the disk layers listed in the manifest
    var offset: UInt64 = 0

    for layer in manifest.layers.filter({ $0.mediaType == diskV2MediaType }) {
      guard let uncompressedSize = layer.uncompressedSize() else {
        return nil
      }

      self.digestToRange[layer.digest] = Int(offset)..<Int(offset+uncompressedSize)

      offset += uncompressedSize
    }
  }

  func find(_ digest: String) -> Data? {
    guard let foundRange = self.digestToRange[digest] else {
      return nil
    }

    return self.mappedDisk.subdata(in: foundRange)
  }
}
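The core trick above is to mmap(2) the existing local disk image once and then serve layer contents as cheap range slices of the mapping. Here is a minimal standalone sketch of the same Foundation pattern; the file path and the single 64 KiB "layer" range are invented for illustration and are not part of this commit:

import Foundation

// Map a file into memory lazily (mmap under the hood) and copy out only
// the byte range a caller asks for, mirroring LocalLayerCache.find().
let url = URL(fileURLWithPath: "/tmp/disk.img") // hypothetical path; throws if missing
let mapped = try Data(contentsOf: url, options: [.alwaysMapped])

// Pretend the first 64 KiB of the file is one layer and record its range,
// just like the digestToRange bookkeeping above.
let range: Range<Data.Index> = 0..<min(64 * 1024, mapped.count)
let layerBytes = mapped.subdata(in: range) // copies only this range
print("served \(layerBytes.count) of \(mapped.count) mapped bytes")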
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/Disk.swift
@@ -2,5 +2,5 @@ import Foundation

protocol Disk {
  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer]
- static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws
+ static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
}
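Note the signature split: the Disk protocol requires the new localLayerCache parameter without a default (Swift protocols cannot declare default argument values), while the DiskV1 and DiskV2 conformances below add "= nil" so existing call sites keep compiling. A tiny self-contained sketch of this pattern, with hypothetical names:

protocol Puller {
  // Protocol requirements cannot carry default argument values
  static func pull(_ layers: [String], localCache: String?)
}

struct DemoPuller: Puller {
  // A conforming type may still supply a default; conformance is unaffected
  static func pull(_ layers: [String], localCache: String? = nil) {
    print("pulling \(layers.count) layer(s), cache: \(localCache ?? "none")")
  }
}

DemoPuller.pull(["layer-1"]) // the default applies when calling the concrete type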
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/DiskV1.swift
@@ -45,7 +45,7 @@ class DiskV1: Disk
    return pushedLayers
  }

- static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws {
+ static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
    if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
      throw OCIError.FailedToCreateVmFile
    }
70 changes: 46 additions & 24 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
@@ -4,7 +4,6 @@ import Compression
class DiskV2: Disk {
  private static let bufferSizeBytes = 4 * 1024 * 1024
  private static let layerLimitBytes = 500 * 1000 * 1000
- private static let holeGranularityBytes = 64 * 1024

  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
    var pushedLayers: [OCIManifestLayer] = []
@@ -34,7 +33,7 @@ class DiskV2: Disk {
    return pushedLayers
  }

- static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws {
+ static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
    // Support resumable pulls
    let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)

@@ -93,19 +92,18 @@
// Open the disk file
let disk = try FileHandle(forWritingTo: diskURL)

- // A zero chunk for faster than byte-by-byte comparisons
- //
- // Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
- // on the last block since it costs only 64 KiB of excess data per 500 MB layer.
- //
- // Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
- // +--------------------------------------+---------------------------------------------------+
- // | Operation                            | time(1) result                                    |
- // +--------------------------------------+---------------------------------------------------+
- // | Data(...) == zeroChunk               | 2.16s user 11.71s system 73% cpu 18.928 total     |
- // | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
- // +--------------------------------------+---------------------------------------------------+
- let zeroChunk = Data(count: holeGranularityBytes)
+ // Check if we already have this layer contents in the local layer cache
+ if let localLayerCache = localLayerCache, let data = localLayerCache.find(diskLayer.digest), Digest.hash(data) == uncompressedLayerContentDigest {
+   // Fulfil the layer contents from the local blob cache
+   _ = try zeroSkippingWrite(disk, diskWritingOffset, data)
+   try disk.close()
+
+   // Update the progress
+   progress.completedUnitCount += Int64(diskLayer.size)
+
+   return
+ }

var diskWritingOffset = diskWritingOffset

// Pull and decompress a single layer into the specific offset on disk
@@ -114,15 +112,7 @@
return
}

- for chunk in data.chunks(ofCount: holeGranularityBytes) {
-   // Only write chunks that are not zero
-   if chunk != zeroChunk {
-     try disk.seek(toOffset: diskWritingOffset)
-     disk.write(chunk)
-   }
-
-   diskWritingOffset += UInt64(chunk.count)
- }
+ diskWritingOffset = try zeroSkippingWrite(disk, diskWritingOffset, data)
}

try await registry.pullBlob(diskLayer.digest) { data in
@@ -141,4 +131,36 @@
}
}
}

+ private static func zeroSkippingWrite(_ disk: FileHandle, _ offset: UInt64, _ data: Data) throws -> UInt64 {
+   let holeGranularityBytes = 64 * 1024
+
+   // A zero chunk for faster than byte-by-byte comparisons
+   //
+   // Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
+   // on the last block since it costs only 64 KiB of excess data per 500 MB layer.
+   //
+   // Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
+   // +--------------------------------------+---------------------------------------------------+
+   // | Operation                            | time(1) result                                    |
+   // +--------------------------------------+---------------------------------------------------+
+   // | Data(...) == zeroChunk               | 2.16s user 11.71s system 73% cpu 18.928 total     |
+   // | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
+   // +--------------------------------------+---------------------------------------------------+
+   let zeroChunk = Data(count: holeGranularityBytes)
+
+   var offset = offset
+
+   for chunk in data.chunks(ofCount: holeGranularityBytes) {
+     // Only write chunks that are not zero
+     if chunk != zeroChunk {
+       try disk.seek(toOffset: offset)
+       disk.write(chunk)
+     }
+
+     offset += UInt64(chunk.count)
+   }
+
+   return offset
+ }
}
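The zeroSkippingWrite() helper keeps pulled disks sparse: any 64 KiB chunk that is all zeroes is skipped by seeking past it, so the filesystem can represent it as a hole. A self-contained sketch of the same write pattern follows; the path and payload are invented, and stride() stands in for the chunks(ofCount:) helper from swift-algorithms that the real code uses:

import Foundation

let chunkSize = 64 * 1024
let zeroChunk = Data(count: chunkSize)

// One zero chunk followed by one non-zero chunk: the zero half should
// end up as a hole, the 0xFF half should actually be written.
let payload = Data(count: chunkSize) + Data(repeating: 0xFF, count: chunkSize)

let url = URL(fileURLWithPath: "/tmp/sparse-demo.img") // hypothetical path
FileManager.default.createFile(atPath: url.path, contents: nil)
let disk = try FileHandle(forWritingTo: url)

var offset: UInt64 = 0
for start in stride(from: 0, to: payload.count, by: chunkSize) {
  let chunk = payload.subdata(in: start..<min(start + chunkSize, payload.count))
  if chunk != zeroChunk { // skip all-zero chunks instead of writing them
    try disk.seek(toOffset: offset)
    disk.write(chunk)
  }
  offset += UInt64(chunk.count)
}
try disk.truncate(atOffset: offset) // account for trailing holes in the file size
try disk.close()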
13 changes: 3 additions & 10 deletions Sources/tart/VMDirectory+OCI.swift
@@ -13,15 +13,7 @@ extension VMDirectory {
private static let bufferSizeBytes = 64 * 1024 * 1024
private static let layerLimitBytes = 500 * 1000 * 1000

- func pullFromRegistry(registry: Registry, reference: String, concurrency: UInt) async throws {
-   defaultLogger.appendNewLine("pulling manifest...")
-
-   let (manifest, _) = try await registry.pullManifest(reference: reference)
-
-   return try await pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency)
- }
-
- func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt) async throws {
+ func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
// Pull VM's config file layer and re-serialize it into a config file
let configLayers = manifest.layers.filter {
$0.mediaType == configMediaType
@@ -61,7 +53,8 @@
let progress = Progress(totalUnitCount: diskCompressedSize)
ProgressObserver(progress).log(defaultLogger)

- try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL, concurrency: concurrency, progress: progress)
+ try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL, concurrency: concurrency, progress: progress,
+                             localLayerCache: localLayerCache)

// Pull VM's NVRAM file layer and store it in an NVRAM file
defaultLogger.appendNewLine("pulling NVRAM...")
11 changes: 10 additions & 1 deletion Sources/tart/VMStorageOCI.swift
@@ -190,7 +190,16 @@ class VMStorageOCI: PrunableStorage {

try await withTaskCancellationHandler(operation: {
try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) {
- try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency)
+ var localLayerCache: LocalLayerCache? = nil
+
+ if name.reference.type == .Tag,
+    let vmDir = try? open(name),
+    let digest = try? digest(name),
+    let (manifest, _) = try? await registry.pullManifest(reference: digest) {
+   localLayerCache = try LocalLayerCache(vmDir.diskURL, manifest)
+ }
+
+ try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
} recoverFromFailure: { error in
print("Error: \(error.localizedDescription)")
print("Attempting to re-try...")
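Two guards make the cache reuse safe: VMStorageOCI only builds a LocalLayerCache for tag references whose local image's manifest can still be fetched by digest, and DiskV2.pull() re-hashes every candidate layer (the Digest.hash(data) == uncompressedLayerContentDigest check) before trusting it. A minimal CryptoKit sketch of that verification idea; the sha256() helper here is a hypothetical stand-in for tart's Digest.hash:

import CryptoKit
import Foundation

// Recompute a layer's SHA-256 and compare it with the digest advertised by
// the remote manifest; cached bytes are reused only on an exact match.
func sha256(_ data: Data) -> String { // hypothetical stand-in for Digest.hash
  "sha256:" + SHA256.hash(data: data).map { String(format: "%02x", $0) }.joined()
}

let cachedLayer = Data("cached layer bytes".utf8)
let advertisedDigest = sha256(cachedLayer) // normally taken from the manifest
print(sha256(cachedLayer) == advertisedDigest ? "reuse local bytes" : "pull from registry")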
