Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend precompile to support a DAG #792

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/containerd-shim-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true
doctest = false

[dependencies]
async-trait = "0.1"
anyhow = { workspace = true }
chrono = { workspace = true }
containerd-shim = { workspace = true }
Expand Down
16 changes: 15 additions & 1 deletion crates/containerd-shim-wasm/src/container/engine.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use async_trait::async_trait;

Check warning on line 1 in crates/containerd-shim-wasm/src/container/engine.rs

View workflow job for this annotation

GitHub Actions / common / lint on ubuntu-latest

Diff in /home/runner/work/runwasi/runwasi/crates/containerd-shim-wasm/src/container/engine.rs
use std::collections::BTreeSet;
use std::fs::File;
use std::io::Read;

Check warning on line 5 in crates/containerd-shim-wasm/src/container/engine.rs

View workflow job for this annotation

GitHub Actions / common / lint on ubuntu-latest

Diff in /home/runner/work/runwasi/runwasi/crates/containerd-shim-wasm/src/container/engine.rs
use anyhow::{bail, Context, Result};

use super::Source;
use crate::container::{PathResolve, RuntimeContext};
use crate::sandbox::oci::WasmLayer;

#[async_trait]
pub trait Engine: Clone + Send + Sync + 'static {
/// The name to use for this engine
fn name() -> &'static str;
Expand Down Expand Up @@ -62,7 +65,7 @@
/// The cached, precompiled layers will be reloaded on subsequent runs.
/// The runtime is expected to return the same number of layers passed in, if the layer cannot be precompiled it should return `None` for that layer.
/// In some edge cases it is possible that the layers may already be precompiled and None should be returned in this case.
fn precompile(&self, _layers: &[WasmLayer]) -> Result<Vec<Option<Vec<u8>>>> {
async fn precompile(&self, _layers: &[WasmLayer]) -> Result<Vec<PrecompiledLayer>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this now provides a way to do more than just pre-compiling such as composing I wonder if we would want to rename it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that definitely seems reasonable! Happy to call this process_layers or whatever makes sense.

bail!("precompile not supported");
}

Expand All @@ -81,3 +84,14 @@
None
}
}

/// A `PrecompiledLayer` represents the precompiled bytes of a layer and the indices of dependency layers (if any) used to process it.
pub struct PrecompiledLayer {
/// The media type this layer represents.
pub media_type: String,
/// The bytes of the precompiled layer.
pub bytes: Vec<u8>,
/// Indices of this layers' parents.
/// TODO: use digest of parent layer
pub parents: BTreeSet<usize>,
}
2 changes: 1 addition & 1 deletion crates/containerd-shim-wasm/src/container/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod wasm;

pub(crate) use context::WasiContext;
pub use context::{Entrypoint, RuntimeContext, Source};
pub use engine::Engine;
pub use engine::{Engine, PrecompiledLayer};
pub use instance::Instance;
pub use path::PathResolve;
pub use wasm::WasmBinaryType;
Expand Down
232 changes: 146 additions & 86 deletions crates/containerd-shim-wasm/src/sandbox/containerd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;

use containerd_client;
use containerd_client::services::v1::containers_client::ContainersClient;
Expand All @@ -17,14 +18,14 @@ use containerd_client::tonic::transport::Channel;
use containerd_client::tonic::Streaming;
use containerd_client::{tonic, with_namespace};
use futures::TryStreamExt;
use oci_spec::image::{Arch, Digest, ImageManifest, MediaType, Platform};
use oci_spec::image::{Arch, DescriptorBuilder, Digest, ImageManifest, MediaType, Platform};
use sha256::digest;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Code, Request};

use super::lease::LeaseGuard;
use crate::container::Engine;
use crate::container::{Engine, PrecompiledLayer};
use crate::sandbox::error::{Error as ShimError, Result};
use crate::sandbox::oci::{self, WasmLayer};
use crate::with_lease;
Expand Down Expand Up @@ -406,32 +407,34 @@ impl Client {
.iter()
.filter(|x| is_wasm_layer(x.media_type(), T::supported_layers_types()));

let mut layers = vec![];
let mut all_layers = HashMap::new();
let media_type_label = precompile_label(T::name(), "media-type");
for original_config in configs {
let layer = self
.read_wasm_layer(
original_config,
can_precompile,
&precompile_id,
&mut needs_precompile,
)
.await?;
layers.push(layer);
self.read_wasm_layer(
original_config,
can_precompile,
&precompile_id,
&mut needs_precompile,
&media_type_label,
&mut all_layers,
)
.await?;
}

let layers = all_layers.values().cloned().collect::<Vec<_>>();

if layers.is_empty() {
log::info!("no WASM layers found in OCI image");
return Ok((vec![], platform));
}

if needs_precompile {
log::info!("precompiling layers for image: {}", container.image);
let compiled_layers = match engine.precompile(&layers) {
let compiled_layers = match engine.precompile(&layers).await {
Ok(compiled_layers) => {
if compiled_layers.len() != layers.len() {
return Err(ShimError::FailedPrecondition(
"precompile returned wrong number of layers".to_string(),
));
if compiled_layers.is_empty() {
log::info!("no precompiled layers returned");
return Ok((layers, platform));
}
compiled_layers
}
Expand All @@ -442,40 +445,46 @@ impl Client {
};

let mut layers_for_runtime = Vec::with_capacity(compiled_layers.len());
for (i, compiled_layer) in compiled_layers.iter().enumerate() {
if compiled_layer.is_none() {
log::debug!("no compiled layer using original");
layers_for_runtime.push(layers[i].clone());
continue;
}
for compiled_layer in compiled_layers.iter() {
let PrecompiledLayer {
media_type,
bytes,
parents,
} = compiled_layer;

let mut labels = HashMap::new();
let media_type_label = precompile_label(T::name(), "media-type");
labels.insert(media_type_label, media_type.clone());

let compiled_layer = compiled_layer.as_ref().unwrap();
let original_config = &layers[i].config;
let labels = HashMap::from([(
format!("{precompile_id}/original"),
original_config.digest().to_string(),
)]);
let precompiled_content = self
.save_content(compiled_layer.clone(), &precompile_id, labels)
.save_content(bytes.clone(), &precompile_id, labels)
.await?;

log::debug!(
"updating original layer {} with compiled layer {}",
original_config.digest(),
precompiled_content.digest
);
// We add two labels here:
// - one with cache key per engine instance
// - one with a gc ref flag so it doesn't get cleaned up as long as the original layer exists
let mut original_layer = self.get_info(original_config.digest()).await?;
original_layer
.labels
.insert(precompile_id.clone(), precompiled_content.digest.clone());
original_layer.labels.insert(
format!("containerd.io/gc.ref.content.precompile.{}", i),
precompiled_content.digest.clone(),
);
self.update_info(original_layer).await?;
// Update the original layers with a gc label which associates the original digests that
// were used to process and produce the new layer with the digest of the precompiled content.
for parent_idx in parents {
let parent_config = &layers[*parent_idx].config;
let mut parent_layer = self.get_info(parent_config.digest()).await?;

let child_digest = precompiled_content.digest.clone();

log::debug!(
"updating original layer {} with compiled layer {}",
parent_config.digest(),
child_digest,
);

let parent_label = format!("{precompile_id}/child.{child_digest}");
parent_layer
.labels
.insert(parent_label, child_digest.clone());

let gc_label =
format!("containerd.io/gc.ref.content.precompile.{child_digest}");
parent_layer.labels.insert(gc_label, child_digest.clone());

self.update_info(parent_layer).await?;
}

// The original image is considered a root object, by adding a ref to the new compiled content
// We tell containerd to not garbage collect the new content until this image is removed from the system
Expand All @@ -485,22 +494,35 @@ impl Client {
"updating image content with precompile digest to avoid garbage collection"
);
let mut image_content = self.get_info(&image_digest).await?;

image_content.labels.insert(
format!("containerd.io/gc.ref.content.precompile.{}", i),
precompiled_content.digest,
format!(
"containerd.io/gc.ref.content.precompile.{}",
precompiled_content.digest
),
precompiled_content.digest.clone(),
);
image_content
.labels
.insert(precompile_id.clone(), "true".to_string());
self.update_info(image_content).await?;

let precompiled_image_digest = Digest::from_str(&precompiled_content.digest)?;

let wasm_layer_descriptor = DescriptorBuilder::default()
.media_type(&**media_type)
.size(bytes.len() as u64)
.digest(precompiled_image_digest)
.build()?;

layers_for_runtime.push(WasmLayer {
config: original_config.clone(),
layer: compiled_layer.clone(),
config: wasm_layer_descriptor,
layer: bytes.clone(),
});

let _ = precompiled_content.lease.release().await;
}

return Ok((layers_for_runtime, platform));
};

Expand All @@ -515,44 +537,82 @@ impl Client {
can_precompile: bool,
precompile_id: &String,
needs_precompile: &mut bool,
) -> std::prelude::v1::Result<WasmLayer, ShimError> {
let mut digest_to_load = original_config.digest().clone();
if can_precompile {
let info = self.get_info(&digest_to_load).await?;
if let Some(label) = info.labels.get(precompile_id) {
// Safe to unwrap here since we already checked for the label's existence
digest_to_load = label.parse()?;
log::info!(
"layer {} has pre-compiled content: {} ",
info.digest,
&digest_to_load
);
media_type_label: &String,
all_layers: &mut HashMap<Digest, WasmLayer>,
) -> std::prelude::v1::Result<(), ShimError> {
let parent_digest = original_config.digest().clone();
let digests_to_load = if can_precompile {
let info = self.get_info(&parent_digest).await?;
let child_digests = info
.labels
.into_iter()
.filter_map(|(key, child_digest)| {
if key.starts_with(&format!("{precompile_id}/child")) {
log::debug!("layer {parent_digest} has child layer: {child_digest} ");
Some(child_digest)
} else {
None
}
})
.collect::<Vec<_>>();

if child_digests.is_empty() {
vec![parent_digest.clone()]
} else {
child_digests
.into_iter()
.map(|d| d.parse().map_err(ShimError::Oci))
.collect::<Result<Vec<Digest>>>()?
}
}
log::debug!("loading digest: {} ", &digest_to_load);
let res = self
.read_content(&digest_to_load)
.await
.map(|module| WasmLayer {
config: original_config.clone(),
layer: module,
});

match res {
Ok(res) => Ok(res),
Err(err) if digest_to_load == *original_config.digest() => Err(err),
Err(err) => {
log::error!("failed to load precompiled layer: {err}");
log::error!("falling back to original layer and marking for recompile");
*needs_precompile = can_precompile; // only mark for recompile if engine is capable
self.read_content(original_config.digest())
.await
.map(|module| WasmLayer {
config: original_config.clone(),
layer: module,
})
} else {
vec![parent_digest.clone()]
};

for digest_to_load in digests_to_load {
if all_layers.contains_key(&digest_to_load) {
log::debug!("layer {digest_to_load} already loaded");
continue;
}
log::debug!("loading digest: {digest_to_load}");

let info = self.get_info(&digest_to_load).await?;
let config_descriptor = match info.labels.get(media_type_label) {
Some(media_type) => DescriptorBuilder::default()
.media_type(&**media_type)
.size(info.size as u64)
.digest(digest_to_load.clone())
.build()?,
None => original_config.clone(),
};

let res = self
.read_content(&digest_to_load)
.await
.map(|module| WasmLayer {
config: config_descriptor,
layer: module,
});

let wasm_layer = match res {
Ok(res) => res,
Err(err) if digest_to_load == *original_config.digest() => return Err(err),
Err(err) => {
log::error!("failed to load precompiled layer: {err}");
log::error!("falling back to original layer and marking for recompile");
*needs_precompile = can_precompile; // only mark for recompile if engine is capable
self.read_content(original_config.digest())
.await
.map(|module| WasmLayer {
config: original_config.clone(),
layer: module,
})?
}
};

all_layers.insert(digest_to_load, wasm_layer);
}

Ok(())
}
}

Expand Down
Loading