diff --git a/app/buck2_execute/src/execute/output.rs b/app/buck2_execute/src/execute/output.rs index a7b18fe5e67a7..e143ba5815436 100644 --- a/app/buck2_execute/src/execute/output.rs +++ b/app/buck2_execute/src/execute/output.rs @@ -14,10 +14,12 @@ use buck2_common::file_ops::FileDigest; use buck2_core::execution_types::executor_config::RemoteExecutorUseCase; use buck2_error::BuckErrorContext; use futures::future; +use remote_execution::InlinedBlobWithDigest; use remote_execution::TDigest; use crate::digest::CasDigestConversionResultExt; use crate::digest::CasDigestFromReExt; +use crate::digest::CasDigestToReExt; use crate::digest_config::DigestConfig; use crate::re::manager::ManagedRemoteExecutionClient; use crate::re::streams::RemoteCommandStdStreams; @@ -238,12 +240,13 @@ impl CommandStdStreams { self, client: &ManagedRemoteExecutionClient, use_case: RemoteExecutorUseCase, + digest_config: DigestConfig, ) -> buck2_error::Result> { match self { Self::Local { stdout, stderr } => { let (stdout, stderr) = future::try_join( - maybe_upload_to_re(client, use_case, stdout), - maybe_upload_to_re(client, use_case, stderr), + maybe_upload_to_re(client, use_case, stdout, digest_config), + maybe_upload_to_re(client, use_case, stderr, digest_config), ) .await?; @@ -277,11 +280,17 @@ async fn maybe_upload_to_re( client: &ManagedRemoteExecutionClient, use_case: RemoteExecutorUseCase, bytes: Vec, + digest_config: DigestConfig, ) -> buck2_error::Result { const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE if bytes.len() < MIN_STREAM_UPLOAD_SIZE { return Ok(ReStdStream::Raw(bytes)); } - let digest = client.upload_blob(bytes, use_case).await?; + let inline_blob = InlinedBlobWithDigest { + digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(), + blob: bytes, + ..Default::default() + }; + let digest = client.upload_blob(inline_blob, use_case).await?; Ok(ReStdStream::Digest(digest)) } diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs index 2169ec28edca3..89ee2e92d8a9c 100644 --- a/app/buck2_execute/src/re/client.rs +++ b/app/buck2_execute/src/re/client.rs @@ -297,7 +297,7 @@ impl RemoteExecutionClient { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> buck2_error::Result { self.data @@ -1209,9 +1209,10 @@ impl RemoteExecutionClientImpl { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> buck2_error::Result { + let digest = blob.digest.clone(); with_error_handler( "upload_blob", self.get_session_id(), @@ -1219,7 +1220,8 @@ impl RemoteExecutionClientImpl { .upload_blob(blob, use_case.metadata(None)) .await, ) - .await + .await?; + Ok(digest) } async fn materialize_files( diff --git a/app/buck2_execute/src/re/manager.rs b/app/buck2_execute/src/re/manager.rs index 5fecf8b5ed38f..9e47f2178c311 100644 --- a/app/buck2_execute/src/re/manager.rs +++ b/app/buck2_execute/src/re/manager.rs @@ -468,7 +468,7 @@ impl ManagedRemoteExecutionClient { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> buck2_error::Result { let use_case = self.re_use_case_override.unwrap_or(use_case); diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs index 1a4fe862ac716..cf9909d225e00 100644 --- a/app/buck2_execute_impl/src/executors/caching.rs +++ b/app/buck2_execute_impl/src/executors/caching.rs @@ -417,7 +417,7 @@ impl CacheUploader { .report .std_streams .clone() - .into_re(&self.re_client, self.re_use_case) + .into_re(&self.re_client, self.re_use_case, digest_config) .await .buck_error_context("Error accessing std_streams") }; diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs index f1d860e624130..456929b164a1c 100644 --- a/remote_execution/oss/re_grpc/src/client.rs +++ b/remote_execution/oss/re_grpc/src/client.rs @@ -762,11 +762,21 @@ impl REClient { pub async fn upload_blob( &self, - _blob: Vec, - _metadata: RemoteExecutionMetadata, - ) -> anyhow::Result { - // TODO(aloiscochard) - Err(anyhow::anyhow!("Not implemented (RE upload_blob)")) + blob: InlinedBlobWithDigest, + metadata: RemoteExecutionMetadata, + ) -> anyhow::Result<()> { + self.upload( + metadata, + UploadRequest { + inlined_blobs_with_digest: Some(vec![blob]), + files_with_digest: None, + directories: None, + upload_only_missing: false, + ..Default::default() + }, + ) + .await?; + Ok(()) } pub async fn download(