From e551e6e7c9d181768ef5ee7c21f64242fc321231 Mon Sep 17 00:00:00 2001 From: Brian H Date: Mon, 6 Jan 2025 12:57:50 -0700 Subject: [PATCH] Extend precompile to support a DAG Signed-off-by: Brian H --- Cargo.lock | 1 + crates/containerd-shim-wasm/Cargo.toml | 1 + .../src/container/engine.rs | 16 +- .../containerd-shim-wasm/src/container/mod.rs | 2 +- .../src/sandbox/containerd/client.rs | 232 +++++++++++------- 5 files changed, 164 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9ffa1233..4774fe8c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -860,6 +860,7 @@ name = "containerd-shim-wasm" version = "0.8.0" dependencies = [ "anyhow", + "async-trait", "caps", "chrono", "containerd-client", diff --git a/crates/containerd-shim-wasm/Cargo.toml b/crates/containerd-shim-wasm/Cargo.toml index 7be2866eb..1de03529a 100644 --- a/crates/containerd-shim-wasm/Cargo.toml +++ b/crates/containerd-shim-wasm/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true doctest = false [dependencies] +async-trait = "0.1" anyhow = { workspace = true } chrono = { workspace = true } containerd-shim = { workspace = true } diff --git a/crates/containerd-shim-wasm/src/container/engine.rs b/crates/containerd-shim-wasm/src/container/engine.rs index 0cbcec7ad..783c5bcab 100644 --- a/crates/containerd-shim-wasm/src/container/engine.rs +++ b/crates/containerd-shim-wasm/src/container/engine.rs @@ -1,3 +1,5 @@ +use async_trait::async_trait; +use std::collections::BTreeSet; use std::fs::File; use std::io::Read; @@ -7,6 +9,7 @@ use super::Source; use crate::container::{PathResolve, RuntimeContext}; use crate::sandbox::oci::WasmLayer; +#[async_trait] pub trait Engine: Clone + Send + Sync + 'static { /// The name to use for this engine fn name() -> &'static str; @@ -62,7 +65,7 @@ pub trait Engine: Clone + Send + Sync + 'static { /// The cached, precompiled layers will be reloaded on subsequent runs. /// The runtime is expected to return the same number of layers passed in, if the layer cannot be precompiled it should return `None` for that layer. /// In some edge cases it is possible that the layers may already be precompiled and None should be returned in this case. - fn precompile(&self, _layers: &[WasmLayer]) -> Result>>> { + async fn precompile(&self, _layers: &[WasmLayer]) -> Result> { bail!("precompile not supported"); } @@ -81,3 +84,14 @@ pub trait Engine: Clone + Send + Sync + 'static { None } } + +/// A `PrecompiledLayer` represents the precompiled bytes of a layer and the indices of dependency layers (if any) used to process it. +pub struct PrecompiledLayer { + /// The media type this layer represents. + pub media_type: String, + /// The bytes of the precompiled layer. + pub bytes: Vec, + /// Indices of this layers' parents. + /// TODO: use digest of parent layer + pub parents: BTreeSet, +} diff --git a/crates/containerd-shim-wasm/src/container/mod.rs b/crates/containerd-shim-wasm/src/container/mod.rs index c78787d99..8c1b16067 100644 --- a/crates/containerd-shim-wasm/src/container/mod.rs +++ b/crates/containerd-shim-wasm/src/container/mod.rs @@ -17,7 +17,7 @@ mod wasm; pub(crate) use context::WasiContext; pub use context::{Entrypoint, RuntimeContext, Source}; -pub use engine::Engine; +pub use engine::{Engine, PrecompiledLayer}; pub use instance::Instance; pub use path::PathResolve; pub use wasm::WasmBinaryType; diff --git a/crates/containerd-shim-wasm/src/sandbox/containerd/client.rs b/crates/containerd-shim-wasm/src/sandbox/containerd/client.rs index 97c61cdf0..3e6c4cc2e 100644 --- a/crates/containerd-shim-wasm/src/sandbox/containerd/client.rs +++ b/crates/containerd-shim-wasm/src/sandbox/containerd/client.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::path::Path; +use std::str::FromStr; use containerd_client; use containerd_client::services::v1::containers_client::ContainersClient; @@ -17,14 +18,14 @@ use containerd_client::tonic::transport::Channel; use containerd_client::tonic::Streaming; use containerd_client::{tonic, with_namespace}; use futures::TryStreamExt; -use oci_spec::image::{Arch, Digest, ImageManifest, MediaType, Platform}; +use oci_spec::image::{Arch, DescriptorBuilder, Digest, ImageManifest, MediaType, Platform}; use sha256::digest; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Code, Request}; use super::lease::LeaseGuard; -use crate::container::Engine; +use crate::container::{Engine, PrecompiledLayer}; use crate::sandbox::error::{Error as ShimError, Result}; use crate::sandbox::oci::{self, WasmLayer}; use crate::with_lease; @@ -406,19 +407,22 @@ impl Client { .iter() .filter(|x| is_wasm_layer(x.media_type(), T::supported_layers_types())); - let mut layers = vec![]; + let mut all_layers = HashMap::new(); + let media_type_label = precompile_label(T::name(), "media-type"); for original_config in configs { - let layer = self - .read_wasm_layer( - original_config, - can_precompile, - &precompile_id, - &mut needs_precompile, - ) - .await?; - layers.push(layer); + self.read_wasm_layer( + original_config, + can_precompile, + &precompile_id, + &mut needs_precompile, + &media_type_label, + &mut all_layers, + ) + .await?; } + let layers = all_layers.values().cloned().collect::>(); + if layers.is_empty() { log::info!("no WASM layers found in OCI image"); return Ok((vec![], platform)); @@ -426,12 +430,11 @@ impl Client { if needs_precompile { log::info!("precompiling layers for image: {}", container.image); - let compiled_layers = match engine.precompile(&layers) { + let compiled_layers = match engine.precompile(&layers).await { Ok(compiled_layers) => { - if compiled_layers.len() != layers.len() { - return Err(ShimError::FailedPrecondition( - "precompile returned wrong number of layers".to_string(), - )); + if compiled_layers.is_empty() { + log::info!("no precompiled layers returned"); + return Ok((layers, platform)); } compiled_layers } @@ -442,40 +445,46 @@ impl Client { }; let mut layers_for_runtime = Vec::with_capacity(compiled_layers.len()); - for (i, compiled_layer) in compiled_layers.iter().enumerate() { - if compiled_layer.is_none() { - log::debug!("no compiled layer using original"); - layers_for_runtime.push(layers[i].clone()); - continue; - } + for compiled_layer in compiled_layers.iter() { + let PrecompiledLayer { + media_type, + bytes, + parents, + } = compiled_layer; + + let mut labels = HashMap::new(); + let media_type_label = precompile_label(T::name(), "media-type"); + labels.insert(media_type_label, media_type.clone()); - let compiled_layer = compiled_layer.as_ref().unwrap(); - let original_config = &layers[i].config; - let labels = HashMap::from([( - format!("{precompile_id}/original"), - original_config.digest().to_string(), - )]); let precompiled_content = self - .save_content(compiled_layer.clone(), &precompile_id, labels) + .save_content(bytes.clone(), &precompile_id, labels) .await?; - log::debug!( - "updating original layer {} with compiled layer {}", - original_config.digest(), - precompiled_content.digest - ); - // We add two labels here: - // - one with cache key per engine instance - // - one with a gc ref flag so it doesn't get cleaned up as long as the original layer exists - let mut original_layer = self.get_info(original_config.digest()).await?; - original_layer - .labels - .insert(precompile_id.clone(), precompiled_content.digest.clone()); - original_layer.labels.insert( - format!("containerd.io/gc.ref.content.precompile.{}", i), - precompiled_content.digest.clone(), - ); - self.update_info(original_layer).await?; + // Update the original layers with a gc label which associates the original digests that + // were used to process and produce the new layer with the digest of the precompiled content. + for parent_idx in parents { + let parent_config = &layers[*parent_idx].config; + let mut parent_layer = self.get_info(parent_config.digest()).await?; + + let child_digest = precompiled_content.digest.clone(); + + log::debug!( + "updating original layer {} with compiled layer {}", + parent_config.digest(), + child_digest, + ); + + let parent_label = format!("{precompile_id}/child.{child_digest}"); + parent_layer + .labels + .insert(parent_label, child_digest.clone()); + + let gc_label = + format!("containerd.io/gc.ref.content.precompile.{child_digest}"); + parent_layer.labels.insert(gc_label, child_digest.clone()); + + self.update_info(parent_layer).await?; + } // The original image is considered a root object, by adding a ref to the new compiled content // We tell containerd to not garbage collect the new content until this image is removed from the system @@ -485,22 +494,35 @@ impl Client { "updating image content with precompile digest to avoid garbage collection" ); let mut image_content = self.get_info(&image_digest).await?; + image_content.labels.insert( - format!("containerd.io/gc.ref.content.precompile.{}", i), - precompiled_content.digest, + format!( + "containerd.io/gc.ref.content.precompile.{}", + precompiled_content.digest + ), + precompiled_content.digest.clone(), ); image_content .labels .insert(precompile_id.clone(), "true".to_string()); self.update_info(image_content).await?; + let precompiled_image_digest = Digest::from_str(&precompiled_content.digest)?; + + let wasm_layer_descriptor = DescriptorBuilder::default() + .media_type(&**media_type) + .size(bytes.len() as u64) + .digest(precompiled_image_digest) + .build()?; + layers_for_runtime.push(WasmLayer { - config: original_config.clone(), - layer: compiled_layer.clone(), + config: wasm_layer_descriptor, + layer: bytes.clone(), }); let _ = precompiled_content.lease.release().await; } + return Ok((layers_for_runtime, platform)); }; @@ -515,44 +537,82 @@ impl Client { can_precompile: bool, precompile_id: &String, needs_precompile: &mut bool, - ) -> std::prelude::v1::Result { - let mut digest_to_load = original_config.digest().clone(); - if can_precompile { - let info = self.get_info(&digest_to_load).await?; - if let Some(label) = info.labels.get(precompile_id) { - // Safe to unwrap here since we already checked for the label's existence - digest_to_load = label.parse()?; - log::info!( - "layer {} has pre-compiled content: {} ", - info.digest, - &digest_to_load - ); + media_type_label: &String, + all_layers: &mut HashMap, + ) -> std::prelude::v1::Result<(), ShimError> { + let parent_digest = original_config.digest().clone(); + let digests_to_load = if can_precompile { + let info = self.get_info(&parent_digest).await?; + let child_digests = info + .labels + .into_iter() + .filter_map(|(key, child_digest)| { + if key.starts_with(&format!("{precompile_id}/child")) { + log::debug!("layer {parent_digest} has child layer: {child_digest} "); + Some(child_digest) + } else { + None + } + }) + .collect::>(); + + if child_digests.is_empty() { + vec![parent_digest.clone()] + } else { + child_digests + .into_iter() + .map(|d| d.parse().map_err(ShimError::Oci)) + .collect::>>()? } - } - log::debug!("loading digest: {} ", &digest_to_load); - let res = self - .read_content(&digest_to_load) - .await - .map(|module| WasmLayer { - config: original_config.clone(), - layer: module, - }); - - match res { - Ok(res) => Ok(res), - Err(err) if digest_to_load == *original_config.digest() => Err(err), - Err(err) => { - log::error!("failed to load precompiled layer: {err}"); - log::error!("falling back to original layer and marking for recompile"); - *needs_precompile = can_precompile; // only mark for recompile if engine is capable - self.read_content(original_config.digest()) - .await - .map(|module| WasmLayer { - config: original_config.clone(), - layer: module, - }) + } else { + vec![parent_digest.clone()] + }; + + for digest_to_load in digests_to_load { + if all_layers.contains_key(&digest_to_load) { + log::debug!("layer {digest_to_load} already loaded"); + continue; } + log::debug!("loading digest: {digest_to_load}"); + + let info = self.get_info(&digest_to_load).await?; + let config_descriptor = match info.labels.get(media_type_label) { + Some(media_type) => DescriptorBuilder::default() + .media_type(&**media_type) + .size(info.size as u64) + .digest(digest_to_load.clone()) + .build()?, + None => original_config.clone(), + }; + + let res = self + .read_content(&digest_to_load) + .await + .map(|module| WasmLayer { + config: config_descriptor, + layer: module, + }); + + let wasm_layer = match res { + Ok(res) => res, + Err(err) if digest_to_load == *original_config.digest() => return Err(err), + Err(err) => { + log::error!("failed to load precompiled layer: {err}"); + log::error!("falling back to original layer and marking for recompile"); + *needs_precompile = can_precompile; // only mark for recompile if engine is capable + self.read_content(original_config.digest()) + .await + .map(|module| WasmLayer { + config: original_config.clone(), + layer: module, + })? + } + }; + + all_layers.insert(digest_to_load, wasm_layer); } + + Ok(()) } }