Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-6538. Inter Queue preemption is not happening when DRF is configured in extreme resource scale cases. #5174

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
return resource.getMemorySize() <= 0;
}

@Override
public boolean isAnyRequestedResourceZeroOrNegative(Resource available, Resource resource) {
return resource.getMemorySize() == 0;
}
@Override
public boolean isAnyMajorResourceAboveZero(Resource resource) {
return resource.getMemorySize() > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,22 @@ public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
return false;
}

@Override
public boolean isAnyRequestedResourceZeroOrNegative(Resource available, Resource resource) {
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation resourceInformation = available.getResourceInformation(
i);
if (resourceInformation.getValue() != 0L) {
ResourceInformation ri2 = resource.getResourceInformation(i);
if (ri2.getValue() <= 0L) {
return true;
}
}
}
return false;
}

@Override
public boolean isAnyMajorResourceAboveZero(Resource resource) {
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,17 @@ public abstract float divide(
*/
public abstract boolean isAnyMajorResourceZeroOrNegative(Resource resource);

/**
* Check if resource has any available (those that have non-zero available value)
* major resource types (which are all NodeManagers included) a zero value or negative value.
*
* @param available available resource
* @param resource resource
* @return returns true if any resource is zero.
*/
public abstract boolean isAnyRequestedResourceZeroOrNegative(
Resource available, Resource resource);

/**
* Get resource <code>r</code>and normalize down using step-factor
* <code>stepFactor</code>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,11 @@ protected void computeFixpointAllocation(Resource totGuarant,

for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
.hasNext();) {
if (!rc.isAnyMajorResourceAboveZero(unassigned)) {
// Exit the loop once any of the unassigned resources reach zero, except the optional
// ones which the queue has no guarantee on. This should ensure that extreme cases
// where one of the resources are significantly larger than the other (i.e: way below 1
// vcore per GB of memory)
if (rc.isAnyRequestedResourceZeroOrNegative(totGuarant, unassigned)) {
break;
}

Expand Down Expand Up @@ -266,7 +270,6 @@ protected void computeFixpointAllocation(Resource totGuarant,
}
}


/**
* This method is visible to allow sub-classes to override the initialization
* behavior.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,56 @@ public void testInterQueuePreemptionWithMultipleResource() throws Exception {
getAppAttemptId(2))));
}

@Test
public void testInterQueuePreemptionWithMultipleResourceUnbalancedCase() throws IOException {
/**
* Queue structure is:
*
* <pre>
* root
* / \
* a b
* </pre>
*
* Scenario: Queue B : Queue A has 90/10% capacity share.
* Almost all the resources are allocated to Queue A, but based on memory alone
* containers still can be allocated elsewhere. The app on A requests a lot more.
* The app on Queue B needs a little memory (still fits in the remaining resources),
* but needs a lot of vcores. Enough resources must be preempted from the app on A
* to allow the app on B launch.
*/
String labelsConfig = "=65536:50,true;"; // default partition
String nodesConfig = // only one node
"n1=";

String queuesConfig =
// guaranteed, max, used, pending
"root(=[65536:50 65536:50 61440:45 132096:525]);" + // root
"-a(=[2048:5 65536:50 61440:45 131072:500]);" + // a
"-b(=[63488:45 65536:50 0:0 1024:25]);"; // b


String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,61440:45,n1,,20,false);" + // app1 in a
"b\t(1,1024:25,n1,,30,false)"; // app2 in b

buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
Resource ul = Resource.newInstance(65536, 50);
when(((LeafQueue)(cs.getQueue("root.a")))
.getResourceLimitForAllUsers(any(), any(), any(), any())
).thenReturn(ul);
policy.editSchedule();

// Preemption should happen in Queue a, preempt to Queue b
verify(eventHandler, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(eventHandler, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}

@Test
public void testInterQueuePreemptionWithNaturalTerminationFactor()
throws Exception {
Expand Down