Skip to content

Commit

Permalink
Coordinator primary segment assignment fix (apache#5532)
Browse files Browse the repository at this point in the history
* fix issue where assign primary assigns segments to all historical servers in cluster

* fix test

* add test to ensure primary assignment will not assign to another server while loading is in progress
  • Loading branch information
clintropolis authored and gianm committed May 16, 2018
1 parent 8b930ac commit 5bf4c30
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class SegmentReplicantLookup
public static SegmentReplicantLookup make(DruidCluster cluster)
{
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();

for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
Expand All @@ -48,17 +49,29 @@ public static SegmentReplicantLookup make(DruidCluster cluster)
}
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}

// Also account for queued segments
for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
}
loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
}
}

return new SegmentReplicantLookup(segmentsInCluster);
return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
}

private final Table<String, String, Integer> segmentsInCluster;

private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster)
private final Table<String, String, Integer> loadingSegments;

// Private: instances are built only via make(), which derives both tables from the cluster snapshot.
private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster, Table<String, String, Integer> loadingSegments)
{
// (segmentId, tier) -> count of replicas already served by historicals.
this.segmentsInCluster = segmentsInCluster;
// (segmentId, tier) -> count of replicas still queued on load peons (not yet served).
this.loadingSegments = loadingSegments;
}

public Map<String, Integer> getClusterTiers(String segmentId)
Expand All @@ -82,4 +95,30 @@ public int getLoadedReplicants(String segmentId, String tier)
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}

/**
 * Returns the number of replicas of the given segment currently queued for load
 * (but not yet served) in the given tier; 0 when none are queued there.
 */
public int getLoadingReplicants(String segmentId, String tier)
{
  final Integer queuedCount = loadingSegments.get(segmentId, tier);
  if (queuedCount == null) {
    return 0;
  }
  return queuedCount;
}

/**
 * Returns the total number of replicas of the given segment queued for load
 * across all tiers; 0 when nothing is queued.
 */
public int getLoadingReplicants(String segmentId)
{
  // Sum the per-tier queued counts for this segment's row of the table.
  return loadingSegments.row(segmentId)
                        .values()
                        .stream()
                        .mapToInt(Integer::intValue)
                        .sum();
}

/**
 * Returns the combined replica count for the segment across all tiers:
 * replicas already served plus replicas still queued for load.
 */
public int getTotalReplicants(String segmentId)
{
  final int served = getLoadedReplicants(segmentId);
  final int queued = getLoadingReplicants(segmentId);
  return served + queued;
}

/**
 * Returns the combined replica count for the segment within one tier:
 * replicas already served plus replicas still queued for load.
 */
public int getTotalReplicants(String segmentId, String tier)
{
  final int served = getLoadedReplicants(segmentId, tier);
  final int queued = getLoadingReplicants(segmentId, tier);
  return served + queued;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ private void assign(
final CoordinatorStats stats
)
{
// if primary replica already exists
if (!currentReplicants.isEmpty()) {
// if primary replica already exists or is loading
final int loading = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
if (!currentReplicants.isEmpty() || loading > 0) {
assignReplicas(params, segment, stats, null);
} else {
final ServerHolder primaryHolderToLoad = assignPrimary(params, segment);
Expand Down Expand Up @@ -169,7 +170,6 @@ private ServerHolder assignPrimary(
if (targetReplicantsInTier <= 0) {
continue;
}

final String tier = entry.getKey();

final List<ServerHolder> holders = getFilteredHolders(
Expand Down Expand Up @@ -228,7 +228,7 @@ private void assignReplicas(
final int numAssigned = assignReplicasForTier(
tier,
entry.getIntValue(),
currentReplicants.getOrDefault(tier, 0),
params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier),
params,
createLoadQueueSizeLimitingPredicate(params),
segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
Expand Down Expand Up @@ -942,6 +943,8 @@ public void testDropServerActuallyServesSegment() throws Exception

LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();

EasyMock.replay(anotherMockPeon);

DruidCluster druidCluster = new DruidCluster(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
package io.druid.server.coordinator.rules;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
//CHECKSTYLE.OFF: Regexp
import com.metamx.common.logger.Logger;
//CHECKSTYLE.ON: Regexp
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.service.ServiceEmitter;
Expand Down Expand Up @@ -60,12 +60,17 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

//CHECKSTYLE.OFF: Regexp
//CHECKSTYLE.ON: Regexp

/**
*/
public class LoadRuleTest
Expand Down Expand Up @@ -190,7 +195,128 @@ public void testLoad() throws Exception
}

@Test
public void testLoadPriority() throws Exception
// Regression test for over-assignment of the primary replica: with a tier target of 1,
// only one server may be assigned, and a subsequent run must not assign again while
// the first assignment is still sitting in a load queue.
public void testLoadPrimaryAssignDoesNotOverAssign()
{
// Replication throttle never blocks in this scenario.
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();

// Shared peon for the empty servers; at least one loadSegment call is expected overall.
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();

// Rule wants exactly one replica in the "hot" tier.
LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 1
));

final DataSegment segment = createDataSegment("foo");

// Delegate server selection to the real balancer strategy.
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();

EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

// Two empty historicals in the same "hot" tier; the buggy behavior assigned to both.
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
), new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);

CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);


// Exactly one assignment: the primary must not go to every server in the tier.
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));

// ensure multiple runs don't assign primary segment again if at replication count
final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment));
EasyMock.replay(loadingPeon);

// Same tier, but the first server now reports the segment queued (still loading).
DruidCluster afterLoad = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
loadingPeon
), new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
CoordinatorStats statsAfterLoadPrimary = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(afterLoad)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(afterLoad))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);


// No further assignment while the queued load is in progress.
Assert.assertEquals(0, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));

EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}

@Test
public void testLoadPriority()
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(false).anyTimes();

Expand Down Expand Up @@ -618,4 +744,18 @@ private static LoadQueuePeon createEmptyPeon()

return mockPeon;
}

/**
 * Builds a mock peon that reports the given segments as queued for load,
 * with a queue size equal to their combined byte size. Caller must replay it.
 */
private static LoadQueuePeon createLoadingPeon(List<DataSegment> segments)
{
  final Set<DataSegment> queuedSegments = ImmutableSet.copyOf(segments);

  // Total byte size of everything sitting in the (simulated) load queue.
  long queuedBytes = 0;
  for (DataSegment queued : queuedSegments) {
    queuedBytes += queued.getSize();
  }

  final LoadQueuePeon peon = EasyMock.createMock(LoadQueuePeon.class);
  EasyMock.expect(peon.getSegmentsToLoad()).andReturn(queuedSegments).anyTimes();
  EasyMock.expect(peon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
  EasyMock.expect(peon.getLoadQueueSize()).andReturn(queuedBytes).anyTimes();
  EasyMock.expect(peon.getNumberOfSegmentsInQueue()).andReturn(queuedSegments.size()).anyTimes();

  return peon;
}
}

0 comments on commit 5bf4c30

Please sign in to comment.