Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
ENI fitness (#173)
Browse files Browse the repository at this point in the history
* Add pluggable ENI fitness evaluator (#170)
* Make the number of threads in TaskScheduler configurable
  • Loading branch information
tbak authored Feb 6, 2018
1 parent 30b1874 commit 76902c7
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 124 deletions.
5 changes: 3 additions & 2 deletions fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private static class HostDisablePair {
private final Map<String, Map<VMResource, Double>> maxResourcesMap;
private final Map<VMResource, Double> totalResourcesMap;
private final VMRejectLimiter vmRejectLimiter;
private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, "", null, 0L, null) {
private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, null, "", null, 0L, null) {
@Override
void assignResult(TaskAssignmentResult result) {
throw new UnsupportedOperationException();
Expand All @@ -88,11 +88,12 @@ void assignResult(TaskAssignmentResult result) {
private final BlockingQueue<String> unknownLeaseIdsToExpire = new LinkedBlockingQueue<>();

AssignableVMs(TaskTracker taskTracker, Action1<VirtualMachineLease> leaseRejectAction,
PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
long leaseOfferExpirySecs, int maxOffersToReject,
String attrNameToGroupMaxResources, boolean singleLeaseMode, String autoScaleByAttributeName) {
this.taskTracker = taskTracker;
vmCollection = new VMCollection(
hostname -> new AssignableVirtualMachine(vmIdToHostnameMap, leaseIdToHostnameMap, hostname,
hostname -> new AssignableVirtualMachine(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname,
leaseRejectAction, leaseOfferExpirySecs, taskTracker, singleLeaseMode),
autoScaleByAttributeName
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public ResAsgmntResult(List<AssignmentFailure> failures, double fitness) {
}
}

private final PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator;
private final Map<String, VirtualMachineLease> leasesMap;
private final BlockingQueue<String> workersToUnAssign;
private final BlockingQueue<String> leasesToExpire;
Expand Down Expand Up @@ -140,17 +141,20 @@ public ResAsgmntResult(List<AssignmentFailure> failures, double fitness) {
private boolean firstLeaseAdded=false;
private final List<TaskRequest> consumedResourcesToAssign = new ArrayList<>();

public AssignableVirtualMachine(ConcurrentMap<String, String> vmIdToHostnameMap,
public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
ConcurrentMap<String, String> vmIdToHostnameMap,
ConcurrentMap<String, String> leaseIdToHostnameMap,
String hostname, Action1<VirtualMachineLease> leaseRejectAction,
long leaseOfferExpirySecs, TaskTracker taskTracker) {
this(vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false);
this(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false);
}

public AssignableVirtualMachine(ConcurrentMap<String, String> vmIdToHostnameMap,
public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
ConcurrentMap<String, String> vmIdToHostnameMap,
ConcurrentMap<String, String> leaseIdToHostnameMap,
String hostname, Action1<VirtualMachineLease> leaseRejectAction,
long leaseOfferExpirySecs, TaskTracker taskTracker, boolean singleLeaseMode) {
this.preferentialNamedConsumableResourceEvaluator = preferentialNamedConsumableResourceEvaluator;
this.vmIdToHostnameMap = vmIdToHostnameMap;
this.leaseIdToHostnameMap = leaseIdToHostnameMap;
this.hostname = hostname;
Expand Down Expand Up @@ -221,7 +225,7 @@ private void addToAvailableResources(VirtualMachineLease l) {
int val0 = Integer.parseInt(val0Str);
int val1 = Integer.parseInt(val1Str);
final PreferentialNamedConsumableResourceSet crs =
new PreferentialNamedConsumableResourceSet(name, val0, val1);
new PreferentialNamedConsumableResourceSet(hostname, name, val0, val1);
final Iterator<TaskRequest> iterator = consumedResourcesToAssign.iterator();
while(iterator.hasNext()) {
TaskRequest request = iterator.next();
Expand Down Expand Up @@ -786,7 +790,7 @@ private ResAsgmntResult evalAndGetResourceAssignmentFailures(TaskRequest request
for (Map.Entry<String, PreferentialNamedConsumableResourceSet> entry : resourceSets.entrySet()) {
if (!requestedNamedResNames.isEmpty())
requestedNamedResNames.remove(entry.getKey());
final double fitness = entry.getValue().getFitness(request);
final double fitness = entry.getValue().getFitness(request, preferentialNamedConsumableResourceEvaluator);
if (fitness == 0.0) {
AssignmentFailure failure = new AssignmentFailure(VMResource.ResourceSet, 0.0, 0.0, 0.0,
"ResourceSet " + entry.getValue().getName() + " unavailable"
Expand Down Expand Up @@ -951,7 +955,7 @@ void assignResult(TaskAssignmentResult result) {
result.addPort(currPortRanges.consumeNextPort());
}
for(Map.Entry<String, PreferentialNamedConsumableResourceSet> entry: resourceSets.entrySet()) {
result.addResourceSet(entry.getValue().consume(result.getRequest()));
result.addResourceSet(entry.getValue().consume(result.getRequest(), preferentialNamedConsumableResourceEvaluator));
}
if(!taskTracker.addAssignedTask(result.getRequest(), this))
logger.error("Unexpected to re-add task to assigned state, id=" + result.getRequest().getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.netflix.fenzo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

class CompositeSchedulingEventListener implements SchedulingEventListener {

private static final Logger logger = LoggerFactory.getLogger(CompositeSchedulingEventListener.class);

private final List<SchedulingEventListener> listeners;

private CompositeSchedulingEventListener(Collection<SchedulingEventListener> listeners) {
this.listeners = new ArrayList<>(listeners);
}

@Override
public void onScheduleStart() {
safely(SchedulingEventListener::onScheduleStart);
}

@Override
public void onAssignment(TaskAssignmentResult taskAssignmentResult) {
safely(listener -> listener.onAssignment(taskAssignmentResult));
}

@Override
public void onScheduleFinish() {
safely(SchedulingEventListener::onScheduleFinish);
}

private void safely(Consumer<SchedulingEventListener> action) {
listeners.forEach(listener -> {
try {
action.accept(listener);
} catch (Exception e) {
logger.warn("Scheduling event dispatching error: {} -> {}", listener.getClass().getSimpleName(), e.getMessage());
if (logger.isDebugEnabled()) {
logger.debug("Details", e);
}
}
});
}

static SchedulingEventListener of(Collection<SchedulingEventListener> listeners) {
if (listeners.isEmpty()) {
return NoOpSchedulingEventListener.INSTANCE;
}
return new CompositeSchedulingEventListener(listeners);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.netflix.fenzo;

/**
* Default {@link PreferentialNamedConsumableResourceEvaluator} implementation.
*/
public class DefaultPreferentialNamedConsumableResourceEvaluator implements PreferentialNamedConsumableResourceEvaluator {

public static final PreferentialNamedConsumableResourceEvaluator INSTANCE = new DefaultPreferentialNamedConsumableResourceEvaluator();

@Override
public double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit) {
// unassigned: 0.0 indicates no fitness, so return 0.5, which is less than the case of assigned with 0 sub-resources
return 0.5 / (subResourcesLimit + 1);
}

@Override
public double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit) {
return Math.min(1.0, (subResourcesUsed + subResourcesNeeded + 1.0) / (subResourcesLimit + 1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.netflix.fenzo;

class NoOpSchedulingEventListener implements SchedulingEventListener {

static final SchedulingEventListener INSTANCE = new NoOpSchedulingEventListener();

@Override
public void onScheduleStart() {
}

@Override
public void onAssignment(TaskAssignmentResult taskAssignmentResult) {
}

@Override
public void onScheduleFinish() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.netflix.fenzo;

import com.netflix.fenzo.PreferentialNamedConsumableResourceSet.PreferentialNamedConsumableResource;

/**
* Evaluator for {@link PreferentialNamedConsumableResource} selection process. Given an agent with matching
* ENI slot (either empty or with a matching name), this evaluator computes the fitness score.
* A custom implementation can provide fitness calculators augmented with additional information not available to
* Fenzo for making best placement decision.
*
* <h1>Example</h1>
* {@link PreferentialNamedConsumableResource} can be used to model AWS ENI interfaces together with IP and security
* group assignments. To minimize number of AWS API calls and to improve efficiency, it is beneficial to place a task
* on an agent which has ENI profile with matching security group profile so the ENI can be reused. Or if a task
* is terminated, but agent releases its resources lazily, they can be reused by another task with a matching profile.
*/
public interface PreferentialNamedConsumableResourceEvaluator {

/**
* Provide fitness score for an idle consumable resource.
*
* @param hostname hostname of an agent
* @param resourceName name to be associated with a resource with the given index
* @param index a consumable resource index
* @param subResourcesNeeded an amount of sub-resources required by a scheduled task
* @param subResourcesLimit a total amount of sub-resources available
* @return fitness score
*/
double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit);

/**
* Provide fitness score for a consumable resource that is already associated with some tasks. These tasks and
* the current one having profiles so can share the resource.
*
* @param hostname hostname of an agent
* @param resourceName name associated with a resource with the given index
* @param index a consumable resource index
* @param subResourcesNeeded an amount of sub-resources required by a scheduled task
* @param subResourcesUsed an amount of sub-resources already used by other tasks
* @param subResourcesLimit a total amount of sub-resources available
* @return fitness score
*/
double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -92,14 +90,16 @@ public double getFitness() {

public static class PreferentialNamedConsumableResource {
private final double maxFitness;
private final String hostname;
private final int index;
private final String attrName;
private String resName=null;
private final int limit;
private final Map<String, TaskRequest.NamedResourceSetRequest> usageBy;
private int usedSubResources=0;

PreferentialNamedConsumableResource(int i, String attrName, int limit) {
PreferentialNamedConsumableResource(String hostname, int i, String attrName, int limit) {
this.hostname = hostname;
this.index = i;
this.attrName = attrName;
this.limit = limit;
Expand Down Expand Up @@ -131,19 +131,41 @@ public Map<String, TaskRequest.NamedResourceSetRequest> getUsageBy() {
return usedSubResources;
}

double getFitness(TaskRequest request) {
String r = getResNameVal(attrName, request);
if(resName == null)
return 0.5 / maxFitness; // unassigned: 0.0 indicates no fitness, so return 0.5, which is less than
// the case of assigned with 0 sub-resources
if(!resName.equals(r))
double getFitness(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null
? null
: request.getCustomNamedResources().get(attrName);

// This particular resource type is not requested. We assign to it virtual resource name 'CustomResAbsentKey',
// and request 0 sub-resources.
if(setRequest == null) {
if(resName == null) {
return evaluator.evaluateIdle(hostname, CustomResAbsentKey, index, 0, limit);
}
if(resName.equals(CustomResAbsentKey)) {
return evaluator.evaluate(hostname, CustomResAbsentKey, index, 0, usedSubResources, limit);
}
return 0.0;
final TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null?
null : request.getCustomNamedResources().get(attrName);
double subResNeed = setRequest==null? 0.0 : setRequest.getNumSubResources();
if(usedSubResources + subResNeed > limit)
}

double subResNeed = setRequest.getNumSubResources();

// Resource not assigned yet to any task
if(resName == null) {
if(subResNeed > limit) {
return 0.0;
}
return evaluator.evaluateIdle(hostname, setRequest.getResValue(), index, subResNeed, limit);
}

// Resource assigned different name than requested
if(!resName.equals(setRequest.getResValue())) {
return 0.0;
}
if(usedSubResources + subResNeed > limit) {
return 0.0;
return Math.min(1.0, usedSubResources + subResNeed + 1.0 / maxFitness);
}
return evaluator.evaluate(hostname, setRequest.getResValue(), index, subResNeed, usedSubResources, limit);
}

void consume(TaskRequest request) {
Expand Down Expand Up @@ -190,11 +212,11 @@ boolean release(TaskRequest request) {
private final String name;
private final List<PreferentialNamedConsumableResource> usageBy;

public PreferentialNamedConsumableResourceSet(String name, int val0, int val1) {
public PreferentialNamedConsumableResourceSet(String hostname, String name, int val0, int val1) {
this.name = name;
usageBy = new ArrayList<>(val0);
for(int i=0; i<val0; i++)
usageBy.add(new PreferentialNamedConsumableResource(i, name, val1));
usageBy.add(new PreferentialNamedConsumableResource(hostname, i, name, val1));
}

public String getName() {
Expand All @@ -209,8 +231,8 @@ public String getName() {
// return false;
// }

ConsumeResult consume(TaskRequest request) {
return consumeIntl(request, false);
ConsumeResult consume(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
return consumeIntl(request, false, evaluator);
}

void assign(TaskRequest request) {
Expand All @@ -233,15 +255,15 @@ void assign(TaskRequest request) {
}

// returns 0.0 for no fitness at all, or <=1.0 for fitness
double getFitness(TaskRequest request) {
return consumeIntl(request, true).fitness;
double getFitness(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
return consumeIntl(request, true, evaluator).fitness;
}

private ConsumeResult consumeIntl(TaskRequest request, boolean skipConsume) {
private ConsumeResult consumeIntl(TaskRequest request, boolean skipConsume, PreferentialNamedConsumableResourceEvaluator evaluator) {
PreferentialNamedConsumableResource best = null;
double bestFitness=0.0;
for(PreferentialNamedConsumableResource r: usageBy) {
double f = r.getFitness(request);
double f = r.getFitness(request, evaluator);
if(f == 0.0)
continue;
if(bestFitness < f) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.netflix.fenzo;

/**
* A callback API providing notification about Fenzo task placement decisions during the scheduling process.
*/
public interface SchedulingEventListener {

/**
* Called before a new scheduling iteration is started.
*/
void onScheduleStart();

/**
* Called when a new task placement decision is made (a task gets resources allocated on a server).
*
* @param taskAssignmentResult task assignment result
*/
void onAssignment(TaskAssignmentResult taskAssignmentResult);

/**
* Called when the scheduling iteration completes.
*/
void onScheduleFinish();
}
Loading

0 comments on commit 76902c7

Please sign in to comment.