Skip to content

Commit

Permalink
Ck/cherrypick 15669 (bazelbuild#15788)
Browse files Browse the repository at this point in the history
* Chunker: Always seek on the uncompressed stream.

The `WriteRequest.write_offset` field has bizarre semantics during compressed uploads as documented in the remote API protos: https://github.com/bazelbuild/remote-apis/blob/3b4b6402103539d66fcdd1a4d945f660742665ca/build/bazel/remote/execution/v2/remote_execution.proto#L241-L248 In particular, the write offset of the first `WriteRequest` refers to the offset in the uncompressed source.

This change ensures we always seek the uncompressed stream to the correct offset when starting an upload call. The old code could fail to resume compressed uploads under some conditions. The `progressiveCompressedUploadShouldWork` test purported to exercise this situation. The test, however, contained the same logic error as the code under test.

Closes bazelbuild#15669.

PiperOrigin-RevId: 455083727
Change-Id: Ie22dacf31f15644c7a83f49776e7a633d8bb4bca

* Updated chunker.java file.

* Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java

Co-authored-by: Benjamin Peterson <[email protected]>

* Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java

Co-authored-by: Benjamin Peterson <[email protected]>

* Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java

Co-authored-by: Benjamin Peterson <[email protected]>

Co-authored-by: Benjamin Peterson <[email protected]>
Co-authored-by: Benjamin Peterson <[email protected]>
  • Loading branch information
3 people authored Jul 7, 2022
1 parent 3ea9eb2 commit 64571a4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 35 deletions.
30 changes: 22 additions & 8 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,23 @@ public void reset() throws IOException {
}

/**
* Seek to an offset, if necessary resetting or initializing
* Seek to an offset in the source stream.
*
* <p>May close open resources in order to seek to an earlier offset.
* <p>May close and reopen resources in order to seek to an earlier offset.
*
* @param toOffset the offset from beginning of the source stream. If the source stream is
* compressed, it refers to the offset in the uncompressed form to align with `write_offset`
* in REAPI.
*/
public void seek(long toOffset) throws IOException {
if (toOffset < offset) {
// For compressed stream, we need to reinitialize the stream since the offset refers to the
// uncompressed form.
if (initialized && toOffset >= offset && !compressed) {
ByteStreams.skipFully(data, toOffset - offset);
} else {
reset();
initialize(toOffset);
}
maybeInitialize();
ByteStreams.skipFully(data, toOffset - offset);
offset = toOffset;
if (data.finished()) {
close();
}
Expand Down Expand Up @@ -245,18 +251,26 @@ private void maybeInitialize() throws IOException {
if (initialized) {
return;
}
initialize(0);
}

private void initialize(long srcPos) throws IOException {
checkState(!initialized);
checkState(data == null);
checkState(offset == 0);
checkState(chunkCache == null);
try {
InputStream src = dataSupplier.get();
ByteStreams.skipFully(src, srcPos);
data =
compressed
? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get()))
: new ChunkerInputStream(dataSupplier.get());
? new ChunkerInputStream(new ZstdCompressingInputStream(src))
: new ChunkerInputStream(src);
} catch (RuntimeException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw e;
}
offset = srcPos;
initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -357,23 +358,18 @@ public void progressiveCompressedUploadShouldWork() throws Exception {
300,
retrier);

byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
int chunkSize = 1024;
int skipSize = chunkSize + 1;
byte[] blob = new byte[chunkSize * 2 + 1];
new Random().nextBytes(blob);

Chunker chunker =
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());

while (chunker.hasNext()) {
chunker.next();
}
long expectedSize = chunker.getOffset();
chunker.reset();

ByteArrayOutputStream output = new ByteArrayOutputStream();
serviceRegistry.addService(
new ByteStreamImplBase() {

byte[] receivedData = new byte[(int) expectedSize];
String receivedResourceName = null;
boolean receivedComplete = false;
long nextOffset = 0;
Expand All @@ -398,21 +394,21 @@ public void onNext(WriteRequest writeRequest) {
assertThat(resourceName).isEmpty();
}

assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);

ByteString data = writeRequest.getData();

System.arraycopy(
data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());

nextOffset += data.size();
receivedComplete = expectedSize == nextOffset;
assertThat(writeRequest.getFinishWrite()).isEqualTo(receivedComplete);

if (initialOffset == 0) {
streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
mustQueryWriteStatus = true;
initialOffset = nextOffset;
initialOffset = skipSize;
nextOffset = initialOffset;
} else {
ByteString data = writeRequest.getData();
try {
data.writeTo(output);
} catch (IOException e) {
streamObserver.onError(e);
return;
}
nextOffset += data.size();
receivedComplete = writeRequest.getFinishWrite();
}
}

Expand All @@ -423,10 +419,6 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
assertThat(nextOffset).isEqualTo(expectedSize);
byte[] decompressed = Zstd.decompress(receivedData, blob.length);
assertThat(decompressed).isEqualTo(blob);

WriteResponse response =
WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
streamObserver.onNext(response);
Expand All @@ -444,7 +436,7 @@ public void queryWriteStatus(
if (receivedResourceName != null && receivedResourceName.equals(resourceName)) {
assertThat(mustQueryWriteStatus).isTrue();
mustQueryWriteStatus = false;
committedSize = nextOffset;
committedSize = receivedComplete ? blob.length : skipSize;
complete = receivedComplete;
} else {
committedSize = 0;
Expand All @@ -460,6 +452,9 @@ public void queryWriteStatus(
});

uploader.uploadBlob(context, hash, chunker, true);
byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize);
assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length))
.isTrue();

// This test should not have triggered any retries.
Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));
Expand Down

0 comments on commit 64571a4

Please sign in to comment.