From 37ee252f3744abc4511f55b5089cc52abd3ba09d Mon Sep 17 00:00:00 2001
From: Googler <noreply@google.com>
Date: Thu, 28 Jan 2021 22:54:20 -0800
Subject: [PATCH] Remote: Use parameters instead of thread-local storage to
 provide tracing metadata. (Part 4)

Change RemoteCacheClient#upload{File,Blob} to take a
RemoteActionExecutionContext parameter instead of reading tracing
metadata from the thread-local io.grpc.Context.
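
For reference, a minimal sketch of the new calling convention (the
wrapper class, helper method, and actionId value below are illustrative
placeholders, not code from this change): callers build RequestMetadata
once and pass it explicitly through a RemoteActionExecutionContext,
rather than attaching it to the current gRPC Context around the call.

    import build.bazel.remote.execution.v2.Digest;
    import build.bazel.remote.execution.v2.RequestMetadata;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.devtools.build.lib.remote.common.NetworkTime;
    import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
    import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
    import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
    import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
    import com.google.devtools.build.lib.vfs.Path;

    // Illustrative caller (hypothetical helper, not part of this patch).
    class UploadExample {
      static ListenableFuture<Void> uploadOutputFile(
          RemoteCacheClient cacheClient,
          String buildRequestId,
          String commandId,
          String actionId,
          Digest digest,
          Path file) {
        // Build the tracing metadata explicitly instead of reading it
        // from the thread-local io.grpc.Context.
        RequestMetadata metadata =
            TracingMetadataUtils.buildMetadata(buildRequestId, commandId, actionId);
        RemoteActionExecutionContext context =
            new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
        // The context now travels as an ordinary parameter to the upload.
        return cacheClient.uploadFile(context, digest, file);
      }
    }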

PiperOrigin-RevId: 354472775
---
 .../ByteStreamBuildEventArtifactUploader.java |  29 +-
 ...reamBuildEventArtifactUploaderFactory.java |  13 +-
 .../build/lib/remote/ByteStreamUploader.java  |  53 ++-
 .../build/lib/remote/GrpcCacheClient.java     |  12 +-
 .../build/lib/remote/RemoteCache.java         |   7 +-
 .../lib/remote/RemoteExecutionCache.java      |  21 +-
 .../build/lib/remote/RemoteModule.java        |   6 +-
 .../RemoteRepositoryRemoteExecutor.java       |   3 +-
 .../build/lib/remote/RemoteSpawnRunner.java   |   3 +-
 .../lib/remote/common/RemoteCacheClient.java  |   7 +-
 .../remote/disk/DiskAndRemoteCacheClient.java |  14 +-
 .../lib/remote/disk/DiskCacheClient.java      |   6 +-
 .../lib/remote/http/HttpCacheClient.java      |   6 +-
 ...eStreamBuildEventArtifactUploaderTest.java |   7 +-
 .../lib/remote/ByteStreamUploaderTest.java    | 312 +++++++++---------
 .../build/lib/remote/GrpcCacheClientTest.java |   2 +-
 .../build/lib/remote/RemoteCacheTests.java    |  69 ++--
 .../RemoteRepositoryRemoteExecutorTest.java   |   2 +-
 .../lib/remote/RemoteSpawnRunnerTest.java     |   2 +-
 .../downloader/GrpcRemoteDownloaderTest.java  |  22 +-
 .../lib/remote/http/HttpCacheClientTest.java  |  23 +-
 .../lib/remote/util/InMemoryCacheClient.java  |   6 +-
 .../build/remote/worker/ByteStreamServer.java |   6 +-
 .../build/remote/worker/CasServer.java        |   7 +-
 .../remote/worker/OnDiskBlobStoreCache.java   |  10 +-
 25 files changed, 375 insertions(+), 273 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
index 1b2fd7c4a539c5..0ca33b6bd4c8ed 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -14,6 +14,7 @@
 package com.google.devtools.build.lib.remote;
 
 import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -29,7 +30,11 @@
 import com.google.devtools.build.lib.buildeventstream.PathConverter;
 import com.google.devtools.build.lib.collect.ImmutableIterable;
 import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
 import com.google.devtools.build.lib.vfs.Path;
 import io.grpc.Context;
 import io.netty.util.AbstractReferenceCounted;
@@ -50,7 +55,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
     implements BuildEventArtifactUploader {
 
   private final ListeningExecutorService uploadExecutor;
-  private final Context ctx;
+  private final String buildRequestId;
+  private final String commandId;
   private final ByteStreamUploader uploader;
   private final String remoteServerInstanceName;
   private final MissingDigestsFinder missingDigestsFinder;
@@ -61,7 +67,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
       ByteStreamUploader uploader,
       MissingDigestsFinder missingDigestsFinder,
       String remoteServerName,
-      Context ctx,
+      String buildRequestId,
+      String commandId,
       @Nullable String remoteInstanceName,
       int maxUploadThreads) {
     this.uploader = Preconditions.checkNotNull(uploader);
@@ -69,7 +76,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
     if (!Strings.isNullOrEmpty(remoteInstanceName)) {
       remoteServerInstanceName += "/" + remoteInstanceName;
     }
-    this.ctx = ctx;
+    this.buildRequestId = buildRequestId;
+    this.commandId = commandId;
     this.remoteServerInstanceName = remoteServerInstanceName;
     // Limit the maximum threads number to 1000 (chosen arbitrarily)
     this.uploadExecutor =
@@ -153,6 +161,8 @@ private static List<PathMetadata> processQueryResult(
    */
   private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
       ImmutableList<ListenableFuture<PathMetadata>> allPaths) throws Exception {
+    Context ctx = TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, "bes-upload");
+
     List<PathMetadata> knownRemotePaths = new ArrayList<>(allPaths.size());
     List<PathMetadata> filesToQuery = new ArrayList<>();
     Set<Digest> digestsToQuery = new HashSet<>();
@@ -185,6 +195,11 @@ private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
    */
   private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
       ImmutableIterable<PathMetadata> allPaths) {
+    RequestMetadata metadata =
+        TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload");
+    RemoteActionExecutionContext context =
+        new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+
     ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathsUploaded =
         ImmutableList.builder();
     for (PathMetadata path : allPaths) {
@@ -192,12 +207,8 @@ private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
         Chunker chunker =
             Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build();
         final ListenableFuture<Void> upload;
-        Context prevCtx = ctx.attach();
-        try {
-          upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false);
-        } finally {
-          ctx.detach(prevCtx);
-        }
+        upload =
+            uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false);
         allPathsUploaded.add(Futures.transform(upload, unused -> path, uploadExecutor));
       } else {
         allPathsUploaded.add(Futures.immediateFuture(path));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
index 978c10f325e50e..1c27fbe703d0c7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java
@@ -18,7 +18,6 @@
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory;
 import com.google.devtools.build.lib.runtime.CommandEnvironment;
-import io.grpc.Context;
 import javax.annotation.Nullable;
 
 /**
@@ -29,7 +28,8 @@ class ByteStreamBuildEventArtifactUploaderFactory implements
 
   private final ByteStreamUploader uploader;
   private final String remoteServerName;
-  private final Context ctx;
+  private final String buildRequestId;
+  private final String commandId;
   private final MissingDigestsFinder missingDigestsFinder;
   @Nullable private final String remoteInstanceName;
 
@@ -37,12 +37,14 @@ class ByteStreamBuildEventArtifactUploaderFactory implements
       ByteStreamUploader uploader,
       MissingDigestsFinder missingDigestsFinder,
       String remoteServerName,
-      Context ctx,
+      String buildRequestId,
+      String commandId,
       @Nullable String remoteInstanceName) {
     this.uploader = uploader;
     this.missingDigestsFinder = missingDigestsFinder;
     this.remoteServerName = remoteServerName;
-    this.ctx = ctx;
+    this.buildRequestId = buildRequestId;
+    this.commandId = commandId;
     this.remoteInstanceName = remoteInstanceName;
   }
 
@@ -52,7 +54,8 @@ public BuildEventArtifactUploader create(CommandEnvironment env) {
         uploader.retain(),
         missingDigestsFinder,
         remoteServerName,
-        ctx,
+        buildRequestId,
+        commandId,
         remoteInstanceName,
         env.getOptions().getOptions(RemoteOptions.class).buildEventUploadMaxThreads);
   }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index c1133796129b4b..bba8593ed91050 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -36,12 +36,12 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
 import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
 import com.google.devtools.build.lib.remote.util.Utils;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
-import io.grpc.Context;
 import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.Status.Code;
@@ -134,9 +134,10 @@ public ByteStreamUploader(
    *     uploaded, if {@code true} the blob is uploaded.
    * @throws IOException when reading of the {@link Chunker}s input source fails
    */
-  public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
+  public void uploadBlob(
+      RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload)
       throws IOException, InterruptedException {
-    uploadBlobs(singletonMap(hash, chunker), forceUpload);
+    uploadBlobs(context, singletonMap(hash, chunker), forceUpload);
   }
 
   /**
@@ -156,12 +157,14 @@ public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
    *     uploaded, if {@code true} the blob is uploaded.
    * @throws IOException when reading of the {@link Chunker}s input source or uploading fails
    */
-  public void uploadBlobs(Map<HashCode, Chunker> chunkers, boolean forceUpload)
+  public void uploadBlobs(
+      RemoteActionExecutionContext context, Map<HashCode, Chunker> chunkers, boolean forceUpload)
       throws IOException, InterruptedException {
     List<ListenableFuture<Void>> uploads = new ArrayList<>();
 
     for (Map.Entry<HashCode, Chunker> chunkerEntry : chunkers.entrySet()) {
-      uploads.add(uploadBlobAsync(chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
+      uploads.add(
+          uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
     }
 
     try {
@@ -200,14 +203,17 @@ void shutdown() {
     }
   }
 
-  /** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */
+  /**
+   * @deprecated Use {@link #uploadBlobAsync(RemoteActionExecutionContext, Digest, Chunker,
+   *     boolean)} instead.
+   */
   @Deprecated
   @VisibleForTesting
   public ListenableFuture<Void> uploadBlobAsync(
-      HashCode hash, Chunker chunker, boolean forceUpload) {
+      RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload) {
     Digest digest =
         Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build();
-    return uploadBlobAsync(digest, chunker, forceUpload);
+    return uploadBlobAsync(context, digest, chunker, forceUpload);
   }
 
   /**
@@ -227,7 +233,7 @@ public ListenableFuture<Void> uploadBlobAsync(
    * @throws IOException when reading of the {@link Chunker}s input source fails
    */
   public ListenableFuture<Void> uploadBlobAsync(
-      Digest digest, Chunker chunker, boolean forceUpload) {
+      RemoteActionExecutionContext context, Digest digest, Chunker chunker, boolean forceUpload) {
     synchronized (lock) {
       checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
 
@@ -242,7 +248,7 @@ public ListenableFuture<Void> uploadBlobAsync(
 
       ListenableFuture<Void> uploadResult =
           Futures.transform(
-              startAsyncUpload(digest, chunker),
+              startAsyncUpload(context, digest, chunker),
               (v) -> {
                 synchronized (lock) {
                   uploadedBlobs.add(HashCode.fromString(digest.getHash()));
@@ -294,7 +300,8 @@ private static String buildUploadResourceName(String instanceName, UUID uuid, Di
   }
 
   /** Starts a file upload and returns a future representing the upload. */
-  private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker) {
+  private ListenableFuture<Void> startAsyncUpload(
+      RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
     try {
       chunker.reset();
     } catch (IOException e) {
@@ -313,7 +320,13 @@ private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker)
     String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
     AsyncUpload newUpload =
         new AsyncUpload(
-            channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker);
+            context,
+            channel,
+            callCredentialsProvider,
+            callTimeoutSecs,
+            retrier,
+            resourceName,
+            chunker);
     ListenableFuture<Void> currUpload = newUpload.start();
     currUpload.addListener(
         () -> {
@@ -348,6 +361,7 @@ public ReferenceCounted touch(Object o) {
 
   private static class AsyncUpload {
 
+    private final RemoteActionExecutionContext context;
     private final Channel channel;
     private final CallCredentialsProvider callCredentialsProvider;
     private final long callTimeoutSecs;
@@ -358,12 +372,14 @@ private static class AsyncUpload {
     private ClientCall<WriteRequest, WriteResponse> call;
 
     AsyncUpload(
+        RemoteActionExecutionContext context,
         Channel channel,
         CallCredentialsProvider callCredentialsProvider,
         long callTimeoutSecs,
         Retrier retrier,
         String resourceName,
         Chunker chunker) {
+      this.context = context;
       this.channel = channel;
       this.callCredentialsProvider = callCredentialsProvider;
       this.callTimeoutSecs = callTimeoutSecs;
@@ -373,7 +389,6 @@ private static class AsyncUpload {
     }
 
     ListenableFuture<Void> start() {
-      Context ctx = Context.current();
       ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
       AtomicLong committedOffset = new AtomicLong(0);
 
@@ -383,8 +398,7 @@ ListenableFuture<Void> start() {
                   retrier.executeAsync(
                       () -> {
                         if (committedOffset.get() < chunker.getSize()) {
-                          return ctx.call(
-                              () -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
+                          return callAndQueryOnFailure(committedOffset, progressiveBackoff);
                         }
                         return Futures.immediateFuture(null);
                       },
@@ -409,7 +423,8 @@ ListenableFuture<Void> start() {
 
     private ByteStreamFutureStub bsFutureStub() {
       return ByteStreamGrpc.newFutureStub(channel)
-          .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
+          .withInterceptors(
+              TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
           .withCallCredentials(callCredentialsProvider.getCallCredentials())
           .withDeadlineAfter(callTimeoutSecs, SECONDS);
     }
@@ -420,7 +435,7 @@ private ListenableFuture<Void> callAndQueryOnFailure(
           call(committedOffset),
           Exception.class,
           (e) -> guardQueryWithSuppression(e, committedOffset, progressiveBackoff),
-          Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
+          MoreExecutors.directExecutor());
     }
 
     private ListenableFuture<Void> guardQueryWithSuppression(
@@ -584,7 +599,9 @@ public void onReady() {
               }
             }
           };
-      call.start(callListener, TracingMetadataUtils.headersFromCurrentContext());
+      call.start(
+          callListener,
+          TracingMetadataUtils.headersFromRequestMetadata(context.getRequestMetadata()));
       call.request(1);
       return uploadResult;
     }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
index ef031d7d83771a..b2aeb4a8b0ef3a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java
@@ -393,16 +393,22 @@ public void onCompleted() {
   }
 
   @Override
-  public ListenableFuture<Void> uploadFile(Digest digest, Path path) {
+  public ListenableFuture<Void> uploadFile(
+      RemoteActionExecutionContext context, Digest digest, Path path) {
     return uploader.uploadBlobAsync(
+        context,
         digest,
         Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
         /* forceUpload= */ true);
   }
 
   @Override
-  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+  public ListenableFuture<Void> uploadBlob(
+      RemoteActionExecutionContext context, Digest digest, ByteString data) {
     return uploader.uploadBlobAsync(
-        digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true);
+        context,
+        digest,
+        Chunker.builder().setInput(data.toByteArray()).build(),
+        /* forceUpload= */ true);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 3950814d09f253..3496243044c913 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -139,7 +139,7 @@ public ActionResult upload(
       int exitCode)
       throws ExecException, IOException, InterruptedException {
     ActionResult.Builder resultBuilder = ActionResult.newBuilder();
-    uploadOutputs(execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
+    uploadOutputs(context, execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
     resultBuilder.setExitCode(exitCode);
     ActionResult result = resultBuilder.build();
     if (exitCode == 0 && !action.getDoNotCache()) {
@@ -162,6 +162,7 @@ public ActionResult upload(
   }
 
   private void uploadOutputs(
+      RemoteActionExecutionContext context,
       Path execRoot,
       ActionKey actionKey,
       Action action,
@@ -192,14 +193,14 @@ private void uploadOutputs(
     for (Digest digest : digestsToUpload) {
       Path file = digestToFile.get(digest);
       if (file != null) {
-        uploads.add(cacheProtocol.uploadFile(digest, file));
+        uploads.add(cacheProtocol.uploadFile(context, digest, file));
       } else {
         ByteString blob = digestToBlobs.get(digest);
         if (blob == null) {
           String message = "FindMissingBlobs call returned an unknown digest: " + digest;
           throw new IOException(message);
         }
-        uploads.add(cacheProtocol.uploadBlob(digest, blob));
+        uploads.add(cacheProtocol.uploadBlob(context, digest, blob));
       }
     }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
index 88860253067e3e..439840492abddc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
 import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
@@ -53,7 +54,10 @@ public RemoteExecutionCache(
    * However, remote execution uses a cache to store input files, and that may be a separate
    * end-point from the executor itself, so the functionality lives here.
    */
-  public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> additionalInputs)
+  public void ensureInputsPresent(
+      RemoteActionExecutionContext context,
+      MerkleTree merkleTree,
+      Map<Digest, Message> additionalInputs)
       throws IOException, InterruptedException {
     Iterable<Digest> allDigests =
         Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
@@ -62,30 +66,33 @@ public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> addi
 
     List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
     for (Digest missingDigest : missingDigests) {
-      uploadFutures.add(uploadBlob(missingDigest, merkleTree, additionalInputs));
+      uploadFutures.add(uploadBlob(context, missingDigest, merkleTree, additionalInputs));
     }
 
     waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
   }
 
   private ListenableFuture<Void> uploadBlob(
-      Digest digest, MerkleTree merkleTree, Map<Digest, Message> additionalInputs) {
+      RemoteActionExecutionContext context,
+      Digest digest,
+      MerkleTree merkleTree,
+      Map<Digest, Message> additionalInputs) {
     Directory node = merkleTree.getDirectoryByDigest(digest);
     if (node != null) {
-      return cacheProtocol.uploadBlob(digest, node.toByteString());
+      return cacheProtocol.uploadBlob(context, digest, node.toByteString());
     }
 
     PathOrBytes file = merkleTree.getFileByDigest(digest);
     if (file != null) {
       if (file.getBytes() != null) {
-        return cacheProtocol.uploadBlob(digest, file.getBytes());
+        return cacheProtocol.uploadBlob(context, digest, file.getBytes());
       }
-      return cacheProtocol.uploadFile(digest, file.getPath());
+      return cacheProtocol.uploadFile(context, digest, file.getPath());
     }
 
     Message message = additionalInputs.get(digest);
     if (message != null) {
-      return cacheProtocol.uploadBlob(digest, message.toByteString());
+      return cacheProtocol.uploadBlob(context, digest, message.toByteString());
     }
 
     return Futures.immediateFailedFuture(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index dfa368bb7768d8..443d4a8f95e26a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -93,7 +93,6 @@
 import com.google.devtools.common.options.OptionsParsingResult;
 import io.grpc.CallCredentials;
 import io.grpc.ClientInterceptor;
-import io.grpc.Context;
 import io.grpc.ManagedChannel;
 import java.io.IOException;
 import java.util.HashSet;
@@ -501,14 +500,13 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
             digestUtil,
             uploader.retain());
     uploader.release();
-    Context requestContext =
-        TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "bes-upload");
     buildEventArtifactUploaderFactoryDelegate.init(
         new ByteStreamBuildEventArtifactUploaderFactory(
             uploader,
             cacheClient,
             cacheChannel.authority(),
-            requestContext,
+            buildRequestId,
+            invocationId,
             remoteOptions.remoteInstanceName));
 
     if (enableRemoteExecution) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
index 5d89161722f913..f4a05dc30a831a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java
@@ -144,7 +144,8 @@ public ExecutionResult execute(
           additionalInputs.put(actionDigest, action);
           additionalInputs.put(commandHash, command);
 
-          remoteCache.ensureInputsPresent(merkleTree, additionalInputs);
+          remoteCache.ensureInputsPresent(
+              remoteActionExecutionContext, merkleTree, additionalInputs);
         }
 
         try (SilentCloseable c =
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index fa99c638b09359..3e9486c9b34907 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -337,7 +337,8 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
                 additionalInputs.put(commandHash, command);
                 Duration networkTimeStart = networkTime.getDuration();
                 Stopwatch uploadTime = Stopwatch.createStarted();
-                remoteCache.ensureInputsPresent(merkleTree, additionalInputs);
+                remoteCache.ensureInputsPresent(
+                    remoteActionExecutionContext, merkleTree, additionalInputs);
                 // subtract network time consumed here to ensure wall clock during upload is not
                 // double
                 // counted, and metrics time computation does not exceed total time
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
index caf497f9df693c..628d214bec973b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -104,20 +104,23 @@ ListenableFuture<Void> downloadBlob(
   /**
    * Uploads a {@code file} to the CAS.
    *
+   * @param context the context for the action.
    * @param digest The digest of the file.
    * @param file The file to upload.
    * @return A future representing pending completion of the upload.
    */
-  ListenableFuture<Void> uploadFile(Digest digest, Path file);
+  ListenableFuture<Void> uploadFile(RemoteActionExecutionContext context, Digest digest, Path file);
 
   /**
    * Uploads a BLOB to the CAS.
    *
+   * @param context the context for the action.
    * @param digest The digest of the blob.
    * @param data The BLOB to upload.
    * @return A future representing pending completion of the upload.
    */
-  ListenableFuture<Void> uploadBlob(Digest digest, ByteString data);
+  ListenableFuture<Void> uploadBlob(
+      RemoteActionExecutionContext context, Digest digest, ByteString data);
 
   /** Close resources associated with the remote cache. */
   void close();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
index 040a7d447836c2..60c501677d0cd9 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
@@ -65,11 +65,12 @@ public void close() {
   }
 
   @Override
-  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+  public ListenableFuture<Void> uploadFile(
+      RemoteActionExecutionContext context, Digest digest, Path file) {
     try {
-      diskCache.uploadFile(digest, file).get();
+      diskCache.uploadFile(context, digest, file).get();
       if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteUploadLocalResults) {
-        remoteCache.uploadFile(digest, file).get();
+        remoteCache.uploadFile(context, digest, file).get();
       }
     } catch (ExecutionException e) {
       return Futures.immediateFailedFuture(e.getCause());
@@ -80,11 +81,12 @@ public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
   }
 
   @Override
-  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+  public ListenableFuture<Void> uploadBlob(
+      RemoteActionExecutionContext context, Digest digest, ByteString data) {
     try {
-      diskCache.uploadBlob(digest, data).get();
+      diskCache.uploadBlob(context, digest, data).get();
       if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteUploadLocalResults) {
-        remoteCache.uploadBlob(digest, data).get();
+        remoteCache.uploadBlob(context, digest, data).get();
       }
     } catch (ExecutionException e) {
       return Futures.immediateFailedFuture(e.getCause());
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
index 027891353cfe4c..6778525ccca254 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java
@@ -121,7 +121,8 @@ public void uploadActionResult(
   public void close() {}
 
   @Override
-  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+  public ListenableFuture<Void> uploadFile(
+      RemoteActionExecutionContext context, Digest digest, Path file) {
     try (InputStream in = file.getInputStream()) {
       saveFile(digest.getHash(), in, /* actionResult= */ false);
     } catch (IOException e) {
@@ -131,7 +132,8 @@ public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
   }
 
   @Override
-  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+  public ListenableFuture<Void> uploadBlob(
+      RemoteActionExecutionContext context, Digest digest, ByteString data) {
     try (InputStream in = data.newInput()) {
       saveFile(digest.getHash(), in, /* actionResult= */ false);
     } catch (IOException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
index 7a67bb5bdb696e..1d8a922299abab 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java
@@ -671,7 +671,8 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture<V
   }
 
   @Override
-  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+  public ListenableFuture<Void> uploadFile(
+      RemoteActionExecutionContext context, Digest digest, Path file) {
     try {
       return uploadAsync(
           digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true);
@@ -682,7 +683,8 @@ public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
   }
 
   @Override
-  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+  public ListenableFuture<Void> uploadBlob(
+      RemoteActionExecutionContext context, Digest digest, ByteString data) {
     return uploadAsync(
         digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true);
   }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index 704bb21f13667d..165924a205d81c 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -334,7 +334,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception
     StaticMissingDigestsFinder digestQuerier =
         Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest)));
     ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class);
-    when(uploader.uploadBlobAsync(any(Digest.class), any(), anyBoolean()))
+    when(uploader.uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean()))
         .thenReturn(Futures.immediateFuture(null));
     ByteStreamBuildEventArtifactUploader artifactUploader =
         newArtifactUploader(uploader, digestQuerier);
@@ -350,7 +350,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception
 
     // assert
     verify(digestQuerier).findMissingDigests(any());
-    verify(uploader).uploadBlobAsync(eq(localDigest), any(), anyBoolean());
+    verify(uploader).uploadBlobAsync(any(), eq(localDigest), any(), anyBoolean());
     assertThat(pathConverter.apply(remoteFile)).contains(remoteDigest.getHash());
     assertThat(pathConverter.apply(localFile)).contains(localDigest.getHash());
   }
@@ -374,7 +374,8 @@ private ByteStreamBuildEventArtifactUploader newArtifactUploader(
         uploader,
         missingDigestsFinder,
         "localhost",
-        withEmptyMetadata,
+        "none",
+        "none",
         "instance",
         /* maxUploadThreads= */ 100);
   }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index eec073a50a197b..29cd41eb4ef88f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -36,6 +36,9 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
 import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TestUtils;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -86,9 +89,7 @@
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-/**
- * Tests for {@link ByteStreamUploader}.
- */
+/** Tests for {@link ByteStreamUploader}. */
 @RunWith(JUnit4.class)
 public class ByteStreamUploaderTest {
 
@@ -102,6 +103,7 @@ public class ByteStreamUploaderTest {
 
   private Server server;
   private ManagedChannel channel;
+  private RemoteActionExecutionContext context;
   private Context withEmptyMetadata;
   private Context prevContext;
 
@@ -112,12 +114,19 @@ public final void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
 
     String serverName = "Server for " + this.getClass();
-    server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry)
-        .build().start();
+    server =
+        InProcessServerBuilder.forName(serverName)
+            .fallbackHandlerRegistry(serviceRegistry)
+            .build()
+            .start();
     channel = InProcessChannelBuilder.forName(serverName).build();
-    withEmptyMetadata =
-        TracingMetadataUtils.contextWithMetadata(
-            "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+    RequestMetadata metadata =
+        TracingMetadataUtils.buildMetadata(
+            "none",
+            "none",
+            DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash());
+    context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+    withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata);
 
     retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
 
@@ -161,7 +170,8 @@ public void singleBlobUploadShouldWork() throws Exception {
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
           @Override
           public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
             return new StreamObserver<WriteRequest>() {
@@ -183,8 +193,8 @@ public void onNext(WriteRequest writeRequest) {
 
                 ByteString data = writeRequest.getData();
 
-                System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset,
-                    data.size());
+                System.arraycopy(
+                    data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
 
                 nextOffset += data.size();
                 boolean lastWrite = blob.length == nextOffset;
@@ -210,7 +220,7 @@ public void onCompleted() {
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     Mockito.verifyZeroInteractions(mockBackoff);
@@ -328,7 +338,7 @@ public void queryWriteStatus(
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
@@ -392,7 +402,7 @@ public void queryWriteStatus(
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     assertThat(numWriteCalls.get()).isEqualTo(1);
@@ -466,7 +476,7 @@ public void queryWriteStatus(
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should have triggered a single retry, because it made
     // no progress.
@@ -508,7 +518,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     // This test should not have triggered any retries.
     Mockito.verifyZeroInteractions(mockBackoff);
@@ -549,7 +559,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
         });
 
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       // expected
@@ -592,7 +602,7 @@ public void multipleBlobsUploadShouldWork() throws Exception {
 
     serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
 
-    uploader.uploadBlobs(chunkers, true);
+    uploader.uploadBlobs(context, chunkers, true);
 
     blockUntilInternalStateConsistent(uploader);
 
@@ -690,16 +700,19 @@ public void queryWriteStatus(
 
     for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
       Digest actionDigest = chunkerEntry.getKey();
-      Context ctx =
-          TracingMetadataUtils.contextWithMetadata(
-              "build-req-id", "command-id", DIGEST_UTIL.asActionKey(actionDigest));
-      ctx.run(
-          () ->
-              uploads.add(
-                  uploader.uploadBlobAsync(
-                      HashCode.fromString(actionDigest.getHash()),
-                      chunkerEntry.getValue(),
-                      /* forceUpload=*/ true)));
+      RequestMetadata metadata =
+          TracingMetadataUtils.buildMetadata(
+              "build-req-id",
+              "command-id",
+              DIGEST_UTIL.asActionKey(actionDigest).getDigest().getHash());
+      RemoteActionExecutionContext remoteActionExecutionContext =
+          new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+      uploads.add(
+          uploader.uploadBlobAsync(
+              remoteActionExecutionContext,
+              actionDigest,
+              chunkerEntry.getValue(),
+              /* forceUpload= */ true));
     }
 
     for (ListenableFuture<Void> upload : uploads) {
@@ -776,7 +789,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
               }
             }));
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
   }
 
   @Test
@@ -796,47 +809,48 @@ public void sameBlobShouldNotBeUploadedTwice() throws Exception {
 
     byte[] blob = new byte[CHUNK_SIZE * 10];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    Digest digest = DIGEST_UTIL.compute(blob);
 
     AtomicInteger numWriteCalls = new AtomicInteger();
     CountDownLatch blocker = new CountDownLatch(1);
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        numWriteCalls.incrementAndGet();
-        try {
-          // Ensures that the first upload does not finish, before the second upload is started.
-          blocker.await();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            numWriteCalls.incrementAndGet();
+            try {
+              // Ensures that the first upload does not finish, before the second upload is started.
+              blocker.await();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
 
-        return new StreamObserver<WriteRequest>() {
+            return new StreamObserver<WriteRequest>() {
 
-          private long bytesReceived;
+              private long bytesReceived;
 
-          @Override
-          public void onNext(WriteRequest writeRequest) {
-            bytesReceived += writeRequest.getData().size();
-          }
+              @Override
+              public void onNext(WriteRequest writeRequest) {
+                bytesReceived += writeRequest.getData().size();
+              }
 
-          @Override
-          public void onError(Throwable throwable) {
-            fail("onError should never be called.");
-          }
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
 
-          @Override
-          public void onCompleted() {
-            response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
-            response.onCompleted();
+              @Override
+              public void onCompleted() {
+                response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
+                response.onCompleted();
+              }
+            };
           }
-        };
-      }
-    });
+        });
 
-    Future<?> upload1 = uploader.uploadBlobAsync(hash, chunker, true);
-    Future<?> upload2 = uploader.uploadBlobAsync(hash, chunker, true);
+    Future<?> upload1 = uploader.uploadBlobAsync(context, digest, chunker, true);
+    Future<?> upload2 = uploader.uploadBlobAsync(context, digest, chunker, true);
 
     blocker.countDown();
 
@@ -866,16 +880,17 @@ public void errorsShouldBeReported() throws IOException, InterruptedException {
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        response.onError(Status.INTERNAL.asException());
-        return new NoopStreamObserver();
-      }
-    });
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            response.onError(Status.INTERNAL.asException());
+            return new NoopStreamObserver();
+          }
+        });
 
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
@@ -926,14 +941,14 @@ public void onCancel() {
 
     byte[] blob1 = new byte[CHUNK_SIZE];
     Chunker chunker1 = Chunker.builder().setInput(blob1).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash1 = HashCode.fromString(DIGEST_UTIL.compute(blob1).getHash());
+    Digest digest1 = DIGEST_UTIL.compute(blob1);
 
     byte[] blob2 = new byte[CHUNK_SIZE + 1];
     Chunker chunker2 = Chunker.builder().setInput(blob2).setChunkSize(CHUNK_SIZE).build();
-    HashCode hash2 = HashCode.fromString(DIGEST_UTIL.compute(blob2).getHash());
+    Digest digest2 = DIGEST_UTIL.compute(blob2);
 
-    ListenableFuture<Void> f1 = uploader.uploadBlobAsync(hash1, chunker1, true);
-    ListenableFuture<Void> f2 = uploader.uploadBlobAsync(hash2, chunker2, true);
+    ListenableFuture<Void> f1 = uploader.uploadBlobAsync(context, digest1, chunker1, true);
+    ListenableFuture<Void> f2 = uploader.uploadBlobAsync(context, digest2, chunker2, true);
 
     assertThat(uploader.uploadsInProgress()).isTrue();
 
@@ -964,14 +979,15 @@ public void failureInRetryExecutorShouldBeHandled() throws Exception {
             /* callTimeoutSecs= */ 60,
             retrier);
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        // Immediately fail the call, so that it is retried.
-        response.onError(Status.ABORTED.asException());
-        return new NoopStreamObserver();
-      }
-    });
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            // Immediately fail the call, so that it is retried.
+            response.onError(Status.ABORTED.asException());
+            return new NoopStreamObserver();
+          }
+        });
 
     retryService.shutdownNow();
     // Random very high timeout, as the test will timeout by itself.
@@ -982,7 +998,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -1004,35 +1020,34 @@ public void resourceNameWithoutInstanceName() throws Exception {
             /* callTimeoutSecs= */ 60,
             retrier);
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        return new StreamObserver<WriteRequest>() {
-          @Override
-          public void onNext(WriteRequest writeRequest) {
-            // Test that the resource name doesn't start with an instance name.
-            assertThat(writeRequest.getResourceName()).startsWith("uploads/");
-          }
-
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
           @Override
-          public void onError(Throwable throwable) {
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {
+                // Test that the resource name doesn't start with an instance name.
+                assertThat(writeRequest.getResourceName()).startsWith("uploads/");
+              }
 
-          }
+              @Override
+              public void onError(Throwable throwable) {}
 
-          @Override
-          public void onCompleted() {
-            response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
-            response.onCompleted();
+              @Override
+              public void onCompleted() {
+                response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
+                response.onCompleted();
+              }
+            };
           }
-        };
-      }
-    });
+        });
 
     byte[] blob = new byte[1];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     withEmptyMetadata.detach(prevContext);
   }
@@ -1053,21 +1068,22 @@ public void nonRetryableStatusShouldNotBeRetried() throws Exception {
 
     AtomicInteger numCalls = new AtomicInteger();
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        numCalls.incrementAndGet();
-        response.onError(Status.INTERNAL.asException());
-        return new NoopStreamObserver();
-      }
-    });
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+            numCalls.incrementAndGet();
+            response.onError(Status.INTERNAL.asException());
+            return new NoopStreamObserver();
+          }
+        });
 
     byte[] blob = new byte[1];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
     try {
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
       fail("Should have thrown an exception.");
     } catch (IOException e) {
       assertThat(numCalls.get()).isEqualTo(1);
@@ -1139,7 +1155,7 @@ public void onCompleted() {
     StatusRuntimeException expected = null;
     try {
       // This should fail
-      uploader.uploadBlob(hash, chunker, true);
+      uploader.uploadBlob(context, hash, chunker, true);
     } catch (IOException e) {
       if (e.getCause() instanceof StatusRuntimeException) {
         expected = (StatusRuntimeException) e.getCause();
@@ -1148,7 +1164,7 @@ public void onCompleted() {
     assertThat(expected).isNotNull();
     assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN);
     // This should trigger an upload.
-    uploader.uploadBlob(hash, chunker, false);
+    uploader.uploadBlob(context, hash, chunker, false);
 
     assertThat(numUploads.get()).isEqualTo(2);
 
@@ -1177,42 +1193,43 @@ public void deduplicationOfUploadsShouldWork() throws Exception {
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
     AtomicInteger numUploads = new AtomicInteger();
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
-        numUploads.incrementAndGet();
-        return new StreamObserver<WriteRequest>() {
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            numUploads.incrementAndGet();
+            return new StreamObserver<WriteRequest>() {
 
-          long nextOffset = 0;
+              long nextOffset = 0;
 
-          @Override
-          public void onNext(WriteRequest writeRequest) {
-            nextOffset += writeRequest.getData().size();
-            boolean lastWrite = blob.length == nextOffset;
-            assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
-          }
+              @Override
+              public void onNext(WriteRequest writeRequest) {
+                nextOffset += writeRequest.getData().size();
+                boolean lastWrite = blob.length == nextOffset;
+                assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+              }
 
-          @Override
-          public void onError(Throwable throwable) {
-            fail("onError should never be called.");
-          }
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
 
-          @Override
-          public void onCompleted() {
-            assertThat(nextOffset).isEqualTo(blob.length);
+              @Override
+              public void onCompleted() {
+                assertThat(nextOffset).isEqualTo(blob.length);
 
-            WriteResponse response =
-                WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
-            streamObserver.onNext(response);
-            streamObserver.onCompleted();
+                WriteResponse response =
+                    WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+                streamObserver.onNext(response);
+                streamObserver.onCompleted();
+              }
+            };
           }
-        };
-      }
-    });
+        });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
     // This should not trigger an upload.
-    uploader.uploadBlob(hash, chunker, false);
+    uploader.uploadBlob(context, hash, chunker, false);
 
     assertThat(numUploads.get()).isEqualTo(1);
 
@@ -1271,11 +1288,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
           }
         });
 
-    assertThrows(
-        IOException.class,
-        () -> {
-          uploader.uploadBlob(hash, chunker, true);
-        });
+    assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
 
     assertThat(refreshTimes.get()).isEqualTo(1);
     assertThat(numUploads.get()).isEqualTo(2);
@@ -1363,7 +1376,7 @@ public void onCompleted() {
           }
         });
 
-    uploader.uploadBlob(hash, chunker, true);
+    uploader.uploadBlob(context, hash, chunker, true);
 
     assertThat(refreshTimes.get()).isEqualTo(1);
     assertThat(numUploads.get()).isEqualTo(2);
@@ -1378,16 +1391,13 @@ public void onCompleted() {
 
   private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
     @Override
-    public void onNext(WriteRequest writeRequest) {
-    }
+    public void onNext(WriteRequest writeRequest) {}
 
     @Override
-    public void onError(Throwable throwable) {
-    }
+    public void onError(Throwable throwable) {}
 
     @Override
-    public void onCompleted() {
-    }
+    public void onCompleted() {}
   }
 
   static class FixedBackoff implements Retrier.Backoff {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
index d22e1ed9a5fa73..c1f010429a152b 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -317,7 +317,7 @@ public void onError(Throwable t) {
         });
 
     // Upload all missing inputs (that is, the virtual action input from above)
-    client.ensureInputsPresent(merkleTree, ImmutableMap.of());
+    client.ensureInputsPresent(remoteActionExecutionContext, merkleTree, ImmutableMap.of());
   }
 
   @Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
index 92e4ca217edcd3..d3bfe63ae769aa 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java
@@ -577,7 +577,7 @@ public void downloadRelativeSymlinkInDirectory() throws Exception {
                 Directory.newBuilder()
                     .addSymlinks(SymlinkNode.newBuilder().setName("link").setTarget("../foo")))
             .build();
-    Digest treeDigest = cache.addContents(tree.toByteArray());
+    Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray());
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputDirectoriesBuilder().setPath("dir").setTreeDigest(treeDigest);
     // Doesn't check for dangling links, hence download succeeds.
@@ -637,7 +637,7 @@ public void downloadAbsoluteSymlinkInDirectoryError() throws Exception {
                 Directory.newBuilder()
                     .addSymlinks(SymlinkNode.newBuilder().setName("link").setTarget("/foo")))
             .build();
-    Digest treeDigest = cache.addContents(tree.toByteArray());
+    Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray());
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputDirectoriesBuilder().setPath("dir").setTreeDigest(treeDigest);
     IOException expected =
@@ -661,10 +661,10 @@ public void downloadAbsoluteSymlinkInDirectoryError() throws Exception {
   public void downloadFailureMaintainsDirectories() throws Exception {
     InMemoryRemoteCache cache = newRemoteCache();
     Tree tree = Tree.newBuilder().setRoot(Directory.newBuilder()).build();
-    Digest treeDigest = cache.addContents(tree.toByteArray());
+    Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray());
     Digest outputFileDigest =
         cache.addException("outputdir/outputfile", new IOException("download failed"));
-    Digest otherFileDigest = cache.addContents("otherfile");
+    Digest otherFileDigest = cache.addContents(remoteActionExecutionContext, "otherfile");
 
     ActionResult.Builder result = ActionResult.newBuilder();
     result.addOutputDirectoriesBuilder().setPath("outputdir").setTreeDigest(treeDigest);
@@ -693,9 +693,9 @@ public void onErrorWaitForRemainingDownloadsToComplete() throws Exception {
     Path stderr = fs.getPath("/execroot/stderr");
 
     InMemoryRemoteCache cache = newRemoteCache();
-    Digest digest1 = cache.addContents("file1");
+    Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
     Digest digest2 = cache.addException("file2", new IOException("download failed"));
-    Digest digest3 = cache.addContents("file3");
+    Digest digest3 = cache.addContents(remoteActionExecutionContext, "file3");
 
     ActionResult result =
         ActionResult.newBuilder()
@@ -729,7 +729,7 @@ public void downloadWithMultipleErrorsAddsThemAsSuppressed() throws Exception {
     Path stderr = fs.getPath("/execroot/stderr");
 
     InMemoryRemoteCache cache = newRemoteCache();
-    Digest digest1 = cache.addContents("file1");
+    Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
     Digest digest2 = cache.addException("file2", new IOException("file2 failed"));
     Digest digest3 = cache.addException("file3", new IOException("file3 failed"));
 
@@ -764,7 +764,7 @@ public void downloadWithDuplicateIOErrorsDoesNotSuppress() throws Exception {
     Path stderr = fs.getPath("/execroot/stderr");
 
     InMemoryRemoteCache cache = newRemoteCache();
-    Digest digest1 = cache.addContents("file1");
+    Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
     IOException reusedException = new IOException("reused io exception");
     Digest digest2 = cache.addException("file2", reusedException);
     Digest digest3 = cache.addException("file3", reusedException);
@@ -800,7 +800,7 @@ public void downloadWithDuplicateInterruptionsDoesNotSuppress() throws Exception
     Path stderr = fs.getPath("/execroot/stderr");
 
     InMemoryRemoteCache cache = newRemoteCache();
-    Digest digest1 = cache.addContents("file1");
+    Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1");
     InterruptedException reusedInterruption = new InterruptedException("reused interruption");
     Digest digest2 = cache.addException("file2", reusedInterruption);
     Digest digest3 = cache.addException("file3", reusedInterruption);
@@ -840,8 +840,8 @@ public void testDownloadWithStdoutStderrOnSuccess() throws Exception {
     when(spyOutErr.childOutErr()).thenReturn(spyChildOutErr);
 
     InMemoryRemoteCache cache = newRemoteCache();
-    Digest digestStdout = cache.addContents("stdout");
-    Digest digestStderr = cache.addContents("stderr");
+    Digest digestStdout = cache.addContents(remoteActionExecutionContext, "stdout");
+    Digest digestStderr = cache.addContents(remoteActionExecutionContext, "stderr");
 
     ActionResult result =
         ActionResult.newBuilder()
@@ -918,8 +918,8 @@ public void testDownloadClashes() throws Exception {
 
     // arrange
     InMemoryRemoteCache remoteCache = newRemoteCache();
-    Digest d1 = remoteCache.addContents("content1");
-    Digest d2 = remoteCache.addContents("content2");
+    Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+    Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
     ActionResult r =
         ActionResult.newBuilder()
             .setExitCode(0)
@@ -950,8 +950,8 @@ public void testDownloadMinimalFiles() throws Exception {
 
     // arrange
     InMemoryRemoteCache remoteCache = newRemoteCache();
-    Digest d1 = remoteCache.addContents("content1");
-    Digest d2 = remoteCache.addContents("content2");
+    Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+    Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
     ActionResult r =
         ActionResult.newBuilder()
             .setExitCode(0)
@@ -997,19 +997,19 @@ public void testDownloadMinimalDirectory() throws Exception {
     // Output Directory:
     // dir/file1
     // dir/a/file2
-    Digest d1 = remoteCache.addContents("content1");
-    Digest d2 = remoteCache.addContents("content2");
+    Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+    Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
     FileNode file1 = FileNode.newBuilder().setName("file1").setDigest(d1).build();
     FileNode file2 = FileNode.newBuilder().setName("file2").setDigest(d2).build();
     Directory a = Directory.newBuilder().addFiles(file2).build();
-    Digest da = remoteCache.addContents(a);
+    Digest da = remoteCache.addContents(remoteActionExecutionContext, a);
     Directory root =
         Directory.newBuilder()
             .addFiles(file1)
             .addDirectories(DirectoryNode.newBuilder().setName("a").setDigest(da))
             .build();
     Tree t = Tree.newBuilder().setRoot(root).addChildren(a).build();
-    Digest dt = remoteCache.addContents(t);
+    Digest dt = remoteCache.addContents(remoteActionExecutionContext, t);
     ActionResult r =
         ActionResult.newBuilder()
             .setExitCode(0)
@@ -1070,12 +1070,12 @@ public void testDownloadMinimalDirectoryFails() throws Exception {
     // Output Directory:
     // dir/file1
     // dir/a/file2
-    Digest d1 = remoteCache.addContents("content1");
-    Digest d2 = remoteCache.addContents("content2");
+    Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+    Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
     FileNode file1 = FileNode.newBuilder().setName("file1").setDigest(d1).build();
     FileNode file2 = FileNode.newBuilder().setName("file2").setDigest(d2).build();
     Directory a = Directory.newBuilder().addFiles(file2).build();
-    Digest da = remoteCache.addContents(a);
+    Digest da = remoteCache.addContents(remoteActionExecutionContext, a);
     Directory root =
         Directory.newBuilder()
             .addFiles(file1)
@@ -1127,8 +1127,8 @@ public void testDownloadMinimalWithStdoutStderr() throws Exception {
 
     // arrange
     InMemoryRemoteCache remoteCache = newRemoteCache();
-    Digest dOut = remoteCache.addContents("stdout");
-    Digest dErr = remoteCache.addContents("stderr");
+    Digest dOut = remoteCache.addContents(remoteActionExecutionContext, "stdout");
+    Digest dErr = remoteCache.addContents(remoteActionExecutionContext, "stderr");
     ActionResult r =
         ActionResult.newBuilder()
             .setExitCode(0)
@@ -1168,8 +1168,8 @@ public void testDownloadMinimalWithInMemoryOutput() throws Exception {
 
     // arrange
     InMemoryRemoteCache remoteCache = newRemoteCache();
-    Digest d1 = remoteCache.addContents("content1");
-    Digest d2 = remoteCache.addContents("content2");
+    Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1");
+    Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2");
     ActionResult r =
         ActionResult.newBuilder()
             .setExitCode(0)
@@ -1216,7 +1216,7 @@ public void testDownloadMinimalWithMissingInMemoryOutput() throws Exception {
 
     // arrange
     InMemoryRemoteCache remoteCache = newRemoteCache();
-    Digest d1 = remoteCache.addContents("in-memory output");
+    Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "in-memory output");
     ActionResult r = ActionResult.newBuilder().setExitCode(0).build();
     Artifact a1 = ActionsTestUtil.createArtifact(artifactRoot, "file1");
     MetadataInjector injector = mock(MetadataInjector.class);
@@ -1644,18 +1644,21 @@ private static class InMemoryRemoteCache extends RemoteCache {
       super(new InMemoryCacheClient(), options, digestUtil);
     }
 
-    Digest addContents(String txt) throws IOException, InterruptedException {
-      return addContents(txt.getBytes(UTF_8));
+    Digest addContents(RemoteActionExecutionContext context, String txt)
+        throws IOException, InterruptedException {
+      return addContents(context, txt.getBytes(UTF_8));
     }
 
-    Digest addContents(byte[] bytes) throws IOException, InterruptedException {
+    Digest addContents(RemoteActionExecutionContext context, byte[] bytes)
+        throws IOException, InterruptedException {
       Digest digest = digestUtil.compute(bytes);
-      Utils.getFromFuture(cacheProtocol.uploadBlob(digest, ByteString.copyFrom(bytes)));
+      Utils.getFromFuture(cacheProtocol.uploadBlob(context, digest, ByteString.copyFrom(bytes)));
       return digest;
     }
 
-    Digest addContents(Message m) throws IOException, InterruptedException {
-      return addContents(m.toByteArray());
+    Digest addContents(RemoteActionExecutionContext context, Message m)
+        throws IOException, InterruptedException {
+      return addContents(context, m.toByteArray());
     }
 
     Digest addException(String txt, Exception e) {
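With the addContents helpers taking the context explicitly, each download test above seeds the in-memory CAS and then wires the returned digests into an ActionResult before exercising the download path. Roughly, assuming the remoteActionExecutionContext fixture from the test's setUp and the standard REAPI stdout/stderr digest fields:

  // Hypothetical condensed version of the seeding step used by the stdout/stderr tests.
  Digest dOut = remoteCache.addContents(remoteActionExecutionContext, "stdout");
  Digest dErr = remoteCache.addContents(remoteActionExecutionContext, "stderr");
  ActionResult result =
      ActionResult.newBuilder()
          .setExitCode(0)
          .setStdoutDigest(dOut)
          .setStderrDigest(dErr)
          .build();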
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java
index 7fde5bcd734049..f313ec91cf235d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java
@@ -93,7 +93,7 @@ public void testZeroExitCodeFromCache() throws IOException, InterruptedException
 
     assertThat(executionResult.exitCode()).isEqualTo(0);
   }
-  
+
   @Test
   public void testNoneZeroExitCodeFromCache() throws IOException, InterruptedException {
     // Test that an ActionResult with a none-zero exit code is not accepted as cached.
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 7cb8328e74fb56..fe23a685a7dcb7 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -246,7 +246,7 @@ public void nonCachableSpawnsShouldNotBeCached_localFallback() throws Exception
     runner.exec(spawn, policy);
 
     verify(localRunner).exec(spawn, policy);
-    verify(cache).ensureInputsPresent(any(), any());
+    verify(cache).ensureInputsPresent(any(), any(), any());
     verifyNoMoreInteractions(cache);
   }
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java
index 41fdd7f7fa54fd..2e13a3717e9d34 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java
@@ -25,6 +25,7 @@
 import build.bazel.remote.asset.v1.FetchGrpc.FetchImplBase;
 import build.bazel.remote.asset.v1.Qualifier;
 import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
@@ -37,6 +38,9 @@
 import com.google.devtools.build.lib.remote.ReferenceCountedChannel;
 import com.google.devtools.build.lib.remote.RemoteRetrier;
 import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff;
+import com.google.devtools.build.lib.remote.common.NetworkTime;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
 import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
@@ -79,6 +83,7 @@ public class GrpcRemoteDownloaderTest {
   private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
   private final String fakeServerName = "fake server for " + getClass();
   private Server fakeServer;
+  private RemoteActionExecutionContext context;
   private Context withEmptyMetadata;
   private Context prevContext;
   private ListeningScheduledExecutorService retryService;
@@ -92,9 +97,13 @@ public final void setUp() throws Exception {
             .directExecutor()
             .build()
             .start();
-    withEmptyMetadata =
-        TracingMetadataUtils.contextWithMetadata(
-            "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+    RequestMetadata metadata =
+        TracingMetadataUtils.buildMetadata(
+            "none",
+            "none",
+            DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash());
+    context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
+    withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata);
 
     retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
 
@@ -189,7 +198,7 @@ public void fetchBlob(
     final RemoteCacheClient cacheClient = new InMemoryCacheClient();
     final GrpcRemoteDownloader downloader = newDownloader(cacheClient);
 
-    getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFrom(content)));
+    getFromFuture(cacheClient.uploadBlob(context, contentDigest, ByteString.copyFrom(content)));
     final byte[] downloaded =
         downloadBlob(
             downloader, new URL("http://example.com/content.txt"), Optional.<Checksum>empty());
@@ -225,7 +234,7 @@ public void fetchBlob(
     final RemoteCacheClient cacheClient = new InMemoryCacheClient();
     final GrpcRemoteDownloader downloader = newDownloader(cacheClient);
 
-    getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFrom(content)));
+    getFromFuture(cacheClient.uploadBlob(context, contentDigest, ByteString.copyFrom(content)));
     final byte[] downloaded =
         downloadBlob(
             downloader,
@@ -263,7 +272,8 @@ public void fetchBlob(
     final RemoteCacheClient cacheClient = new InMemoryCacheClient();
     final GrpcRemoteDownloader downloader = newDownloader(cacheClient);
 
-    getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFromUtf8("wrong content")));
+    getFromFuture(
+        cacheClient.uploadBlob(context, contentDigest, ByteString.copyFromUtf8("wrong content")));
 
     IOException e =
         assertThrows(
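The setUp change above captures the client-side essence of this migration: the test builds a RequestMetadata value once, derives both the gRPC Context (still needed by the interceptors) and the new RemoteActionExecutionContext from it, and then threads the latter through every upload. A minimal sketch under those assumptions, using an illustrative helper name and a DigestUtil plus RemoteCacheClient supplied by the caller:

  import build.bazel.remote.execution.v2.Digest;
  import build.bazel.remote.execution.v2.RequestMetadata;
  import com.google.devtools.build.lib.remote.common.NetworkTime;
  import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
  import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
  import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
  import com.google.devtools.build.lib.remote.util.DigestUtil;
  import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
  import com.google.devtools.build.lib.remote.util.Utils;
  import com.google.protobuf.ByteString;

  /** Illustrative sketch: upload a small blob with explicit tracing metadata. */
  final class UploadWithMetadataExample {
    private UploadWithMetadataExample() {}

    static Digest uploadWithMetadata(
        RemoteCacheClient cacheClient, DigestUtil digestUtil, String content) throws Exception {
      // Build the tracing metadata explicitly instead of stashing it in a gRPC Context.
      RequestMetadata metadata =
          TracingMetadataUtils.buildMetadata("build-request-id", "command-id", "action-id");
      RemoteActionExecutionContext context =
          new RemoteActionExecutionContextImpl(metadata, new NetworkTime());
      ByteString data = ByteString.copyFromUtf8(content);
      Digest digest = digestUtil.compute(data.toByteArray());
      // uploadBlob (and uploadFile) now take the context as their first parameter.
      Utils.getFromFuture(cacheClient.uploadBlob(context, digest, data));
      return digest;
    }
  }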
diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
index 7515f32834fe12..5c091c29f758a8 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java
@@ -319,7 +319,7 @@ public void testUploadAtMostOnce() throws Exception {
 
       ByteString data = ByteString.copyFrom("foo bar", StandardCharsets.UTF_8);
       Digest digest = DIGEST_UTIL.compute(data.toByteArray());
-      blobStore.uploadBlob(digest, data).get();
+      blobStore.uploadBlob(remoteActionExecutionContext, digest, data).get();
 
       assertThat(cacheContents).hasSize(1);
       String cacheKey = "/cas/" + digest.getHash();
@@ -329,7 +329,7 @@ public void testUploadAtMostOnce() throws Exception {
       // Clear the remote cache contents
       cacheContents.clear();
 
-      blobStore.uploadBlob(digest, data).get();
+      blobStore.uploadBlob(remoteActionExecutionContext, digest, data).get();
 
       // Nothing should have been uploaded again.
       assertThat(cacheContents).isEmpty();
@@ -369,7 +369,9 @@ protected void channelRead0(
       Credentials credentials = newCredentials();
       HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
-      getFromFuture(blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)));
+      getFromFuture(
+          blobStore.uploadBlob(
+              remoteActionExecutionContext, DIGEST_UTIL.compute(data), ByteString.copyFrom(data)));
       fail("Exception expected");
     } finally {
       testServer.stop(server);
@@ -432,7 +434,10 @@ protected void channelRead0(
               IOException.class,
               () ->
                   getFromFuture(
-                      blobStore.uploadBlob(DIGEST_UTIL.compute(data.toByteArray()), data)));
+                      blobStore.uploadBlob(
+                          remoteActionExecutionContext,
+                          DIGEST_UTIL.compute(data.toByteArray()),
+                          data)));
       assertThat(e.getCause()).isInstanceOf(TooLongFrameException.class);
     } finally {
       testServer.stop(server);
@@ -564,7 +569,10 @@ private void expiredAuthTokensShouldBeRetried_put(
       Credentials credentials = newCredentials();
       HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
-      blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)).get();
+      blobStore
+          .uploadBlob(
+              remoteActionExecutionContext, DIGEST_UTIL.compute(data), ByteString.copyFrom(data))
+          .get();
       verify(credentials, times(1)).refresh();
       verify(credentials, times(2)).getRequestMetadata(any(URI.class));
       verify(credentials, times(2)).hasRequestMetadata();
@@ -621,7 +629,10 @@ private void errorCodeThatShouldNotBeRetried_put(
       HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
       byte[] oneByte = new byte[] {0};
       getFromFuture(
-          blobStore.uploadBlob(DIGEST_UTIL.compute(oneByte), ByteString.copyFrom(oneByte)));
+          blobStore.uploadBlob(
+              remoteActionExecutionContext,
+              DIGEST_UTIL.compute(oneByte),
+              ByteString.copyFrom(oneByte)));
       fail("Exception expected.");
     } catch (Exception e) {
       assertThat(e).isInstanceOf(HttpException.class);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
index 9c78ef0ffde548..9894f836eb8a43 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java
@@ -107,7 +107,8 @@ public void uploadActionResult(
   }
 
   @Override
-  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
+  public ListenableFuture<Void> uploadFile(
+      RemoteActionExecutionContext context, Digest digest, Path file) {
     try (InputStream in = file.getInputStream()) {
       cas.put(digest, ByteStreams.toByteArray(in));
     } catch (IOException e) {
@@ -117,7 +118,8 @@ public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
   }
 
   @Override
-  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
+  public ListenableFuture<Void> uploadBlob(
+      RemoteActionExecutionContext context, Digest digest, ByteString data) {
     try (InputStream in = data.newInput()) {
       cas.put(digest, data.toByteArray());
     } catch (IOException e) {
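These overrides mirror the new shape of the RemoteCacheClient upload methods. Trimmed to the two members touched in this change (the real interface in remote/common/RemoteCacheClient.java declares more), the contract now reads roughly:

  // Illustrative excerpt only; javadoc and the remaining interface methods are omitted.
  ListenableFuture<Void> uploadFile(
      RemoteActionExecutionContext context, Digest digest, Path file);

  ListenableFuture<Void> uploadBlob(
      RemoteActionExecutionContext context, Digest digest, ByteString data);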
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
index 8bf39e88be4a2f..2ed32e5dcbc531 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java
@@ -103,6 +103,10 @@ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObser
 
   @Override
   public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) {
+    RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
+    RemoteActionExecutionContext context =
+        new RemoteActionExecutionContextImpl(meta, new NetworkTime());
+
     Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString());
     try {
       FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory());
@@ -226,7 +230,7 @@ public void onCompleted() {
 
         try {
           Digest d = digestUtil.compute(temp);
-          getFromFuture(cache.uploadFile(d, temp));
+          getFromFuture(cache.uploadFile(context, d, temp));
           try {
             temp.delete();
           } catch (IOException e) {
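On the worker side the flow is the reverse of the client: the gRPC interceptor has already attached the RequestMetadata to the current io.grpc.Context, so each handler rebuilds a RemoteActionExecutionContext from it and hands that to the cache. A condensed sketch of the pattern used in write() above, assuming cache, digestUtil and the temp path are the surrounding server state:

  // Recover the metadata injected by the server interceptor and wrap it so the
  // cache client can propagate it with the upload RPC.
  RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
  RemoteActionExecutionContext context =
      new RemoteActionExecutionContextImpl(meta, new NetworkTime());
  Digest digest = digestUtil.compute(temp);
  getFromFuture(cache.uploadFile(context, digest, temp));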
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
index 9b3bba571f5461..e862939094cf82 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java
@@ -68,12 +68,16 @@ public void findMissingBlobs(
   @Override
   public void batchUpdateBlobs(
       BatchUpdateBlobsRequest request, StreamObserver<BatchUpdateBlobsResponse> responseObserver) {
+    RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
+    RemoteActionExecutionContext context =
+        new RemoteActionExecutionContextImpl(meta, new NetworkTime());
+
     BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder();
     for (BatchUpdateBlobsRequest.Request r : request.getRequestsList()) {
       BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder();
       try {
         Digest digest = cache.getDigestUtil().compute(r.getData().toByteArray());
-        getFromFuture(cache.uploadBlob(digest, r.getData()));
+        getFromFuture(cache.uploadBlob(context, digest, r.getData()));
         if (!r.getDigest().equals(digest)) {
           String err =
               "Upload digest " + r.getDigest() + " did not match data digest: " + digest;
@@ -94,6 +98,7 @@ public void getTree(GetTreeRequest request, StreamObserver<GetTreeResponse> resp
     RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
     RemoteActionExecutionContext context =
         new RemoteActionExecutionContextImpl(meta, new NetworkTime());
+
     // Directories are returned in depth-first order.  We store all previously-traversed digests so
     // identical subtrees having the same digest will only be traversed and returned once.
     Set<Digest> seenDigests = new HashSet<>();
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
index 804f739089a503..1f39c313dbf60b 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java
@@ -61,12 +61,14 @@ public void downloadTree(
     }
   }
 
-  public ListenableFuture<Void> uploadFile(Digest digest, Path file) {
-    return cacheProtocol.uploadFile(digest, file);
+  public ListenableFuture<Void> uploadFile(
+      RemoteActionExecutionContext context, Digest digest, Path file) {
+    return cacheProtocol.uploadFile(context, digest, file);
   }
 
-  public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
-    return cacheProtocol.uploadBlob(digest, data);
+  public ListenableFuture<Void> uploadBlob(
+      RemoteActionExecutionContext context, Digest digest, ByteString data) {
+    return cacheProtocol.uploadBlob(context, digest, data);
   }
 
   public void uploadActionResult(