diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/BUILD b/src/main/java/com/google/devtools/build/lib/dynamic/BUILD index d37f5ac6bd021e..b781e21360fe38 100644 --- a/src/main/java/com/google/devtools/build/lib/dynamic/BUILD +++ b/src/main/java/com/google/devtools/build/lib/dynamic/BUILD @@ -14,11 +14,13 @@ java_library( name = "dynamic", srcs = glob(["*.java"]), deps = [ + "//src/main/java/com/google/devtools/build/lib:build-request-options", "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/events", + "//src/main/java/com/google/devtools/build/lib/exec:execution_options", "//src/main/java/com/google/devtools/build/lib/exec:execution_policy", "//src/main/java/com/google/devtools/build/lib/exec:spawn_strategy_registry", "//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception", diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java index af5024c28eee73..8d984fee92be8b 100644 --- a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java +++ b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionModule.java @@ -24,6 +24,7 @@ import com.google.devtools.build.lib.actions.Spawn; import com.google.devtools.build.lib.actions.SpawnStrategy; import com.google.devtools.build.lib.actions.Spawns; +import com.google.devtools.build.lib.buildtool.BuildRequestOptions; import com.google.devtools.build.lib.buildtool.BuildResult; import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent; import com.google.devtools.build.lib.concurrent.ExecutorUtil; @@ -133,10 +134,16 @@ private ImmutableMap> getRemoteStrategies(DynamicExecutionO public void registerSpawnStrategies( SpawnStrategyRegistry.Builder registryBuilder, CommandEnvironment env) throws AbruptExitException { + DynamicExecutionOptions options = env.getOptions().getOptions(DynamicExecutionOptions.class); + com.google.devtools.build.lib.exec.ExecutionOptions execOptions = + env.getOptions().getOptions(com.google.devtools.build.lib.exec.ExecutionOptions.class); registerSpawnStrategies( registryBuilder, - env.getOptions().getOptions(DynamicExecutionOptions.class), - env.getReporter()); + options, + env.getReporter(), + options.cpuLimited + ? (int) execOptions.localCpuResources + : env.getOptions().getOptions(BuildRequestOptions.class).jobs); } // CommandEnvironment is difficult to access in tests, so use this method for testing. @@ -144,7 +151,8 @@ public void registerSpawnStrategies( final void registerSpawnStrategies( SpawnStrategyRegistry.Builder registryBuilder, DynamicExecutionOptions options, - Reporter reporter) + Reporter reporter, + int numCpus) throws AbruptExitException { if (!options.internalSpawnScheduler) { return; @@ -156,7 +164,8 @@ final void registerSpawnStrategies( options, this::getExecutionPolicy, this::getPostProcessingSpawnForLocalExecution, - firstBuild); + firstBuild, + numCpus); registryBuilder.registerStrategy(strategy, "dynamic", "dynamic_worker"); registryBuilder.addDynamicLocalStrategies(getLocalStrategies(options)); registryBuilder.addDynamicRemoteStrategies(getRemoteStrategies(options)); diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionOptions.java b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionOptions.java index 2a11c134a9cb51..7081c035513917 100644 --- a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionOptions.java +++ b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicExecutionOptions.java @@ -53,6 +53,16 @@ public class DynamicExecutionOptions extends OptionsBase { + "enabled.") public boolean internalSpawnScheduler; + @Option( + name = "experimental_dynamic_execution_cpu_limited", + documentationCategory = OptionDocumentationCategory.UNDOCUMENTED, + effectTags = {OptionEffectTag.UNKNOWN}, + defaultValue = "false", + help = + "If true, the number of parallel dynamic executions is limited to the number of CPUs. " + + "The number of CPUs available can be set with the --local_cpu_resources flag.") + public boolean cpuLimited; + @Option( name = "dynamic_local_strategy", converter = Converters.StringToStringListConverter.class, diff --git a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java index af659e084d49d6..e130afcd031e70 100644 --- a/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategy.java @@ -107,6 +107,9 @@ public class DynamicSpawnStrategy implements SpawnStrategy { private boolean skipBuildWarningShown; + /** Limit on how many threads we should use for dynamic execution. */ + private final Semaphore threadLimiter; + /** * Constructs a {@code DynamicSpawnStrategy}. * @@ -118,12 +121,14 @@ public DynamicSpawnStrategy( DynamicExecutionOptions options, Function getExecutionPolicy, Function> getPostProcessingSpawnForLocalExecution, - boolean firstBuild) { + boolean firstBuild, + int numCpus) { this.executorService = MoreExecutors.listeningDecorator(executorService); this.options = options; this.getExecutionPolicy = getExecutionPolicy; this.getExtraSpawnForLocalExecution = getPostProcessingSpawnForLocalExecution; this.firstBuild = firstBuild; + this.threadLimiter = new Semaphore(numCpus); } /** @@ -597,172 +602,196 @@ public ImmutableList exec( } return runRemotely(spawn, actionExecutionContext, null); } - // Extra logging to debug b/194373457 - logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log( - "Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe()); - debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint()); - // else both can exec. Fallthrough to below. - - // Semaphores to track termination of each branch. These are necessary to wait for the branch to - // finish its own cleanup (e.g. terminating subprocesses) once it has been cancelled. - Semaphore localDone = new Semaphore(0); - Semaphore remoteDone = new Semaphore(0); - - AtomicReference strategyThatCancelled = new AtomicReference<>(null); - SettableFuture> localBranch = SettableFuture.create(); - SettableFuture> remoteBranch = SettableFuture.create(); - - AtomicBoolean localStarting = new AtomicBoolean(true); - AtomicBoolean remoteStarting = new AtomicBoolean(true); - - localBranch.setFuture( - executorService.submit( - new Branch(LOCAL, actionExecutionContext) { - @Override - ImmutableList callImpl(ActionExecutionContext context) - throws InterruptedException, ExecException { - try { - if (!localStarting.compareAndSet(true, false)) { - // If we ever get here, it's because we were cancelled early and the listener - // ran first. Just make sure that's the case. - checkState(Thread.interrupted()); - throw new InterruptedException(); - } - if (delayLocalExecution.get()) { - Thread.sleep(options.localExecutionDelay); - } - return runLocally( - spawn, - context, - () -> - stopBranch( - remoteBranch, - remoteDone, - localBranch, - LOCAL, - strategyThatCancelled, - DynamicSpawnStrategy.this.options, - actionExecutionContext, - spawn)); - } catch (DynamicInterruptedException e) { - // This exception can be thrown due to races in stopBranch(), in which case - // the branch that lost the race may not have been cancelled yet. Cancel it here - // to prevent the listener from cross-cancelling. - localBranch.cancel(true); - throw e; - } catch ( - @SuppressWarnings("InterruptedExceptionSwallowed") - Throwable e) { - if (options.debugSpawnScheduler) { - logger.atInfo().log( - "Local branch of %s failed with %s: '%s'", - spawn.getResourceOwner().prettyPrint(), - e.getClass().getSimpleName(), - e.getMessage()); - } - throw e; - } finally { - localDone.release(); - } - } - })); - localBranch.addListener( - () -> { - if (localStarting.compareAndSet(true, false)) { - // If the local branch got cancelled before even starting, we release its semaphore for - // it. - localDone.release(); - } - if (!localBranch.isCancelled()) { - remoteBranch.cancel(true); + + // True if we got the threads we need for actual dynamic execution. + boolean gotThreads = false; + try { + if (threadLimiter.tryAcquire()) { + gotThreads = true; + } else { + // If there are no threads available for dynamic execution because we're limited + // to the number of CPUs, we can just execute remotely. + ImmutableList spawnResults = runRemotely(spawn, actionExecutionContext, null); + for (SpawnResult r : spawnResults) { + if (r.isCacheHit()) { + delayLocalExecution.set(true); + break; } - }, - MoreExecutors.directExecutor()); - - remoteBranch.setFuture( - executorService.submit( - new Branch(DynamicMode.REMOTE, actionExecutionContext) { - @Override - public ImmutableList callImpl(ActionExecutionContext context) - throws InterruptedException, ExecException { - try { - if (!remoteStarting.compareAndSet(true, false)) { - // If we ever get here, it's because we were cancelled early and the listener - // ran first. Just make sure that's the case. - checkState(Thread.interrupted()); - throw new InterruptedException(); - } - ImmutableList spawnResults = - runRemotely( - spawn, - context, - () -> - stopBranch( - localBranch, - localDone, - remoteBranch, - DynamicMode.REMOTE, - strategyThatCancelled, - DynamicSpawnStrategy.this.options, - actionExecutionContext, - spawn)); - for (SpawnResult r : spawnResults) { - if (r.isCacheHit()) { - delayLocalExecution.set(true); - break; + } + return spawnResults; + } + + // Extra logging to debug b/194373457 + logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log( + "Spawn %s dynamically executed both ways", spawn.getResourceOwner().describe()); + debugLog("Dynamic execution of %s beginning%n", spawn.getResourceOwner().prettyPrint()); + // else both can exec. Fallthrough to below. + + // Semaphores to track termination of each branch. These are necessary to wait for the branch + // to finish its own cleanup (e.g. terminating subprocesses) once it has been cancelled. + Semaphore localDone = new Semaphore(0); + Semaphore remoteDone = new Semaphore(0); + + AtomicReference strategyThatCancelled = new AtomicReference<>(null); + SettableFuture> localBranch = SettableFuture.create(); + SettableFuture> remoteBranch = SettableFuture.create(); + + AtomicBoolean localStarting = new AtomicBoolean(true); + AtomicBoolean remoteStarting = new AtomicBoolean(true); + + localBranch.setFuture( + executorService.submit( + new Branch(LOCAL, actionExecutionContext) { + @Override + ImmutableList callImpl(ActionExecutionContext context) + throws InterruptedException, ExecException { + try { + if (!localStarting.compareAndSet(true, false)) { + // If we ever get here, it's because we were cancelled early and the listener + // ran first. Just make sure that's the case. + checkState(Thread.interrupted()); + throw new InterruptedException(); + } + if (delayLocalExecution.get()) { + Thread.sleep(options.localExecutionDelay); } + return runLocally( + spawn, + context, + () -> + stopBranch( + remoteBranch, + remoteDone, + localBranch, + LOCAL, + strategyThatCancelled, + DynamicSpawnStrategy.this.options, + actionExecutionContext, + spawn)); + } catch (DynamicInterruptedException e) { + // This exception can be thrown due to races in stopBranch(), in which case + // the branch that lost the race may not have been cancelled yet. Cancel it here + // to prevent the listener from cross-cancelling. + localBranch.cancel(true); + throw e; + } catch ( + @SuppressWarnings("InterruptedExceptionSwallowed") + Throwable e) { + if (options.debugSpawnScheduler) { + logger.atInfo().log( + "Local branch of %s failed with %s: '%s'", + spawn.getResourceOwner().prettyPrint(), + e.getClass().getSimpleName(), + e.getMessage()); + } + throw e; + } finally { + localDone.release(); } - return spawnResults; - } catch (DynamicInterruptedException e) { - // This exception can be thrown due to races in stopBranch(), in which case - // the branch that lost the race may not have been cancelled yet. Cancel it here - // to prevent the listener from cross-cancelling. - remoteBranch.cancel(true); - throw e; - } catch ( - @SuppressWarnings("InterruptedExceptionSwallowed") - Throwable e) { - if (options.debugSpawnScheduler) { - logger.atInfo().log( - "Remote branch of %s failed with %s: '%s'", - spawn.getResourceOwner().prettyPrint(), - e.getClass().getSimpleName(), - e.getMessage()); + } + })); + localBranch.addListener( + () -> { + if (localStarting.compareAndSet(true, false)) { + // If the local branch got cancelled before even starting, we release its semaphore + // for it. + localDone.release(); + } + if (!localBranch.isCancelled()) { + remoteBranch.cancel(true); + } + }, + MoreExecutors.directExecutor()); + + remoteBranch.setFuture( + executorService.submit( + new Branch(DynamicMode.REMOTE, actionExecutionContext) { + @Override + public ImmutableList callImpl(ActionExecutionContext context) + throws InterruptedException, ExecException { + try { + if (!remoteStarting.compareAndSet(true, false)) { + // If we ever get here, it's because we were cancelled early and the listener + // ran first. Just make sure that's the case. + checkState(Thread.interrupted()); + throw new InterruptedException(); + } + ImmutableList spawnResults = + runRemotely( + spawn, + context, + () -> + stopBranch( + localBranch, + localDone, + remoteBranch, + DynamicMode.REMOTE, + strategyThatCancelled, + DynamicSpawnStrategy.this.options, + actionExecutionContext, + spawn)); + for (SpawnResult r : spawnResults) { + if (r.isCacheHit()) { + delayLocalExecution.set(true); + break; + } + } + return spawnResults; + } catch (DynamicInterruptedException e) { + // This exception can be thrown due to races in stopBranch(), in which case + // the branch that lost the race may not have been cancelled yet. Cancel it here + // to prevent the listener from cross-cancelling. + remoteBranch.cancel(true); + throw e; + } catch ( + @SuppressWarnings("InterruptedExceptionSwallowed") + Throwable e) { + if (options.debugSpawnScheduler) { + logger.atInfo().log( + "Remote branch of %s failed with %s: '%s'", + spawn.getResourceOwner().prettyPrint(), + e.getClass().getSimpleName(), + e.getMessage()); + } + throw e; + } finally { + remoteDone.release(); } - throw e; - } finally { - remoteDone.release(); } - } - })); - remoteBranch.addListener( - () -> { - if (remoteStarting.compareAndSet(true, false)) { - // If the remote branch got cancelled before even starting, we release its semaphore for - // it. - remoteDone.release(); - } - if (!remoteBranch.isCancelled()) { - localBranch.cancel(true); - } - }, - MoreExecutors.directExecutor()); + })); + remoteBranch.addListener( + () -> { + if (remoteStarting.compareAndSet(true, false)) { + // If the remote branch got cancelled before even starting, we release its semaphore + // for it. + remoteDone.release(); + } + if (!remoteBranch.isCancelled()) { + localBranch.cancel(true); + } + }, + MoreExecutors.directExecutor()); - try { - return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext); + try { + return waitBranches(localBranch, remoteBranch, spawn, options, actionExecutionContext); + } finally { + checkState(localBranch.isDone()); + checkState(remoteBranch.isDone()); + logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log( + "Dynamic execution of %s ended with local %s, remote %s%n", + spawn.getResourceOwner().prettyPrint(), + localBranch.isCancelled() ? "cancelled" : "done", + remoteBranch.isCancelled() ? "cancelled" : "done"); + debugLog( + "Dynamic execution of %s ended with local %s, remote %s%n", + spawn.getResourceOwner().prettyPrint(), + localBranch.isCancelled() ? "cancelled" : "done", + remoteBranch.isCancelled() ? "cancelled" : "done"); + } } finally { - checkState(localBranch.isDone()); - checkState(remoteBranch.isDone()); - logger.atInfo().atMostEvery(1, TimeUnit.SECONDS).log( - "Dynamic execution of %s ended with local %s, remote %s%n", - spawn.getResourceOwner().prettyPrint(), - localBranch.isCancelled() ? "cancelled" : "done", - remoteBranch.isCancelled() ? "cancelled" : "done"); - debugLog( - "Dynamic execution of %s ended with local %s, remote %s%n", - spawn.getResourceOwner().prettyPrint(), - localBranch.isCancelled() ? "cancelled" : "done", - remoteBranch.isCancelled() ? "cancelled" : "done"); + if (gotThreads) { + threadLimiter.release(); + } } } diff --git a/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyTest.java b/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyTest.java index b6ae3d7b513072..468918a8232566 100644 --- a/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyTest.java +++ b/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyTest.java @@ -331,7 +331,7 @@ private StrategyAndContext createSpawnStrategyWithExecutor( DynamicExecutionModule dynamicExecutionModule = new DynamicExecutionModule(executorService); dynamicExecutionModule.registerSpawnStrategies( - spawnStrategyRegistryBuilder, options, new Reporter(new EventBus())); + spawnStrategyRegistryBuilder, options, new Reporter(new EventBus()), 10); SpawnStrategyRegistry spawnStrategyRegistry = spawnStrategyRegistryBuilder.build(); diff --git a/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyUnitTest.java b/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyUnitTest.java index edf48ec4909843..44f2ab7396cc3c 100644 --- a/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyUnitTest.java +++ b/src/test/java/com/google/devtools/build/lib/dynamic/DynamicSpawnStrategyUnitTest.java @@ -442,7 +442,8 @@ private DynamicSpawnStrategy createDynamicSpawnStrategy( options, ignored -> executionPolicy, getPostProcessingSpawnForLocalExecution, - isFirst); + isFirst, + 10); } private static ActionExecutionContext createMockActionExecutionContext(