From 7c386e3466aaa4afe22732b74434a464555d6135 Mon Sep 17 00:00:00 2001
From: Nikolay Edigaryev
Date: Thu, 16 May 2024 19:43:56 +0400
Subject: [PATCH] tart pull: try to re-use local VM image layers to speed-up the pulling (#825)

* Remove unused pullFromRegistry() method with "reference" argument

* tart pull: try to deduplicate disk layers to speed-up the pulling
---
 Sources/tart/LocalLayerCache.swift      | 32 +++++++++++
 Sources/tart/OCI/Layerizer/Disk.swift   |  2 +-
 Sources/tart/OCI/Layerizer/DiskV1.swift |  2 +-
 Sources/tart/OCI/Layerizer/DiskV2.swift | 70 ++++++++++++++++---------
 Sources/tart/VMDirectory+OCI.swift      | 13 ++---
 Sources/tart/VMStorageOCI.swift         | 11 +++-
 6 files changed, 93 insertions(+), 37 deletions(-)
 create mode 100644 Sources/tart/LocalLayerCache.swift

diff --git a/Sources/tart/LocalLayerCache.swift b/Sources/tart/LocalLayerCache.swift
new file mode 100644
index 00000000..ee0ff2e0
--- /dev/null
+++ b/Sources/tart/LocalLayerCache.swift
@@ -0,0 +1,32 @@
+import Foundation
+
+struct LocalLayerCache {
+  private let mappedDisk: Data
+  private var digestToRange: [String : Range<Data.Index>] = [:]
+
+  init?(_ diskURL: URL, _ manifest: OCIManifest) throws {
+    // mmap(2) the disk that contains the layers from the manifest
+    self.mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])
+
+    // Record the ranges of the disk layers listed in the manifest
+    var offset: UInt64 = 0
+
+    for layer in manifest.layers.filter({ $0.mediaType == diskV2MediaType }) {
+      guard let uncompressedSize = layer.uncompressedSize() else {
+        return nil
+      }
+
+      self.digestToRange[layer.digest] = Int(offset)..<Int(offset + uncompressedSize)
+
+      offset += uncompressedSize
+    }
+  }
+
+  func find(_ digest: String) -> Data? {
+    guard let foundRange = self.digestToRange[digest] else {
+      return nil
+    }
+
+    return self.mappedDisk.subdata(in: foundRange)
+  }
+}
diff --git a/Sources/tart/OCI/Layerizer/Disk.swift b/Sources/tart/OCI/Layerizer/Disk.swift
index 7ebaa273..b420181f 100644
--- a/Sources/tart/OCI/Layerizer/Disk.swift
+++ b/Sources/tart/OCI/Layerizer/Disk.swift
@@ -2,5 +2,5 @@ import Foundation
 
 protocol Disk {
   static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer]
-  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws
+  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
 }
diff --git a/Sources/tart/OCI/Layerizer/DiskV1.swift b/Sources/tart/OCI/Layerizer/DiskV1.swift
index 637f8c58..edfa7613 100644
--- a/Sources/tart/OCI/Layerizer/DiskV1.swift
+++ b/Sources/tart/OCI/Layerizer/DiskV1.swift
@@ -45,7 +45,7 @@ class DiskV1: Disk {
     return pushedLayers
   }
 
-  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws {
+  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
     if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
       throw OCIError.FailedToCreateVmFile
     }
diff --git a/Sources/tart/OCI/Layerizer/DiskV2.swift b/Sources/tart/OCI/Layerizer/DiskV2.swift
index 0a781222..283b1829 100644
--- a/Sources/tart/OCI/Layerizer/DiskV2.swift
+++ b/Sources/tart/OCI/Layerizer/DiskV2.swift
@@ -4,7 +4,6 @@ import Compression
 class DiskV2: Disk {
   private static let bufferSizeBytes = 4 * 1024 * 1024
   private static let layerLimitBytes = 500 * 1000 * 1000
-  private static let holeGranularityBytes = 64 * 1024
 
   static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
     var pushedLayers: [OCIManifestLayer] = []
@@ -34,7 +33,7 @@ class DiskV2: Disk {
     return pushedLayers
   }
 
-  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws {
+  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
     // Support resumable pulls
     let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)
 
@@ -93,19 +92,18 @@
         // Open the disk file
         let disk = try FileHandle(forWritingTo: diskURL)
 
-        // A zero chunk for faster than byte-by-byte comparisons
-        //
-        // Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
-        // on the last block since it costs only 64 KiB of excess data per 500 MB layer.
-        //
-        // Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
-        // +--------------------------------------+---------------------------------------------------+
-        // |               Operation              |                   time(1) result                  |
-        // +--------------------------------------+---------------------------------------------------+
-        // | Data(...) == zeroChunk               | 2.16s user 11.71s system 73% cpu 18.928 total     |
-        // | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
-        // +--------------------------------------+---------------------------------------------------+
-        let zeroChunk = Data(count: holeGranularityBytes)
+        // Check if we already have this layer's contents in the local layer cache
+        if let localLayerCache = localLayerCache, let data = localLayerCache.find(diskLayer.digest), Digest.hash(data) == uncompressedLayerContentDigest {
+          // Fulfil the layer contents from the local blob cache
+          _ = try zeroSkippingWrite(disk, diskWritingOffset, data)
+          try disk.close()
+
+          // Update the progress
+          progress.completedUnitCount += Int64(diskLayer.size)
+
+          return
+        }
+
         var diskWritingOffset = diskWritingOffset
 
         // Pull and decompress a single layer into the specific offset on disk
@@ -114,15 +112,7 @@
             return
           }
 
-          for chunk in data.chunks(ofCount: holeGranularityBytes) {
-            // Only write chunks that are not zero
-            if chunk != zeroChunk {
-              try disk.seek(toOffset: diskWritingOffset)
-              disk.write(chunk)
-            }
-
-            diskWritingOffset += UInt64(chunk.count)
-          }
+          diskWritingOffset = try zeroSkippingWrite(disk, diskWritingOffset, data)
         }
 
         try await registry.pullBlob(diskLayer.digest) { data in
@@ -141,4 +131,36 @@
       }
     }
   }
+
+  private static func zeroSkippingWrite(_ disk: FileHandle, _ offset: UInt64, _ data: Data) throws -> UInt64 {
+    let holeGranularityBytes = 64 * 1024
+
+    // A zero chunk for faster than byte-by-byte comparisons
+    //
+    // Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
+    // on the last block since it costs only 64 KiB of excess data per 500 MB layer.
+    //
+    // Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
+    // +--------------------------------------+---------------------------------------------------+
+    // |               Operation              |                   time(1) result                  |
+    // +--------------------------------------+---------------------------------------------------+
+    // | Data(...) == zeroChunk               | 2.16s user 11.71s system 73% cpu 18.928 total     |
+    // | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
+    // +--------------------------------------+---------------------------------------------------+
+    let zeroChunk = Data(count: holeGranularityBytes)
+
+    var offset = offset
+
+    for chunk in data.chunks(ofCount: holeGranularityBytes) {
+      // Only write chunks that are not zero
+      if chunk != zeroChunk {
+        try disk.seek(toOffset: offset)
+        disk.write(chunk)
+      }
+
+      offset += UInt64(chunk.count)
+    }
+
+    return offset
+  }
 }
diff --git a/Sources/tart/VMDirectory+OCI.swift b/Sources/tart/VMDirectory+OCI.swift
index 26850cfd..04e1922a 100644
--- a/Sources/tart/VMDirectory+OCI.swift
+++ b/Sources/tart/VMDirectory+OCI.swift
@@ -13,15 +13,7 @@ extension VMDirectory {
   private static let bufferSizeBytes = 64 * 1024 * 1024
   private static let layerLimitBytes = 500 * 1000 * 1000
 
-  func pullFromRegistry(registry: Registry, reference: String, concurrency: UInt) async throws {
-    defaultLogger.appendNewLine("pulling manifest...")
-
-    let (manifest, _) = try await registry.pullManifest(reference: reference)
-
-    return try await pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency)
-  }
-
-  func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt) async throws {
+  func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
     // Pull VM's config file layer and re-serialize it into a config file
     let configLayers = manifest.layers.filter {
       $0.mediaType == configMediaType
@@ -61,7 +53,8 @@
     let progress = Progress(totalUnitCount: diskCompressedSize)
     ProgressObserver(progress).log(defaultLogger)
 
-    try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL, concurrency: concurrency, progress: progress)
+    try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL, concurrency: concurrency, progress: progress,
+                                localLayerCache: localLayerCache)
 
     // Pull VM's NVRAM file layer and store it in an NVRAM file
     defaultLogger.appendNewLine("pulling NVRAM...")
diff --git a/Sources/tart/VMStorageOCI.swift b/Sources/tart/VMStorageOCI.swift
index b098802e..664a3a1b 100644
--- a/Sources/tart/VMStorageOCI.swift
+++ b/Sources/tart/VMStorageOCI.swift
@@ -190,7 +190,16 @@
     try await withTaskCancellationHandler(operation: {
       try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) {
-        try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency)
+        var localLayerCache: LocalLayerCache? = nil
+
+        if name.reference.type == .Tag,
+           let vmDir = try? open(name),
+           let digest = try? digest(name),
+           let (manifest, _) = try? await registry.pullManifest(reference: digest) {
+          localLayerCache = try LocalLayerCache(vmDir.diskURL, manifest)
+        }
+
+        try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
       } recoverFromFailure: { error in
         print("Error: \(error.localizedDescription)")
         print("Attempting to re-try...")