Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Batch Auto Pause improvements (#98)
Browse files Browse the repository at this point in the history
* Fixing auto pause leaving an instance without processing before pausing the update.

* Simplifying code. Removing final pause after update has reached a terminal state.
  • Loading branch information
ridv authored Dec 19, 2019
1 parent e02e0a9 commit 3e31bc0
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -639,6 +638,17 @@ private boolean isCoordinatedAndPulseExpired(
@VisibleForTesting
static final String PULSE_TIMEOUT_MESSAGE = "Pulses from external service have timed out.";

/**
 * Decides whether a side effect should be left unprocessed because the update is about to be
 * paused.
 *
 * <p>A side effect is skipped only when a pause will follow this evaluation and none of its
 * status changes appear in {@code InstanceUpdateStatus.TERMINAL_STATUSES}.
 *
 * @param pauseAfterBatch whether the update will be paused after the current evaluation.
 * @param sideEffect the side effect under consideration.
 * @return {@code true} if the side effect should be skipped, {@code false} otherwise.
 */
private boolean skipSideEffect(boolean pauseAfterBatch, SideEffect sideEffect) {
  // No pause pending: nothing is ever skipped, and the side effect is not inspected.
  if (!pauseAfterBatch) {
    return false;
  }

  // A pause is pending: skip unless at least one of the status changes is terminal.
  return Collections.disjoint(
      sideEffect.getStatusChanges(),
      InstanceUpdateStatus.TERMINAL_STATUSES);
}

private void evaluateUpdater(
final MutableStoreProvider storeProvider,
final UpdateFactory.Update update,
Expand Down Expand Up @@ -667,18 +677,24 @@ private void evaluateUpdater(

EvaluationResult<Integer> result = update.getUpdater().evaluate(changedInstance, stateProvider);

LOG.info(key + " evaluation result: " + result);

final boolean autoPauseAfterBatch =
isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy());
if (autoPauseAfterBatch && maybeAutoPause(summary, result)) {
LOG.info("{} evaluation result: {}", key, result);
final boolean autoPauseAfterCurrentBatch =
isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy())
&& maybeAutoPause(summary, result);
if (autoPauseAfterCurrentBatch) {
changeUpdateStatus(storeProvider,
summary,
newEvent(getPausedState(summary.getState().getStatus())).setMessage(UPDATE_AUTO_PAUSED));
return;
}

for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
// If we're pausing after processing this set of side effects, only process the side effects
// which are in a terminal state in order to avoid starting new shards after the pause
// has kicked in.
if (skipSideEffect(autoPauseAfterCurrentBatch, entry.getValue())) {
continue;
}

Iterable<InstanceUpdateStatus> statusChanges;

int instanceId = entry.getKey();
Expand All @@ -689,7 +705,7 @@ private void evaluateUpdater(
.collect(Collectors.toList());

Set<JobUpdateAction> savedActions =
FluentIterable.from(savedEvents).transform(EVENT_TO_ACTION).toSet();
savedEvents.stream().map(EVENT_TO_ACTION).collect(Collectors.toSet());

// Don't bother persisting a sequence of status changes that represents an instance that
// was immediately recognized as being healthy and in the desired state.
Expand Down Expand Up @@ -747,34 +763,34 @@ private void evaluateUpdater(
}
}

if (autoPauseAfterBatch) {
instancesSeen.remove(key);
}
changeUpdateStatus(storeProvider, summary, event);
} else {
LOG.info("Executing side-effects for update of " + key + ": " + result.getSideEffects());
for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey());

Optional<InstanceAction> action = entry.getValue().getAction();
if (action.isPresent()) {
Optional<InstanceActionHandler> handler = action.get().getHandler();
if (handler.isPresent()) {
Optional<Amount<Long, Time>> reevaluateDelay = handler.get().getReevaluationDelay(
instance,
instructions,
storeProvider,
stateManager,
updateAgentReserver,
updaterStatus,
key,
slaKillController);
if (reevaluateDelay.isPresent()) {
executor.schedule(
getDeferredEvaluator(instance, key),
reevaluateDelay.get().getValue(),
reevaluateDelay.get().getUnit().getTimeUnit());
}
return;
}

LOG.info("Executing side-effects for update of {}: {}",
key,
result.getSideEffects().entrySet());
for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) {
IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey());

Optional<InstanceAction> action = entry.getValue().getAction();
if (action.isPresent() && !skipSideEffect(autoPauseAfterCurrentBatch, entry.getValue())) {
Optional<InstanceActionHandler> handler = action.get().getHandler();
if (handler.isPresent()) {
Optional<Amount<Long, Time>> reevaluateDelay = handler.get().getReevaluationDelay(
instance,
instructions,
storeProvider,
stateManager,
updateAgentReserver,
updaterStatus,
key,
slaKillController);
if (reevaluateDelay.isPresent()) {
executor.schedule(
getDeferredEvaluator(instance, key),
reevaluateDelay.get().getValue(),
reevaluateDelay.get().getUnit().getTimeUnit());
}
}
}
Expand Down Expand Up @@ -869,11 +885,9 @@ private boolean maybeAutoPause(IJobUpdateSummary summary, EvaluationResult<Integ
Set<Integer> instancesCached = instancesSeen.get(key);
Set<Integer> instancesBeingUpdated = result.getSideEffects().keySet();

// On the final batch, pause for acknowledgement and remove the cache of instances.
// This will cause the else branch to get run on resume and the update will finish.
if (result.getStatus() == SUCCEEDED) {
instancesSeen.remove(key);
return true;
return false;
}

// If the update evaluation is dealing with new instances, that signals we are at a barrier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Set;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;

import org.apache.aurora.scheduler.updater.StateEvaluator.Failure;

Expand Down Expand Up @@ -139,6 +140,9 @@ public enum InstanceUpdateStatus {
/**
* The instance failed to update and is no longer being monitored.
*/
FAILED
FAILED;

public static final Set<InstanceUpdateStatus> TERMINAL_STATUSES =
ImmutableSet.of(SUCCEEDED, FAILED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static Optional<ITaskConfig> getConfig(
}

/**
* Get the lastest {@link JobUpdateStatus} for an update.
* Get the latest {@link JobUpdateStatus} for an update.
*/
static JobUpdateStatus getJobUpdateStatus(IJobUpdateDetails jobUpdateDetails) {
return Iterables.getLast(jobUpdateDetails.getUpdateEvents()).getStatus();
Expand Down
40 changes: 12 additions & 28 deletions src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -1842,38 +1842,27 @@ public void testSuccessfulBatchUpdateAutoPause() throws Exception {
updater.start(update, AUDIT);
actions.put(0, INSTANCE_UPDATING).putAll(1, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());
changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));
changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 2, Time.MILLISECONDS));

// Instance 1 finished first, but update does not yet proceed until 0 finishes.
actions.put(1, INSTANCE_UPDATED);
assertState(ROLLING_FORWARD, actions.build());
changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);

actions.put(0, INSTANCE_UPDATED).put(1, INSTANCE_UPDATED);

// Update should now be paused
assertState(ROLL_FORWARD_PAUSED, actions.build());

// Continue the update
updater.resume(UPDATE_ID, AUDIT);

actions.put(0, INSTANCE_UPDATED);
actions.put(2, INSTANCE_UPDATING);

assertState(ROLLING_FORWARD, actions.build());

// Instance 2 is updated.
changeState(JOB, 2, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);

// Update should now be paused for a second time
assertState(ROLL_FORWARD_PAUSED, actions.build());

// Continue the update
updater.resume(UPDATE_ID, AUDIT);

actions.put(2, INSTANCE_UPDATED);

assertState(ROLLED_FORWARD, actions.build());

assertJobState(
Expand Down Expand Up @@ -1911,11 +1900,12 @@ public void testSuccessfulVarBatchUpdateAutoPause() throws Exception {
changeState(JOB, 0, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);

actions.put(0, INSTANCE_UPDATED);

// Update should now be paused after first batch is done.
assertState(ROLL_FORWARD_PAUSED, actions.build());
updater.resume(UPDATE_ID, AUDIT);

actions.put(0, INSTANCE_UPDATED);
actions.put(1, INSTANCE_UPDATING).put(2, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());

Expand All @@ -1926,33 +1916,25 @@ public void testSuccessfulVarBatchUpdateAutoPause() throws Exception {
changeState(JOB, 1, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);

// Instance 2 will finish before instance 1
actions.put(1, INSTANCE_UPDATED);
actions.put(2, INSTANCE_UPDATED);

// Second autoPause at second barrier
assertState(ROLL_FORWARD_PAUSED, actions.build());

updater.resume(UPDATE_ID, AUDIT);
actions.put(1, INSTANCE_UPDATED);
actions.put(3, INSTANCE_UPDATING).put(4, INSTANCE_UPDATING).put(5, INSTANCE_UPDATING);

assertState(ROLLING_FORWARD, actions.build());

// Third batch is moving forward.
// Make instance 4 the instance that waits for final transition to SUCCEED
changeState(JOB, 5, FINISHED, ASSIGNED, STARTING, RUNNING);
changeState(JOB, 3, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(Amount.of(WATCH_TIMEOUT.getValue() / 3, Time.MILLISECONDS));
changeState(JOB, 4, FINISHED, ASSIGNED, STARTING, RUNNING);
changeState(JOB, 5, FINISHED, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);

actions.put(3, INSTANCE_UPDATED).put(5, INSTANCE_UPDATED);
actions.put(3, INSTANCE_UPDATED).put(4, INSTANCE_UPDATED).put(5, INSTANCE_UPDATED);

// Third barrier
assertState(ROLL_FORWARD_PAUSED, actions.build());
updater.resume(UPDATE_ID, AUDIT);

actions.put(4, INSTANCE_UPDATED);
assertState(ROLLED_FORWARD, actions.build());

assertJobState(
Expand Down Expand Up @@ -2001,11 +1983,13 @@ public void testSuccessfulVarBatchUpdateAutoPauseWithRollback() throws Exception
changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
clock.advance(WATCH_TIMEOUT);

actions.put(2, INSTANCE_UPDATED);

// Update should now be paused after first batch is done.
assertState(ROLL_FORWARD_PAUSED, actions.build());
updater.resume(UPDATE_ID, AUDIT);

actions.put(2, INSTANCE_UPDATED).put(0, INSTANCE_UPDATING).put(1, INSTANCE_UPDATING);
actions.put(0, INSTANCE_UPDATING).put(1, INSTANCE_UPDATING);
assertState(ROLLING_FORWARD, actions.build());

// Move on to batch two
Expand Down

0 comments on commit 3e31bc0

Please sign in to comment.