Skip to content

Commit

Permalink
[7.1.0] Introduce a MessageInputStream abstraction, mirroring Message…
Browse files Browse the repository at this point in the history
…OutputStream. (#21207)

Use it to improve the StableSort API.

The JsonInputStreamWrapper will be used by an execution log conversion
tool to be introduced in a follow-up.

PiperOrigin-RevId: 603677229
Change-Id: I1de744c187a3c84d36b9f6c2755cbf5f4ace5100
  • Loading branch information
tjgq authored Feb 6, 2024
1 parent 0cb96ee commit 26c05cb
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
.exit(
new AbruptExitException(
createDetailedExitCode(
"Error initializing execution log",
String.format("Error initializing execution log: %s", e.getMessage()),
Code.EXECUTION_LOG_INITIALIZATION_FAILURE)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.util.io.AsynchronousMessageOutputStream;
import com.google.devtools.build.lib.util.io.MessageInputStream;
import com.google.devtools.build.lib.util.io.MessageInputStreamWrapper.BinaryInputStreamWrapper;
import com.google.devtools.build.lib.util.io.MessageOutputStream;
import com.google.devtools.build.lib.util.io.MessageOutputStreamWrapper.BinaryOutputStreamWrapper;
import com.google.devtools.build.lib.util.io.MessageOutputStreamWrapper.JsonOutputStreamWrapper;
Expand All @@ -49,7 +51,6 @@
import com.google.devtools.build.lib.vfs.Symlinks;
import com.google.devtools.build.lib.vfs.XattrProvider;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -73,9 +74,12 @@ public enum Encoding {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final Path tempPath;
private final Encoding encoding;
private final boolean sorted;

private final Path tempPath;
private final Path outputPath;

private final PathFragment execRoot;
@Nullable private final RemoteOptions remoteOptions;
private final DigestHashFunction digestHashFunction;
Expand All @@ -84,9 +88,6 @@ public enum Encoding {
/** Output stream to write directly into during execution. */
private final MessageOutputStream<SpawnExec> rawOutputStream;

/** Output stream to convert the raw output stream into after execution, if required. */
@Nullable private final MessageOutputStream<SpawnExec> convertedOutputStream;

public ExpandedSpawnLogContext(
Path outputPath,
Path tempPath,
Expand All @@ -97,33 +98,38 @@ public ExpandedSpawnLogContext(
DigestHashFunction digestHashFunction,
XattrProvider xattrProvider)
throws IOException {
this.tempPath = tempPath;
this.encoding = encoding;
this.sorted = sorted;
this.tempPath = tempPath;
this.outputPath = outputPath;
this.execRoot = execRoot;
this.remoteOptions = remoteOptions;
this.digestHashFunction = digestHashFunction;
this.xattrProvider = xattrProvider;

if (encoding == Encoding.BINARY && !sorted) {
if (needsConversion()) {
// Write the unsorted binary format into a temporary path first, then convert into the output
// format after execution. Delete a preexisting output file so that an incomplete invocation
// doesn't appear to produce a nonsensical log.
outputPath.delete();
rawOutputStream = getRawOutputStream(tempPath);
} else {
// The unsorted binary format can be written directly into the output path during execution.
rawOutputStream = getRawOutputStream(outputPath);
convertedOutputStream = null;
} else {
// Otherwise, write the unsorted binary format into a temporary path first, then convert into
// the output format after execution.
rawOutputStream = getRawOutputStream(tempPath);
convertedOutputStream = getConvertedOutputStream(encoding, outputPath);
}
}

private boolean needsConversion() {
return encoding != Encoding.BINARY || sorted;
}

private static MessageOutputStream<SpawnExec> getRawOutputStream(Path path) throws IOException {
// Use an AsynchronousMessageOutputStream so that writes occur in a separate thread.
// This ensures concurrent writes don't tear and avoids blocking execution.
return new AsynchronousMessageOutputStream<>(path);
}

private static MessageOutputStream<SpawnExec> getConvertedOutputStream(
Encoding encoding, Path path) throws IOException {
private MessageOutputStream<SpawnExec> getConvertedOutputStream(Path path) throws IOException {
switch (encoding) {
case BINARY:
return new BinaryOutputStreamWrapper<>(path.getOutputStream());
Expand Down Expand Up @@ -283,21 +289,23 @@ public void logSpawn(
public void close() throws IOException {
rawOutputStream.close();

if (convertedOutputStream == null) {
// No conversion required.
if (!needsConversion()) {
return;
}

try (InputStream in = tempPath.getInputStream()) {
try (MessageInputStream<SpawnExec> rawInputStream =
new BinaryInputStreamWrapper<>(
tempPath.getInputStream(), SpawnExec.getDefaultInstance());
MessageOutputStream<SpawnExec> convertedOutputStream =
getConvertedOutputStream(outputPath)) {
if (sorted) {
StableSort.stableSort(in, convertedOutputStream);
StableSort.stableSort(rawInputStream, convertedOutputStream);
} else {
SpawnExec ex;
while ((ex = SpawnExec.parseDelimitedFrom(in)) != null) {
while ((ex = rawInputStream.read()) != null) {
convertedOutputStream.write(ex);
}
}
convertedOutputStream.close();
} finally {
try {
tempPath.delete();
Expand Down
148 changes: 69 additions & 79 deletions src/main/java/com/google/devtools/build/lib/exec/StableSort.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
//
package com.google.devtools.build.lib.exec;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
Expand All @@ -23,12 +22,12 @@
import com.google.devtools.build.lib.exec.Protos.SpawnExec;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.util.io.MessageInputStream;
import com.google.devtools.build.lib.util.io.MessageOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

Expand All @@ -39,18 +38,9 @@
* <p>This is needed to allow textual diff comparisons of resultant logs.
*/
public final class StableSort {
private static ImmutableList<SpawnExec> read(InputStream in) throws IOException {
ImmutableList.Builder<SpawnExec> result = ImmutableList.builder();
SpawnExec ex;
while ((ex = SpawnExec.parseDelimitedFrom(in)) != null) {
result.add(ex);
}
return result.build();
}

/**
* Reads length-delimited wire format {@link SpawnExec} protos from an {@link InputStream}, sorts
* them, and writes them to a {@link MessageOutputStream}.
* Reads {@link SpawnExec} protos from a {@link MessageInputStream}, sorts them, and writes them
* to a {@link MessageOutputStream}.
*
* <p>The sort order has the following properties:
*
Expand All @@ -62,81 +52,81 @@ private static ImmutableList<SpawnExec> read(InputStream in) throws IOException
*
* <p>Assumes that there are no cyclic dependencies.
*/
public static void stableSort(InputStream in, MessageOutputStream<SpawnExec> out)
throws IOException {
public static void stableSort(
MessageInputStream<SpawnExec> in, MessageOutputStream<SpawnExec> out) throws IOException {
try (SilentCloseable c = Profiler.instance().profile("stableSort")) {
ImmutableList<SpawnExec> inputs;
ArrayList<SpawnExec> inputs = new ArrayList<>();

try (SilentCloseable c2 = Profiler.instance().profile("stableSort/read")) {
inputs = read(in);
SpawnExec ex;
while ((ex = in.read()) != null) {
inputs.add(ex);
}
}
stableSort(inputs, out);
}
}

public static void stableSort(List<SpawnExec> inputs, MessageOutputStream<SpawnExec> out)
throws IOException {
// A map from each output to every spawn that produces it.
// The same output may be produced by multiple spawns in the case of multiple test attempts.
Multimap<String, SpawnExec> outputProducer =
MultimapBuilder.hashKeys(inputs.size()).arrayListValues(1).build();

for (SpawnExec ex : inputs) {
for (File output : ex.getActualOutputsList()) {
String name = output.getPath();
outputProducer.put(name, ex);
}
}
// A map from each output to every spawn that produces it.
// The same output may be produced by multiple spawns in the case of multiple test attempts.
Multimap<String, SpawnExec> outputProducer =
MultimapBuilder.hashKeys(inputs.size()).arrayListValues(1).build();

// A blocks B if A produces an output consumed by B.
// Use reference equality to avoid expensive comparisons.
IdentitySetMultimap<SpawnExec, SpawnExec> blockedBy = new IdentitySetMultimap<>();
IdentitySetMultimap<SpawnExec, SpawnExec> blocking = new IdentitySetMultimap<>();

// The queue contains all spawns whose blockers have already been emitted.
PriorityQueue<SpawnExec> queue =
new PriorityQueue<>(
Comparator.comparing(
o -> {
// Sort by comparing the path of the first output. We don't want the sorting to
// rely on file hashes because we want the same action graph to be sorted in the
// same way regardless of file contents.
if (o.getListedOutputsCount() > 0) {
return "1_" + o.getListedOutputs(0);
}

// Get a proto with only stable information from this proto
SpawnExec.Builder stripped = SpawnExec.newBuilder();
stripped.addAllCommandArgs(o.getCommandArgsList());
stripped.addAllEnvironmentVariables(o.getEnvironmentVariablesList());
stripped.setPlatform(o.getPlatform());
stripped.addAllInputs(o.getInputsList());
stripped.setMnemonic(o.getMnemonic());

return "2_" + stripped.build();
}));

for (SpawnExec ex : inputs) {
boolean blocked = false;
for (File s : ex.getInputsList()) {
for (SpawnExec blocker : outputProducer.get(s.getPath())) {
blockedBy.put(ex, blocker);
blocking.put(blocker, ex);
blocked = true;
for (SpawnExec ex : inputs) {
for (File output : ex.getActualOutputsList()) {
String name = output.getPath();
outputProducer.put(name, ex);
}
}
if (!blocked) {
queue.add(ex);

// A blocks B if A produces an output consumed by B.
// Use reference equality to avoid expensive comparisons.
IdentitySetMultimap<SpawnExec, SpawnExec> blockedBy = new IdentitySetMultimap<>();
IdentitySetMultimap<SpawnExec, SpawnExec> blocking = new IdentitySetMultimap<>();

// The queue contains all spawns whose blockers have already been emitted.
PriorityQueue<SpawnExec> queue =
new PriorityQueue<>(
Comparator.comparing(
o -> {
// Sort by comparing the path of the first output. We don't want the sorting to
// rely on file hashes because we want the same action graph to be sorted in the
// same way regardless of file contents.
if (o.getListedOutputsCount() > 0) {
return "1_" + o.getListedOutputs(0);
}

// Get a proto with only stable information from this proto
SpawnExec.Builder stripped = SpawnExec.newBuilder();
stripped.addAllCommandArgs(o.getCommandArgsList());
stripped.addAllEnvironmentVariables(o.getEnvironmentVariablesList());
stripped.setPlatform(o.getPlatform());
stripped.addAllInputs(o.getInputsList());
stripped.setMnemonic(o.getMnemonic());

return "2_" + stripped.build();
}));

for (SpawnExec ex : inputs) {
boolean blocked = false;
for (File s : ex.getInputsList()) {
for (SpawnExec blocker : outputProducer.get(s.getPath())) {
blockedBy.put(ex, blocker);
blocking.put(blocker, ex);
blocked = true;
}
}
if (!blocked) {
queue.add(ex);
}
}
}

while (!queue.isEmpty()) {
SpawnExec curr = queue.remove();
out.write(curr);
while (!queue.isEmpty()) {
SpawnExec curr = queue.remove();
out.write(curr);

for (SpawnExec blocked : blocking.get(curr)) {
blockedBy.remove(blocked, curr);
if (!blockedBy.containsKey(blocked)) {
queue.add(blocked);
for (SpawnExec blocked : blocking.get(curr)) {
blockedBy.remove(blocked, curr);
if (!blockedBy.containsKey(blocked)) {
queue.add(blocked);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util.io;

import com.google.protobuf.Message;
import java.io.IOException;
import javax.annotation.Nullable;

/**
 * A variation of {@code InputStream} for protobuf messages.
 *
 * <p>Messages are consumed one at a time through {@link #read()}, which signals the end of the
 * stream by returning {@code null}. Because this interface extends {@link AutoCloseable},
 * instances can be managed with a try-with-resources statement.
 *
 * @param <T> the type of protobuf message produced by this stream
 */
public interface MessageInputStream<T extends Message> extends AutoCloseable {
/**
 * Reads a protobuf message from the underlying stream, or null if there are no more messages.
 *
 * @return the next message, or {@code null} if there are no more messages
 * @throws IOException if an I/O error occurs while reading the message
 */
@Nullable
T read() throws IOException;

/**
 * Closes the underlying stream. Any following reads will fail.
 *
 * <p>This override narrows the {@code throws Exception} clause of {@link AutoCloseable#close()}
 * to {@link IOException}, so callers only need to handle I/O failures.
 *
 * @throws IOException if an I/O error occurs while closing the underlying stream
 */
@Override
void close() throws IOException;
}
Loading

0 comments on commit 26c05cb

Please sign in to comment.