Skip to content

Commit

Permalink
Make ExecutionSchedule return StageExecutionAndScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
rschlussel committed Feb 7, 2020
1 parent 3185bf4 commit b84f177
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
*/
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.SqlStageExecution;

import java.util.Collection;

public class AllAtOnceExecutionPolicy
implements ExecutionPolicy
{
@Override
public ExecutionSchedule createExecutionSchedule(Collection<SqlStageExecution> stages)
public ExecutionSchedule createExecutionSchedule(Collection<StageExecutionAndScheduler> stages)
{
return new AllAtOnceExecutionSchedule(stages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,27 @@
public class AllAtOnceExecutionSchedule
implements ExecutionSchedule
{
private final Set<SqlStageExecution> schedulingStages;
private final Set<StageExecutionAndScheduler> schedulingStages;

public AllAtOnceExecutionSchedule(Collection<SqlStageExecution> stages)
public AllAtOnceExecutionSchedule(Collection<StageExecutionAndScheduler> stages)
{
requireNonNull(stages, "stages is null");
List<PlanFragmentId> preferredScheduleOrder = getPreferredScheduleOrder(stages.stream()
.map(StageExecutionAndScheduler::getStageExecution)
.map(SqlStageExecution::getFragment)
.collect(toImmutableList()));

Ordering<SqlStageExecution> ordering = Ordering.explicit(preferredScheduleOrder)
Ordering<StageExecutionAndScheduler> ordering = Ordering.explicit(preferredScheduleOrder)
.onResultOf(PlanFragment::getId)
.onResultOf(SqlStageExecution::getFragment);
.onResultOf(execution -> execution.getStageExecution().getFragment());
schedulingStages = new LinkedHashSet<>(ordering.sortedCopy(stages));
}

@Override
public Set<SqlStageExecution> getStagesToSchedule()
public Set<StageExecutionAndScheduler> getStagesToSchedule()
{
for (Iterator<SqlStageExecution> iterator = schedulingStages.iterator(); iterator.hasNext(); ) {
StageExecutionState state = iterator.next().getState();
for (Iterator<StageExecutionAndScheduler> iterator = schedulingStages.iterator(); iterator.hasNext(); ) {
StageExecutionState state = iterator.next().getStageExecution().getState();
if (state == SCHEDULED || state == RUNNING || state.isDone()) {
iterator.remove();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
*/
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.SqlStageExecution;

import java.util.Collection;

public interface ExecutionPolicy
{
ExecutionSchedule createExecutionSchedule(Collection<SqlStageExecution> stages);
ExecutionSchedule createExecutionSchedule(Collection<StageExecutionAndScheduler> stages);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@
*/
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.SqlStageExecution;

import java.util.Set;

public interface ExecutionSchedule
{
Set<SqlStageExecution> getStagesToSchedule();
Set<StageExecutionAndScheduler> getStagesToSchedule();

boolean isFinished();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
*/
package com.facebook.presto.execution.scheduler;

import com.facebook.presto.execution.SqlStageExecution;

import java.util.Collection;

public class PhasedExecutionPolicy
implements ExecutionPolicy
{
@Override
public ExecutionSchedule createExecutionSchedule(Collection<SqlStageExecution> stages)
public ExecutionSchedule createExecutionSchedule(Collection<StageExecutionAndScheduler> stages)
{
return new PhasedExecutionSchedule(stages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,17 @@
public class PhasedExecutionSchedule
implements ExecutionSchedule
{
private final List<Set<SqlStageExecution>> schedulePhases;
private final Set<SqlStageExecution> activeSources = new HashSet<>();
private final List<Set<StageExecutionAndScheduler>> schedulePhases;
private final Set<StageExecutionAndScheduler> activeSources = new HashSet<>();

public PhasedExecutionSchedule(Collection<SqlStageExecution> stages)
public PhasedExecutionSchedule(Collection<StageExecutionAndScheduler> stages)
{
List<Set<PlanFragmentId>> phases = extractPhases(stages.stream().map(SqlStageExecution::getFragment).collect(toImmutableList()));
List<Set<PlanFragmentId>> phases = extractPhases(stages.stream()
.map(StageExecutionAndScheduler::getStageExecution)
.map(SqlStageExecution::getFragment)
.collect(toImmutableList()));

Map<PlanFragmentId, SqlStageExecution> stagesByFragmentId = stages.stream().collect(toImmutableMap(stage -> stage.getFragment().getId(), identity()));
Map<PlanFragmentId, StageExecutionAndScheduler> stagesByFragmentId = stages.stream().collect(toImmutableMap(stage -> stage.getStageExecution().getFragment().getId(), identity()));

// create a mutable list of mutable sets of stages, so we can remove completed stages
schedulePhases = new ArrayList<>();
Expand All @@ -77,7 +80,7 @@ public PhasedExecutionSchedule(Collection<SqlStageExecution> stages)
}

@Override
public Set<SqlStageExecution> getStagesToSchedule()
public Set<StageExecutionAndScheduler> getStagesToSchedule()
{
removeCompletedStages();
addPhasesIfNecessary();
Expand All @@ -89,8 +92,8 @@ public Set<SqlStageExecution> getStagesToSchedule()

private void removeCompletedStages()
{
for (Iterator<SqlStageExecution> stageIterator = activeSources.iterator(); stageIterator.hasNext(); ) {
StageExecutionState state = stageIterator.next().getState();
for (Iterator<StageExecutionAndScheduler> stageIterator = activeSources.iterator(); stageIterator.hasNext(); ) {
StageExecutionState state = stageIterator.next().getStageExecution().getState();
if (state == SCHEDULED || state == RUNNING || state.isDone()) {
stageIterator.remove();
}
Expand All @@ -105,17 +108,17 @@ private void addPhasesIfNecessary()
}

while (!schedulePhases.isEmpty()) {
Set<SqlStageExecution> phase = schedulePhases.remove(0);
Set<StageExecutionAndScheduler> phase = schedulePhases.remove(0);
activeSources.addAll(phase);
if (hasSourceDistributedStage(phase)) {
return;
}
}
}

private static boolean hasSourceDistributedStage(Set<SqlStageExecution> phase)
private static boolean hasSourceDistributedStage(Set<StageExecutionAndScheduler> phase)
{
return phase.stream().anyMatch(stage -> !stage.getFragment().getTableScanSchedulingOrder().isEmpty());
return phase.stream().anyMatch(stage -> !stage.getStageExecution().getFragment().getTableScanSchedulingOrder().isEmpty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,24 +378,24 @@ private void schedule()
sectionStageExecutions.forEach(scheduledStageExecutions::addAll);
sectionStageExecutions.stream()
.map(executionInfos -> executionInfos.stream()
.map(StageExecutionAndScheduler::getStageExecution)
.collect(toImmutableList()))
.map(executionPolicy::createExecutionSchedule)
.forEach(sectionExecutionSchedules::add);

while (sectionExecutionSchedules.stream().noneMatch(ExecutionSchedule::isFinished)) {
List<ListenableFuture<?>> blockedStages = new ArrayList<>();

List<SqlStageExecution> executionsToSchedule = sectionExecutionSchedules.stream()
List<StageExecutionAndScheduler> executionsToSchedule = sectionExecutionSchedules.stream()
.flatMap(schedule -> schedule.getStagesToSchedule().stream())
.collect(toImmutableList());

for (SqlStageExecution stageExecution : executionsToSchedule) {
for (StageExecutionAndScheduler stageExecutionAndScheduler : executionsToSchedule) {
SqlStageExecution stageExecution = stageExecutionAndScheduler.getStageExecution();
StageId stageId = stageExecution.getStageExecutionId().getStageId();
stageExecution.beginScheduling();

// perform some scheduling work
ScheduleResult result = stageExecutions.get(stageId).getStageScheduler()
ScheduleResult result = stageExecutionAndScheduler.getStageScheduler()
.schedule();

// modify parent and children based on the results of the scheduling
Expand All @@ -405,7 +405,7 @@ private void schedule()
else if (!result.getBlocked().isDone()) {
blockedStages.add(result.getBlocked());
}
stageExecutions.get(stageId).getStageLinkage()
stageExecutionAndScheduler.getStageLinkage()
.processScheduleResults(stageExecution.getState(), result.getNewTasks());
schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
if (result.getBlockedReason().isPresent()) {
Expand Down

0 comments on commit b84f177

Please sign in to comment.