Skip to content

Commit

Permalink
Handle CPU limiting of local branches with a queue rather than droppi…
Browse files Browse the repository at this point in the history
…ng the excess on the floor.

This will later allow us to be smarter about which actions to schedule for local execution.

PiperOrigin-RevId: 421317543
  • Loading branch information
larsrc-google authored and copybara-github committed Jan 12, 2022
1 parent 3768323 commit 6f89288
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
Expand Down Expand Up @@ -124,6 +125,11 @@ String branchState() {
+ (isDone() ? "done" : "not done");
}

/** Executes this branch using the provided executor. */
public void execute(ListeningExecutorService executor) {
future.setFuture(executor.submit(this));
}

/**
* Moves a set of stdout/stderr files over another one. Errors during the move are logged and
* swallowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionContext;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
Expand All @@ -45,6 +44,8 @@
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -106,6 +107,9 @@ public class DynamicSpawnStrategy implements SpawnStrategy {
/** Limit on how many threads we should use for dynamic execution. */
private final Semaphore threadLimiter;

/** Set of jobs that are waiting for local execution. */
private final Deque<LocalBranch> waitingLocalJobs = new ArrayDeque<>();

/**
* Constructs a {@code DynamicSpawnStrategy}.
*
Expand Down Expand Up @@ -203,69 +207,76 @@ public ImmutableList<SpawnResult> exec(
return nonDynamicResults;
}

// True if we got the threads we need for actual dynamic execution.
boolean gotThreads = false;
// Extra logging to debug b/194373457
logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
"Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe());
debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint());
// else both can exec. Fallthrough to below.

AtomicReference<DynamicMode> strategyThatCancelled = new AtomicReference<>(null);

LocalBranch localBranch =
new LocalBranch(
actionExecutionContext,
spawn,
strategyThatCancelled,
options,
ignoreFailureCheck,
getExtraSpawnForLocalExecution,
delayLocalExecution);
RemoteBranch remoteBranch =
new RemoteBranch(
actionExecutionContext,
spawn,
strategyThatCancelled,
options,
ignoreFailureCheck,
delayLocalExecution);

localBranch.prepareFuture(remoteBranch);
remoteBranch.prepareFuture(localBranch);
synchronized (waitingLocalJobs) {
waitingLocalJobs.add(localBranch);
}
tryScheduleLocalJob();
remoteBranch.execute(executorService);

try {
if (threadLimiter.tryAcquire()) {
gotThreads = true;
} else {
// If there are no threads available for dynamic execution because we're limited
// to the number of CPUs, we can just execute remotely.
return RemoteBranch.runRemotely(spawn, actionExecutionContext, null, delayLocalExecution);
return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext);
} finally {
checkState(localBranch.isDone());
checkState(remoteBranch.isDone());
if (!waitingLocalJobs.contains(localBranch)) {
synchronized (waitingLocalJobs) {
if (!waitingLocalJobs.contains(localBranch)) {
threadLimiter.release();
tryScheduleLocalJob();
}
}
}

// Extra logging to debug b/194373457
logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
"Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe());
debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint());
// else both can exec. Fallthrough to below.

AtomicReference<DynamicMode> strategyThatCancelled = new AtomicReference<>(null);

LocalBranch localBranch =
new LocalBranch(
actionExecutionContext,
spawn,
strategyThatCancelled,
options,
ignoreFailureCheck,
getExtraSpawnForLocalExecution,
delayLocalExecution);
RemoteBranch remoteBranch =
new RemoteBranch(
actionExecutionContext,
spawn,
strategyThatCancelled,
options,
ignoreFailureCheck,
delayLocalExecution);

SettableFuture<ImmutableList<SpawnResult>> localFuture =
localBranch.prepareFuture(remoteBranch);
SettableFuture<ImmutableList<SpawnResult>> remoteFuture =
remoteBranch.prepareFuture(localBranch);
localFuture.setFuture(executorService.submit(localBranch));
remoteFuture.setFuture(executorService.submit(remoteBranch));

try {
return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext);
} finally {
checkState(localBranch.isDone());
checkState(remoteBranch.isDone());
logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log(
"Dynamic execution of %s ended with local %s, remote %s%n",
spawn.getResourceOwner().prettyPrint(),
localBranch.isCancelled() ? "cancelled" : "done",
remoteBranch.isCancelled() ? "cancelled" : "done");
debugLog(
"Dynamic execution of %s ended with local %s, remote %s%n",
spawn.getResourceOwner().prettyPrint(),
localBranch.isCancelled() ? "cancelled" : "done",
remoteBranch.isCancelled() ? "cancelled" : "done");
}
} finally {
if (gotThreads) {
threadLimiter.release();
"Dynamic execution of %s ended with local %s, remote %s%n",
spawn.getResourceOwner().prettyPrint(),
localBranch.isCancelled() ? "cancelled" : "done",
remoteBranch.isCancelled() ? "cancelled" : "done");
debugLog(
"Dynamic execution of %s ended with local %s, remote %s%n",
spawn.getResourceOwner().prettyPrint(),
localBranch.isCancelled() ? "cancelled" : "done",
remoteBranch.isCancelled() ? "cancelled" : "done");
}
}

/**
* Tries to schedule as many local jobs as are permitted by {@link #threadLimiter}. "Scheduling"
* here means putting it on a thread and making it start the normal strategy execution, but it
* will still have to wait for resources, so it may not execute for a while.
*/
private void tryScheduleLocalJob() {
synchronized (waitingLocalJobs) {
while (!waitingLocalJobs.isEmpty() && threadLimiter.tryAcquire()) {
LocalBranch job = waitingLocalJobs.pollLast();
job.execute(executorService);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
Expand Down Expand Up @@ -132,7 +131,7 @@ private static ImmutableList<SpawnResult> runSpawnLocally(
}

/** Sets up the {@link Future} used in the local branch to know what remote branch to cancel. */
protected SettableFuture<ImmutableList<SpawnResult>> prepareFuture(RemoteBranch remoteBranch) {
protected void prepareFuture(RemoteBranch remoteBranch) {
// TODO(b/203094728): Maybe generify this method and move it up.
this.remoteBranch = remoteBranch;
future.addListener(
Expand All @@ -147,7 +146,6 @@ protected SettableFuture<ImmutableList<SpawnResult>> prepareFuture(RemoteBranch
}
},
MoreExecutors.directExecutor());
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry;
import com.google.devtools.build.lib.actions.DynamicStrategyRegistry.DynamicMode;
Expand Down Expand Up @@ -111,7 +110,7 @@ static ImmutableList<SpawnResult> runRemotely(
}

/** Sets up the future for this branch, once the other branch is available. */
public SettableFuture<ImmutableList<SpawnResult>> prepareFuture(LocalBranch localBranch) {
public void prepareFuture(LocalBranch localBranch) {
this.localBranch = localBranch;
future.addListener(
() -> {
Expand All @@ -125,7 +124,6 @@ public SettableFuture<ImmutableList<SpawnResult>> prepareFuture(LocalBranch loca
}
},
MoreExecutors.directExecutor());
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.Artifact.SourceArtifact;
import com.google.devtools.build.lib.actions.ArtifactRoot;
Expand Down Expand Up @@ -413,7 +412,7 @@ public void exec_runAnywhereSpawn_skipFirst_onlyRemote() throws Exception {
}

@Test
public void waitBranches_givesDebugOutputOnWeirdCases() throws Exception {
public void waitBranches_givesDebugOutputIfBothCancelled() throws Exception {
Spawn spawn =
new SpawnBuilder()
.withOwnerPrimaryOutput(new SourceArtifact(rootDir, PathFragment.create("/foo"), null))
Expand All @@ -428,12 +427,10 @@ public void waitBranches_givesDebugOutputOnWeirdCases() throws Exception {
actionExecutionContext, spawn, strategyThatCancelled, options, null, null, null);
RemoteBranch remoteBranch =
new RemoteBranch(actionExecutionContext, spawn, strategyThatCancelled, options, null, null);
SettableFuture<ImmutableList<SpawnResult>> localFuture =
localBranch.prepareFuture(remoteBranch);
SettableFuture<ImmutableList<SpawnResult>> remoteFuture =
remoteBranch.prepareFuture(localBranch);
localFuture.set(null);
remoteFuture.set(null);
localBranch.prepareFuture(remoteBranch);
remoteBranch.prepareFuture(localBranch);
localBranch.cancel();
remoteBranch.cancel();
AssertionError error =
assertThrows(
AssertionError.class,
Expand Down

0 comments on commit 6f89288

Please sign in to comment.