Skip to content

Commit

Permalink
fix initializing shard cancel logic issue
Browse files Browse the repository at this point in the history
  • Loading branch information
howardhuanghua committed Oct 1, 2019
1 parent bf5248f commit dc25223
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ private void reroute(RoutingAllocation allocation) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
"auto-expand replicas out of sync with number of nodes in the cluster";

removeDelayMarkers(allocation);
// try to allocate existing shard copies first
gatewayAllocator.allocateUnassigned(allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,8 +896,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
float minWeight = Float.POSITIVE_INFINITY;
ModelNode minNode = null;
Decision decision = null;
ModelNode delayNode = null;
Decision delayNodeDecision = null;
if (throttledNodes.size() >= nodes.size() && explain == false) {
// all nodes are throttled, so we know we won't be able to allocate this round,
// so if we are not in explain mode, short circuit
Expand Down Expand Up @@ -926,11 +924,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0));
nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight));
}
if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT
&& shard.unassignedInfo().getDetails().indexOf(node.getNodeId()) != -1) {
delayNode = node;
delayNodeDecision = currentDecision;
}
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
final boolean updateMinNode;
if (currentWeight == minWeight) {
Expand Down Expand Up @@ -970,24 +963,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
// decision was not set and a node was not assigned, so treat it as a NO decision
decision = Decision.NO;
}

// avoid allocating shard to other nodes before INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING
if (decision.type() == Type.YES || decision.type() == Type.THROTTLE) {
if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT
&& shard.unassignedInfo().isDelayed()) {
if (delayNode == null) {
return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
} else if (!minNode.getNodeId().equals(delayNode.getNodeId())) {
if (delayNodeDecision.type() == Type.YES || delayNodeDecision.type() == Type.THROTTLE) {
minNode = delayNode;
decision = delayNodeDecision;
} else {
return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
}
}
}
}

List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
nodeDecisions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase {
private TestDelayAllocationService delayedAllocationService;
private MockAllocationService allocationService;
private ClusterService clusterService;
private DelayedShardsMockGatewayAllocator gatewayAllocator;
private ThreadPool threadPool;

@Before
public void createDelayedAllocationService() {
threadPool = new TestThreadPool(getTestName());
clusterService = mock(ClusterService.class);
gatewayAllocator = new DelayedShardsMockGatewayAllocator();
allocationService = createAllocationService(Settings.EMPTY, gatewayAllocator);
allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService);
verify(clusterService).addListener(delayedAllocationService);
}
Expand Down Expand Up @@ -462,92 +460,6 @@ public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() thro
equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos)));
}

/**
 * Removes the node holding the replica while the mock gateway allocator is told to skip allocation,
 * then checks that the replica is reported as delayed, that a delayed reroute task is registered with
 * the remaining delay, and that executing the submitted cluster state update task after the delay has
 * passed leaves no delayed unassigned shards and clears the registered task.
 */
public void testDelayUnassignedNoShardCopyBeforeDelay() throws Exception {
TimeValue delaySetting = timeValueMillis(100);
// one index with a single primary and a single replica, using the delayed node-left timeout above
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
// three nodes, with node1 as both the local node and the master
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).localNodeId("node1").masterNodeId("node1"))
.build();
final long baseTimestampNanos = System.nanoTime();
allocationService.setNanoTimeOverride(baseTimestampNanos);
clusterState = allocationService.reroute(clusterState, "reroute");
// starting primaries
clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
// starting replicas
clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
// nothing may be left unassigned at this point
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));

String nodeId = null;
final List<ShardRouting> allShards = clusterState.getRoutingTable().allShards("test");
// we need to find the node with the replica to be removed
for (ShardRouting shardRouting : allShards) {
if (shardRouting.primary() == false) {
nodeId = shardRouting.currentNodeId();
break;
}
}
assertNotNull(nodeId);

// make the mock gateway allocator return without handling any unassigned shard
gatewayAllocator.skipAllocation(true);

// remove node that has replica and reroute
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId);
clusterState = ClusterState.builder(clusterState).nodes(nodes).build();
clusterState = allocationService.disassociateDeadNodes(clusterState, true, "reroute");
ClusterState stateWithDelayedShard = clusterState;
// make sure the replica is marked as delayed (i.e. not reallocated)
assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard));
ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next();
// the unassigned timestamp must be the overridden allocation service time
assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos());

// mock ClusterService.submitStateUpdateTask() method
// (captures the submitted task and releases the latch so the test can wait for the submission)
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<ClusterStateUpdateTask> clusterStateUpdateTask = new AtomicReference<>();
doAnswer(invocationOnMock -> {
clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
latch.countDown();
return null;
}).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
assertNull(delayedAllocationService.delayedRerouteTask.get());
// the cluster change event happens at a random offset after the base timestamp; the bound passed to
// randomInt is the configured delay minus one nanosecond
long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos();
long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent;
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState));

// check that delayed reroute task was created and registered with the proper settings
DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
assertNotNull(delayedRerouteTask);
assertFalse(delayedRerouteTask.cancelScheduling.get());
assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
// expected next delay = configured delay minus the time elapsed between the base timestamp and the event
assertThat(delayedRerouteTask.nextDelay.nanos(),
equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));

// check that submitStateUpdateTask() was invoked on the cluster service mock
assertTrue(latch.await(30, TimeUnit.SECONDS));
verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask.get()));

// advance the time on the allocation service to a timestamp that happened after the delayed scheduling
long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos();
allocationService.setNanoTimeOverride(nanoTimeForReroute);
// apply cluster state
ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard);
// check that shard is not delayed anymore
assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay));
// check that task is now removed
assertNull(delayedAllocationService.delayedRerouteTask.get());

// reset the mock gateway allocator to its default (non-skipping) mode
gatewayAllocator.skipAllocation(false);
}

private static class TestDelayAllocationService extends DelayedAllocationService {
private volatile long nanoTimeOverride = -1L;

Expand All @@ -571,4 +483,3 @@ protected long currentNanoTime() {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ protected long currentNanoTime() {
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
*/
protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
private boolean skipAllocation = false;

public DelayedShardsMockGatewayAllocator() {
}
public DelayedShardsMockGatewayAllocator() {}

@Override
public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
Expand All @@ -271,9 +268,6 @@ public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> fa

@Override
public void allocateUnassigned(RoutingAllocation allocation) {
if (this.skipAllocation) {
return;
}
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
Expand All @@ -285,9 +279,5 @@ public void allocateUnassigned(RoutingAllocation allocation) {
}
}
}

/**
 * Sets the skip flag of this mock allocator. While the flag is {@code true},
 * {@code allocateUnassigned} returns immediately without iterating the unassigned shards.
 *
 * @param skipAllocation the new value of the skip flag
 */
public void skipAllocation(boolean skipAllocation) {
this.skipAllocation = skipAllocation;
}
}
}

0 comments on commit dc25223

Please sign in to comment.