diff --git a/app/buck2_execute/src/execute/output.rs b/app/buck2_execute/src/execute/output.rs index 2b156aba2a585..2ecbbbae900c6 100644 --- a/app/buck2_execute/src/execute/output.rs +++ b/app/buck2_execute/src/execute/output.rs @@ -14,10 +14,12 @@ use anyhow::Context; use buck2_common::file_ops::FileDigest; use buck2_core::execution_types::executor_config::RemoteExecutorUseCase; use futures::future; +use remote_execution::InlinedBlobWithDigest; use remote_execution::TDigest; use crate::digest::CasDigestConversionResultExt; use crate::digest::CasDigestFromReExt; +use crate::digest::CasDigestToReExt; use crate::digest_config::DigestConfig; use crate::re::manager::ManagedRemoteExecutionClient; use crate::re::streams::RemoteCommandStdStreams; @@ -238,12 +240,13 @@ impl CommandStdStreams { self, client: &ManagedRemoteExecutionClient, use_case: RemoteExecutorUseCase, + digest_config: DigestConfig, ) -> anyhow::Result> { match self { Self::Local { stdout, stderr } => { let (stdout, stderr) = future::try_join( - maybe_upload_to_re(client, use_case, stdout), - maybe_upload_to_re(client, use_case, stderr), + maybe_upload_to_re(client, use_case, stdout, digest_config), + maybe_upload_to_re(client, use_case, stderr, digest_config), ) .await?; @@ -276,11 +279,17 @@ async fn maybe_upload_to_re( client: &ManagedRemoteExecutionClient, use_case: RemoteExecutorUseCase, bytes: Vec, + digest_config: DigestConfig, ) -> anyhow::Result { const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE if bytes.len() < MIN_STREAM_UPLOAD_SIZE { return Ok(ReStdStream::Raw(bytes)); } - let digest = client.upload_blob(bytes, use_case).await?; + let inline_blob = InlinedBlobWithDigest { + digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(), + blob: bytes, + ..Default::default() + }; + let digest = client.upload_blob(inline_blob, use_case).await?; Ok(ReStdStream::Digest(digest)) } diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs index 534dca15562cc..edb085a4d75c1 100644 --- a/app/buck2_execute/src/re/client.rs +++ b/app/buck2_execute/src/re/client.rs @@ -278,7 +278,7 @@ impl RemoteExecutionClient { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> anyhow::Result { self.data @@ -1132,9 +1132,10 @@ impl RemoteExecutionClientImpl { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> anyhow::Result { + let digest = blob.digest.clone(); with_error_handler( "upload_blob", self.get_session_id(), @@ -1142,7 +1143,8 @@ impl RemoteExecutionClientImpl { .upload_blob(blob, use_case.metadata(None)) .await, ) - .await + .await?; + Ok(digest) } async fn materialize_files( diff --git a/app/buck2_execute/src/re/manager.rs b/app/buck2_execute/src/re/manager.rs index 14a629825adb0..acdcf780e1b4f 100644 --- a/app/buck2_execute/src/re/manager.rs +++ b/app/buck2_execute/src/re/manager.rs @@ -465,7 +465,7 @@ impl ManagedRemoteExecutionClient { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> anyhow::Result { let use_case = self.re_use_case_override.unwrap_or(use_case); diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs index 0fb1baaf1b1d1..553ba65fdda45 100644 --- a/app/buck2_execute_impl/src/executors/caching.rs +++ b/app/buck2_execute_impl/src/executors/caching.rs @@ -417,7 +417,7 @@ impl CacheUploader { .report .std_streams .clone() - .into_re(&self.re_client, self.re_use_case) + .into_re(&self.re_client, self.re_use_case, digest_config) .await .context("Error accessing std_streams") }; diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs index 40bf31fa1d885..69657ae90b366 100644 --- a/remote_execution/oss/re_grpc/src/client.rs +++ b/remote_execution/oss/re_grpc/src/client.rs @@ -760,11 +760,21 @@ impl REClient { pub async fn upload_blob( &self, - _blob: Vec, - _metadata: RemoteExecutionMetadata, - ) -> anyhow::Result { - // TODO(aloiscochard) - Err(anyhow::anyhow!("Not implemented (RE upload_blob)")) + blob: InlinedBlobWithDigest, + metadata: RemoteExecutionMetadata, + ) -> anyhow::Result<()> { + self.upload( + metadata, + UploadRequest { + inlined_blobs_with_digest: Some(vec![blob]), + files_with_digest: None, + directories: None, + upload_only_missing: false, + ..Default::default() + }, + ) + .await?; + Ok(()) } pub async fn download(