From 89978bc9195321c896eefeb7afa7ec65a4352b04 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 10 Sep 2019 10:16:09 +0800 Subject: [PATCH 01/13] optimize rolling restart --- .../allocator/BalancedShardsAllocator.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a1549c5e217a4..e1138f27afafa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -897,6 +897,8 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s float minWeight = Float.POSITIVE_INFINITY; ModelNode minNode = null; Decision decision = null; + ModelNode delayNode = null; + Decision delayNodeDecision = null; if (throttledNodes.size() >= nodes.size() && explain == false) { // all nodes are throttled, so we know we won't be able to allocate this round, // so if we are not in explain mode, short circuit @@ -925,6 +927,11 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } + if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT + && shard.unassignedInfo().getDetails().indexOf(node.getNodeId()) != -1) { + delayNode = node; + delayNodeDecision = currentDecision; + } if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { final boolean updateMinNode; if (currentWeight == minWeight) { @@ -964,6 +971,24 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s // decision was not set and a node was not assigned, so treat 
it as a NO decision decision = Decision.NO; } + + // avoid allocating shard to other nodes before INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING + if (decision.type() == Type.YES || decision.type() == Type.THROTTLE) { + if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT + && shard.unassignedInfo().isDelayed()) { + if (delayNode == null) { + return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); + } else if (!minNode.getNodeId().equals(delayNode.getNodeId())) { + if (delayNodeDecision.type() == Type.YES || delayNodeDecision.type() == Type.THROTTLE) { + minNode = delayNode; + decision = delayNodeDecision; + } else { + return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); + } + } + } + } + List nodeDecisions = null; if (explain) { nodeDecisions = new ArrayList<>(); From cac38ad2a1ca6ed509e80696f32758fb0c21bea8 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Wed, 18 Sep 2019 22:29:55 +0800 Subject: [PATCH 02/13] add test case for delay allocation with no shard data copy --- .../DelayedAllocationServiceTests.java | 91 ++++++++++++++++++- .../cluster/ESAllocationTestCase.java | 12 ++- 2 files changed, 101 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java index 4ebd5eb59ad45..bffd36ab707cc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java @@ -61,13 +61,15 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase { private TestDelayAllocationService delayedAllocationService; private MockAllocationService allocationService; private ClusterService clusterService; + private DelayedShardsMockGatewayAllocator gatewayAllocator; private ThreadPool threadPool; @Before public void 
createDelayedAllocationService() { threadPool = new TestThreadPool(getTestName()); clusterService = mock(ClusterService.class); - allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + gatewayAllocator = new DelayedShardsMockGatewayAllocator(); + allocationService = createAllocationService(Settings.EMPTY, gatewayAllocator); delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService); verify(clusterService).addListener(delayedAllocationService); } @@ -460,6 +462,92 @@ public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() thro equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos))); } + public void testDelayUnassignedNoShardCopyBeforeDelay() throws Exception { + TimeValue delaySetting = timeValueMillis(100); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting)) + .numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData) + .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build(); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).localNodeId("node1").masterNodeId("node1")) + .build(); + final long baseTimestampNanos = System.nanoTime(); + allocationService.setNanoTimeOverride(baseTimestampNanos); + clusterState = allocationService.reroute(clusterState, "reroute"); + // starting primaries + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + // starting replicas + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + 
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); + + String nodeId = null; + final List allShards = clusterState.getRoutingTable().allShards("test"); + // we need to find the node with the replica to be removed + for (ShardRouting shardRouting : allShards) { + if (shardRouting.primary() == false) { + nodeId = shardRouting.currentNodeId(); + break; + } + } + assertNotNull(nodeId); + + // skip allocation of handling existing data copy shard + gatewayAllocator.skipAllocation(true); + + // remove node that has replica and reroute + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId); + clusterState = ClusterState.builder(clusterState).nodes(nodes).build(); + clusterState = allocationService.disassociateDeadNodes(clusterState, true, "reroute"); + ClusterState stateWithDelayedShard = clusterState; + // make sure the replica is marked as delayed (i.e. not reallocated) + assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard)); + ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next(); + assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos()); + + // mock ClusterService.submitStateUpdateTask() method + CountDownLatch latch = new CountDownLatch(1); + AtomicReference clusterStateUpdateTask = new AtomicReference<>(); + doAnswer(invocationOnMock -> { + clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]); + latch.countDown(); + return null; + }).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class)); + assertNull(delayedAllocationService.delayedRerouteTask.get()); + long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos(); + long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent; + 
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos); + delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState)); + + // check that delayed reroute task was created and registered with the proper settings + DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get(); + assertNotNull(delayedRerouteTask); + assertFalse(delayedRerouteTask.cancelScheduling.get()); + assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos)); + assertThat(delayedRerouteTask.nextDelay.nanos(), + equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos))); + + // check that submitStateUpdateTask() was invoked on the cluster service mock + assertTrue(latch.await(30, TimeUnit.SECONDS)); + verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask.get())); + + // advance the time on the allocation service to a timestamp that happened after the delayed scheduling + long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos(); + allocationService.setNanoTimeOverride(nanoTimeForReroute); + // apply cluster state + ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard); + // check that shard is not delayed anymore + assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay)); + // check that task is now removed + assertNull(delayedAllocationService.delayedRerouteTask.get()); + + // reset + gatewayAllocator.skipAllocation(false); + } + private static class TestDelayAllocationService extends DelayedAllocationService { private volatile long nanoTimeOverride = -1L; @@ -483,3 +571,4 @@ protected long currentNanoTime() { } } } + diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java 
b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 2c2a63d65eac2..0f51b93350393 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -254,7 +254,10 @@ protected long currentNanoTime() { * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet. */ protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator { - public DelayedShardsMockGatewayAllocator() {} + private boolean skipAllocation = false; + + public DelayedShardsMockGatewayAllocator() { + } @Override public void applyStartedShards(RoutingAllocation allocation, List startedShards) { @@ -268,6 +271,9 @@ public void applyFailedShards(RoutingAllocation allocation, List fa @Override public void allocateUnassigned(RoutingAllocation allocation) { + if (this.skipAllocation) { + return; + } final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator(); while (unassignedIterator.hasNext()) { ShardRouting shard = unassignedIterator.next(); @@ -279,5 +285,9 @@ public void allocateUnassigned(RoutingAllocation allocation) { } } } + + public void skipAllocation(boolean skipAllocation) { + this.skipAllocation = skipAllocation; + } } } From e436afb19a96eb058de8dcfeb1df7b753884f42e Mon Sep 17 00:00:00 2001 From: danielhuang Date: Sat, 21 Sep 2019 00:00:57 +0800 Subject: [PATCH 03/13] fix initializing shard cancel logic issue --- .../routing/allocation/AllocationService.java | 4 +- .../allocator/BalancedShardsAllocator.java | 26 ------ .../gateway/GatewayAllocator.java | 5 +- .../DelayedAllocationServiceTests.java | 91 +------------------ .../cluster/ESAllocationTestCase.java | 12 +-- 5 files changed, 8 insertions(+), 130 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index eb139589724ae..9eec16e156413 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -396,8 +396,8 @@ private void reroute(RoutingAllocation allocation) { assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; - // now allocate all the unassigned to available nodes - if (allocation.routingNodes().unassigned().size() > 0) { + // now allocate all the unassigned to available nodes or cancel existing recoveries if we have a better match + if (allocation.routingNodes().unassigned().size() > 0 || allocation.routingNodes().hasInactiveShards()) { removeDelayMarkers(allocation); gatewayAllocator.allocateUnassigned(allocation); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index e1138f27afafa..60181afd64fef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import 
org.elasticsearch.cluster.routing.allocation.AllocationDecision; @@ -897,8 +896,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s float minWeight = Float.POSITIVE_INFINITY; ModelNode minNode = null; Decision decision = null; - ModelNode delayNode = null; - Decision delayNodeDecision = null; if (throttledNodes.size() >= nodes.size() && explain == false) { // all nodes are throttled, so we know we won't be able to allocate this round, // so if we are not in explain mode, short circuit @@ -927,11 +924,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } - if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT - && shard.unassignedInfo().getDetails().indexOf(node.getNodeId()) != -1) { - delayNode = node; - delayNodeDecision = currentDecision; - } if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { final boolean updateMinNode; if (currentWeight == minWeight) { @@ -971,24 +963,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s // decision was not set and a node was not assigned, so treat it as a NO decision decision = Decision.NO; } - - // avoid allocating shard to other nodes before INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING - if (decision.type() == Type.YES || decision.type() == Type.THROTTLE) { - if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT - && shard.unassignedInfo().isDelayed()) { - if (delayNode == null) { - return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); - } else if (!minNode.getNodeId().equals(delayNode.getNodeId())) { - if (delayNodeDecision.type() == Type.YES || delayNodeDecision.type() == Type.THROTTLE) { - minNode = delayNode; - decision = delayNodeDecision; - } else { - return 
AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); - } - } - } - } - List nodeDecisions = null; if (explain) { nodeDecisions = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 39f64f34a94a7..a83e3b4cfd782 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -129,7 +129,10 @@ protected static void innerAllocatedUnassigned(RoutingAllocation allocation, unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering primaryShardAllocator.allocateUnassigned(allocation); - replicaShardAllocator.processExistingRecoveries(allocation); + if (allocation.routingNodes().hasInactiveShards()) { + // cancel existing recoveries if we have a better match + replicaShardAllocator.processExistingRecoveries(allocation); + } replicaShardAllocator.allocateUnassigned(allocation); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java index bffd36ab707cc..4ebd5eb59ad45 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java @@ -61,15 +61,13 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase { private TestDelayAllocationService delayedAllocationService; private MockAllocationService allocationService; private ClusterService clusterService; - private DelayedShardsMockGatewayAllocator gatewayAllocator; private ThreadPool threadPool; @Before public void createDelayedAllocationService() { threadPool = new TestThreadPool(getTestName()); clusterService = mock(ClusterService.class); - gatewayAllocator = new 
DelayedShardsMockGatewayAllocator(); - allocationService = createAllocationService(Settings.EMPTY, gatewayAllocator); + allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService); verify(clusterService).addListener(delayedAllocationService); } @@ -462,92 +460,6 @@ public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() thro equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos))); } - public void testDelayUnassignedNoShardCopyBeforeDelay() throws Exception { - TimeValue delaySetting = timeValueMillis(100); - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT) - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting)) - .numberOfShards(1).numberOfReplicas(1)) - .build(); - ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metaData(metaData) - .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build(); - clusterState = ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).localNodeId("node1").masterNodeId("node1")) - .build(); - final long baseTimestampNanos = System.nanoTime(); - allocationService.setNanoTimeOverride(baseTimestampNanos); - clusterState = allocationService.reroute(clusterState, "reroute"); - // starting primaries - clusterState = startInitializingShardsAndReroute(allocationService, clusterState); - // starting replicas - clusterState = startInitializingShardsAndReroute(allocationService, clusterState); - assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); - - String nodeId = null; - final List allShards = clusterState.getRoutingTable().allShards("test"); - 
// we need to find the node with the replica to be removed - for (ShardRouting shardRouting : allShards) { - if (shardRouting.primary() == false) { - nodeId = shardRouting.currentNodeId(); - break; - } - } - assertNotNull(nodeId); - - // skip allocation of handling existing data copy shard - gatewayAllocator.skipAllocation(true); - - // remove node that has replica and reroute - DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId); - clusterState = ClusterState.builder(clusterState).nodes(nodes).build(); - clusterState = allocationService.disassociateDeadNodes(clusterState, true, "reroute"); - ClusterState stateWithDelayedShard = clusterState; - // make sure the replica is marked as delayed (i.e. not reallocated) - assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard)); - ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next(); - assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos()); - - // mock ClusterService.submitStateUpdateTask() method - CountDownLatch latch = new CountDownLatch(1); - AtomicReference clusterStateUpdateTask = new AtomicReference<>(); - doAnswer(invocationOnMock -> { - clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]); - latch.countDown(); - return null; - }).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class)); - assertNull(delayedAllocationService.delayedRerouteTask.get()); - long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos(); - long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent; - delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos); - delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState)); - - // check that delayed 
reroute task was created and registered with the proper settings - DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get(); - assertNotNull(delayedRerouteTask); - assertFalse(delayedRerouteTask.cancelScheduling.get()); - assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos)); - assertThat(delayedRerouteTask.nextDelay.nanos(), - equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos))); - - // check that submitStateUpdateTask() was invoked on the cluster service mock - assertTrue(latch.await(30, TimeUnit.SECONDS)); - verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask.get())); - - // advance the time on the allocation service to a timestamp that happened after the delayed scheduling - long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos(); - allocationService.setNanoTimeOverride(nanoTimeForReroute); - // apply cluster state - ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard); - // check that shard is not delayed anymore - assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay)); - // check that task is now removed - assertNull(delayedAllocationService.delayedRerouteTask.get()); - - // reset - gatewayAllocator.skipAllocation(false); - } - private static class TestDelayAllocationService extends DelayedAllocationService { private volatile long nanoTimeOverride = -1L; @@ -571,4 +483,3 @@ protected long currentNanoTime() { } } } - diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 0f51b93350393..2c2a63d65eac2 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -254,10 +254,7 @@ protected long currentNanoTime() { * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet. */ protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator { - private boolean skipAllocation = false; - - public DelayedShardsMockGatewayAllocator() { - } + public DelayedShardsMockGatewayAllocator() {} @Override public void applyStartedShards(RoutingAllocation allocation, List startedShards) { @@ -271,9 +268,6 @@ public void applyFailedShards(RoutingAllocation allocation, List fa @Override public void allocateUnassigned(RoutingAllocation allocation) { - if (this.skipAllocation) { - return; - } final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator(); while (unassignedIterator.hasNext()) { ShardRouting shard = unassignedIterator.next(); @@ -285,9 +279,5 @@ public void allocateUnassigned(RoutingAllocation allocation) { } } } - - public void skipAllocation(boolean skipAllocation) { - this.skipAllocation = skipAllocation; - } } } From 51d1bb3745f2b3f50f55ad9b9b7ee7a2cec4c59b Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 24 Sep 2019 20:55:26 +0800 Subject: [PATCH 04/13] add cancel new shard recovery and uses existing shard copy IT --- .../routing/allocation/AllocationService.java | 8 +- .../indices/recovery/IndexRecoveryIT.java | 83 +++++++++++++++++++ 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 9eec16e156413..b238946e0f666 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -396,11 +396,9 @@ private void reroute(RoutingAllocation allocation) { assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; - // now allocate all the unassigned to available nodes or cancel existing recoveries if we have a better match - if (allocation.routingNodes().unassigned().size() > 0 || allocation.routingNodes().hasInactiveShards()) { - removeDelayMarkers(allocation); - gatewayAllocator.allocateUnassigned(allocation); - } + removeDelayMarkers(allocation); + // try to allocate existing shard copies first + gatewayAllocator.allocateUnassigned(allocation); shardsAllocator.allocate(allocation); assert RoutingNodes.assertShardStats(allocation.routingNodes()); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 2390bae14a36a..bf57f8284a03c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -324,6 +324,89 @@ public void testReplicaRecovery() throws Exception { assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs); } + public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception { + logger.info("--> start node A"); + final String nodeA = internalCluster().startNode(); + + logger.info("--> create index on node: {}", nodeA); + ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT) + .getShards()[0].getStats().getStore().size(); + + logger.info("--> start node B"); + // force a shard recovery from nodeA to nodeB + final String nodeB = internalCluster().startNode(); + Settings nodeBDataPathSettings = 
internalCluster().dataPathSettings(nodeB); + + logger.info("--> add replica for {} on node: {}", INDEX_NAME, nodeB); + assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0))); + ensureGreen(INDEX_NAME); + + logger.info("--> start node C"); + final String nodeC = internalCluster().startNode(); + assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); + + // do sync flush to gen sync id + assertBusy(() -> assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0))); + + logger.info("--> slowing down recoveries"); + slowDownRecovery(shardSize); + + logger.info("--> stop node B"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeB)); + + logger.info("--> request recoveries"); + // nodeB down, peer recovery from nodeA to nodeC + RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); + + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(1)); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); + + assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, + Stage.DONE, null, nodeA); + validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); + + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeA, nodeC); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); + + logger.info("--> start node B again"); + final String nodeB1 = internalCluster().startNode(nodeBDataPathSettings); // this will use 
the same data location as the stopped node + assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); + + logger.info("--> request recoveries"); + // peer recovery from nodeA to node C should be canceled, replica should be allocated to nodeB that has the data copy + assertBusy(() -> { + RecoveryResponse response1 = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); + List recoveryStates1 = response1.shardRecoveryStates().get(INDEX_NAME); + + List nodeARecoveryStates1 = findRecoveriesForTargetNode(nodeA, recoveryStates1); + assertThat(nodeARecoveryStates1.size(), equalTo(1)); + List nodeBRecoveryStates1 = findRecoveriesForTargetNode(nodeB1, recoveryStates1); + assertThat(nodeBRecoveryStates1.size(), equalTo(1)); + List nodeCRecoveryStates1 = findRecoveriesForTargetNode(nodeC, recoveryStates1); + assertThat(nodeCRecoveryStates1.size(), equalTo(0)); + + assertRecoveryState(nodeARecoveryStates1.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, + Stage.DONE, null, nodeA); + validateIndexRecoveryState(nodeARecoveryStates1.get(0).getIndex()); + + assertRecoveryState(nodeBRecoveryStates1.get(0), 0, PeerRecoverySource.INSTANCE, false, + Stage.DONE, nodeA, nodeB1); + validateIndexRecoveryState(nodeBRecoveryStates1.get(0).getIndex()); + }); + + logger.info("--> speeding up recoveries"); + restoreRecoverySpeed(); + + // wait for it to be finished + ensureGreen(); + } + public void testRerouteRecovery() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode(); From be2546bed12437d5cf3f02c29f64174457deebbf Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 24 Sep 2019 20:57:56 +0800 Subject: [PATCH 05/13] remove extra space --- .../org/elasticsearch/indices/recovery/IndexRecoveryIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java 
b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index bf57f8284a03c..4f0af91c1973f 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -379,7 +379,7 @@ public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exceptio assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); logger.info("--> request recoveries"); - // peer recovery from nodeA to node C should be canceled, replica should be allocated to nodeB that has the data copy + // peer recovery from nodeA to nodeC should be canceled, replica should be allocated to nodeB that has the data copy assertBusy(() -> { RecoveryResponse response1 = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); List recoveryStates1 = response1.shardRecoveryStates().get(INDEX_NAME); From 28d2e7c8d5a7d4a384a62ab8b2fe34364db62d90 Mon Sep 17 00:00:00 2001 From: howardhuanghua Date: Wed, 25 Sep 2019 11:35:02 +0800 Subject: [PATCH 06/13] fix long line issue --- .../org/elasticsearch/indices/recovery/IndexRecoveryIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 4f0af91c1973f..dc4e3d26219ba 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -375,7 +375,8 @@ public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exceptio validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); logger.info("--> start node B again"); - final String nodeB1 = internalCluster().startNode(nodeBDataPathSettings); // this will use the same data location as the stopped node + // this will use the 
same data location as the stopped node + final String nodeB1 = internalCluster().startNode(nodeBDataPathSettings); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); logger.info("--> request recoveries"); From c3e138783129cc0ac5eda014cfe17fa184490ef1 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 10 Sep 2019 10:16:09 +0800 Subject: [PATCH 07/13] optimize rolling restart --- .../allocator/BalancedShardsAllocator.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 60181afd64fef..4e13c69527863 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -896,6 +896,8 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s float minWeight = Float.POSITIVE_INFINITY; ModelNode minNode = null; Decision decision = null; + ModelNode delayNode = null; + Decision delayNodeDecision = null; if (throttledNodes.size() >= nodes.size() && explain == false) { // all nodes are throttled, so we know we won't be able to allocate this round, // so if we are not in explain mode, short circuit @@ -924,6 +926,11 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } + if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT + && shard.unassignedInfo().getDetails().indexOf(node.getNodeId()) != -1) { + delayNode = node; + delayNodeDecision = currentDecision; + } if (currentDecision.type() == Type.YES || 
currentDecision.type() == Type.THROTTLE) { final boolean updateMinNode; if (currentWeight == minWeight) { @@ -963,6 +970,24 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s // decision was not set and a node was not assigned, so treat it as a NO decision decision = Decision.NO; } + + // avoid allocating shard to other nodes before INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING + if (decision.type() == Type.YES || decision.type() == Type.THROTTLE) { + if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT + && shard.unassignedInfo().isDelayed()) { + if (delayNode == null) { + return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); + } else if (!minNode.getNodeId().equals(delayNode.getNodeId())) { + if (delayNodeDecision.type() == Type.YES || delayNodeDecision.type() == Type.THROTTLE) { + minNode = delayNode; + decision = delayNodeDecision; + } else { + return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); + } + } + } + } + List nodeDecisions = null; if (explain) { nodeDecisions = new ArrayList<>(); From bf5248fb08e29ccc7933b5fcc8b9b1a049a63468 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Wed, 18 Sep 2019 22:29:55 +0800 Subject: [PATCH 08/13] add test case for delay allocation with no shard data copy --- .../DelayedAllocationServiceTests.java | 91 ++++++++++++++++++- .../cluster/ESAllocationTestCase.java | 12 ++- 2 files changed, 101 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java index 4ebd5eb59ad45..bffd36ab707cc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java @@ -61,13 +61,15 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase { 
private TestDelayAllocationService delayedAllocationService; private MockAllocationService allocationService; private ClusterService clusterService; + private DelayedShardsMockGatewayAllocator gatewayAllocator; private ThreadPool threadPool; @Before public void createDelayedAllocationService() { threadPool = new TestThreadPool(getTestName()); clusterService = mock(ClusterService.class); - allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + gatewayAllocator = new DelayedShardsMockGatewayAllocator(); + allocationService = createAllocationService(Settings.EMPTY, gatewayAllocator); delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService); verify(clusterService).addListener(delayedAllocationService); } @@ -460,6 +462,92 @@ public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() thro equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos))); } + public void testDelayUnassignedNoShardCopyBeforeDelay() throws Exception { + TimeValue delaySetting = timeValueMillis(100); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting)) + .numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData) + .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build(); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).localNodeId("node1").masterNodeId("node1")) + .build(); + final long baseTimestampNanos = System.nanoTime(); + allocationService.setNanoTimeOverride(baseTimestampNanos); + clusterState = allocationService.reroute(clusterState, 
"reroute"); + // starting primaries + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + // starting replicas + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); + + String nodeId = null; + final List allShards = clusterState.getRoutingTable().allShards("test"); + // we need to find the node with the replica to be removed + for (ShardRouting shardRouting : allShards) { + if (shardRouting.primary() == false) { + nodeId = shardRouting.currentNodeId(); + break; + } + } + assertNotNull(nodeId); + + // skip allocation of handling existing data copy shard + gatewayAllocator.skipAllocation(true); + + // remove node that has replica and reroute + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId); + clusterState = ClusterState.builder(clusterState).nodes(nodes).build(); + clusterState = allocationService.disassociateDeadNodes(clusterState, true, "reroute"); + ClusterState stateWithDelayedShard = clusterState; + // make sure the replica is marked as delayed (i.e. 
not reallocated) + assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard)); + ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next(); + assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos()); + + // mock ClusterService.submitStateUpdateTask() method + CountDownLatch latch = new CountDownLatch(1); + AtomicReference clusterStateUpdateTask = new AtomicReference<>(); + doAnswer(invocationOnMock -> { + clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]); + latch.countDown(); + return null; + }).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class)); + assertNull(delayedAllocationService.delayedRerouteTask.get()); + long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos(); + long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent; + delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos); + delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState)); + + // check that delayed reroute task was created and registered with the proper settings + DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get(); + assertNotNull(delayedRerouteTask); + assertFalse(delayedRerouteTask.cancelScheduling.get()); + assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos)); + assertThat(delayedRerouteTask.nextDelay.nanos(), + equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos))); + + // check that submitStateUpdateTask() was invoked on the cluster service mock + assertTrue(latch.await(30, TimeUnit.SECONDS)); + verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), 
eq(clusterStateUpdateTask.get())); + + // advance the time on the allocation service to a timestamp that happened after the delayed scheduling + long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos(); + allocationService.setNanoTimeOverride(nanoTimeForReroute); + // apply cluster state + ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard); + // check that shard is not delayed anymore + assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay)); + // check that task is now removed + assertNull(delayedAllocationService.delayedRerouteTask.get()); + + // reset + gatewayAllocator.skipAllocation(false); + } + private static class TestDelayAllocationService extends DelayedAllocationService { private volatile long nanoTimeOverride = -1L; @@ -483,3 +571,4 @@ protected long currentNanoTime() { } } } + diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 2c2a63d65eac2..0f51b93350393 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -254,7 +254,10 @@ protected long currentNanoTime() { * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet. 
*/ protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator { - public DelayedShardsMockGatewayAllocator() {} + private boolean skipAllocation = false; + + public DelayedShardsMockGatewayAllocator() { + } @Override public void applyStartedShards(RoutingAllocation allocation, List startedShards) { @@ -268,6 +271,9 @@ public void applyFailedShards(RoutingAllocation allocation, List fa @Override public void allocateUnassigned(RoutingAllocation allocation) { + if (this.skipAllocation) { + return; + } final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator(); while (unassignedIterator.hasNext()) { ShardRouting shard = unassignedIterator.next(); @@ -279,5 +285,9 @@ public void allocateUnassigned(RoutingAllocation allocation) { } } } + + public void skipAllocation(boolean skipAllocation) { + this.skipAllocation = skipAllocation; + } } } From dc25223f35dad4bb984e5898894115d58b22c69b Mon Sep 17 00:00:00 2001 From: danielhuang Date: Sat, 21 Sep 2019 00:00:57 +0800 Subject: [PATCH 09/13] fix initializing shard cancel logic issue --- .../routing/allocation/AllocationService.java | 2 +- .../allocator/BalancedShardsAllocator.java | 25 ----- .../DelayedAllocationServiceTests.java | 91 +------------------ .../cluster/ESAllocationTestCase.java | 12 +-- 4 files changed, 3 insertions(+), 127 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index b238946e0f666..fd52fee639230 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -395,7 +395,7 @@ private void reroute(RoutingAllocation allocation) { assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. 
See disassociateDeadNodes"; assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster"; - + removeDelayMarkers(allocation); // try to allocate existing shard copies first gatewayAllocator.allocateUnassigned(allocation); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 4e13c69527863..60181afd64fef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -896,8 +896,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s float minWeight = Float.POSITIVE_INFINITY; ModelNode minNode = null; Decision decision = null; - ModelNode delayNode = null; - Decision delayNodeDecision = null; if (throttledNodes.size() >= nodes.size() && explain == false) { // all nodes are throttled, so we know we won't be able to allocate this round, // so if we are not in explain mode, short circuit @@ -926,11 +924,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } - if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT - && shard.unassignedInfo().getDetails().indexOf(node.getNodeId()) != -1) { - delayNode = node; - delayNodeDecision = currentDecision; - } if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { final boolean updateMinNode; if (currentWeight == minWeight) { @@ -970,24 +963,6 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s 
// decision was not set and a node was not assigned, so treat it as a NO decision decision = Decision.NO; } - - // avoid allocating shard to other nodes before INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING - if (decision.type() == Type.YES || decision.type() == Type.THROTTLE) { - if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT - && shard.unassignedInfo().isDelayed()) { - if (delayNode == null) { - return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); - } else if (!minNode.getNodeId().equals(delayNode.getNodeId())) { - if (delayNodeDecision.type() == Type.YES || delayNodeDecision.type() == Type.THROTTLE) { - minNode = delayNode; - decision = delayNodeDecision; - } else { - return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); - } - } - } - } - List nodeDecisions = null; if (explain) { nodeDecisions = new ArrayList<>(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java index bffd36ab707cc..4ebd5eb59ad45 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java @@ -61,15 +61,13 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase { private TestDelayAllocationService delayedAllocationService; private MockAllocationService allocationService; private ClusterService clusterService; - private DelayedShardsMockGatewayAllocator gatewayAllocator; private ThreadPool threadPool; @Before public void createDelayedAllocationService() { threadPool = new TestThreadPool(getTestName()); clusterService = mock(ClusterService.class); - gatewayAllocator = new DelayedShardsMockGatewayAllocator(); - allocationService = createAllocationService(Settings.EMPTY, gatewayAllocator); + allocationService = 
createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService); verify(clusterService).addListener(delayedAllocationService); } @@ -462,92 +460,6 @@ public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() thro equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos))); } - public void testDelayUnassignedNoShardCopyBeforeDelay() throws Exception { - TimeValue delaySetting = timeValueMillis(100); - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT) - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting)) - .numberOfShards(1).numberOfReplicas(1)) - .build(); - ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metaData(metaData) - .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build(); - clusterState = ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).localNodeId("node1").masterNodeId("node1")) - .build(); - final long baseTimestampNanos = System.nanoTime(); - allocationService.setNanoTimeOverride(baseTimestampNanos); - clusterState = allocationService.reroute(clusterState, "reroute"); - // starting primaries - clusterState = startInitializingShardsAndReroute(allocationService, clusterState); - // starting replicas - clusterState = startInitializingShardsAndReroute(allocationService, clusterState); - assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); - - String nodeId = null; - final List allShards = clusterState.getRoutingTable().allShards("test"); - // we need to find the node with the replica to be removed - for (ShardRouting shardRouting : allShards) { - if (shardRouting.primary() == 
false) { - nodeId = shardRouting.currentNodeId(); - break; - } - } - assertNotNull(nodeId); - - // skip allocation of handling existing data copy shard - gatewayAllocator.skipAllocation(true); - - // remove node that has replica and reroute - DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId); - clusterState = ClusterState.builder(clusterState).nodes(nodes).build(); - clusterState = allocationService.disassociateDeadNodes(clusterState, true, "reroute"); - ClusterState stateWithDelayedShard = clusterState; - // make sure the replica is marked as delayed (i.e. not reallocated) - assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard)); - ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next(); - assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos()); - - // mock ClusterService.submitStateUpdateTask() method - CountDownLatch latch = new CountDownLatch(1); - AtomicReference clusterStateUpdateTask = new AtomicReference<>(); - doAnswer(invocationOnMock -> { - clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]); - latch.countDown(); - return null; - }).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class)); - assertNull(delayedAllocationService.delayedRerouteTask.get()); - long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos(); - long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent; - delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos); - delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState)); - - // check that delayed reroute task was created and registered with the proper settings - DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = 
delayedAllocationService.delayedRerouteTask.get(); - assertNotNull(delayedRerouteTask); - assertFalse(delayedRerouteTask.cancelScheduling.get()); - assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos)); - assertThat(delayedRerouteTask.nextDelay.nanos(), - equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos))); - - // check that submitStateUpdateTask() was invoked on the cluster service mock - assertTrue(latch.await(30, TimeUnit.SECONDS)); - verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask.get())); - - // advance the time on the allocation service to a timestamp that happened after the delayed scheduling - long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos(); - allocationService.setNanoTimeOverride(nanoTimeForReroute); - // apply cluster state - ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard); - // check that shard is not delayed anymore - assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay)); - // check that task is now removed - assertNull(delayedAllocationService.delayedRerouteTask.get()); - - // reset - gatewayAllocator.skipAllocation(false); - } - private static class TestDelayAllocationService extends DelayedAllocationService { private volatile long nanoTimeOverride = -1L; @@ -571,4 +483,3 @@ protected long currentNanoTime() { } } } - diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 0f51b93350393..2c2a63d65eac2 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -254,10 +254,7 @@ protected long currentNanoTime() { * Mocks 
behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet. */ protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator { - private boolean skipAllocation = false; - - public DelayedShardsMockGatewayAllocator() { - } + public DelayedShardsMockGatewayAllocator() {} @Override public void applyStartedShards(RoutingAllocation allocation, List startedShards) { @@ -271,9 +268,6 @@ public void applyFailedShards(RoutingAllocation allocation, List fa @Override public void allocateUnassigned(RoutingAllocation allocation) { - if (this.skipAllocation) { - return; - } final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator(); while (unassignedIterator.hasNext()) { ShardRouting shard = unassignedIterator.next(); @@ -285,9 +279,5 @@ public void allocateUnassigned(RoutingAllocation allocation) { } } } - - public void skipAllocation(boolean skipAllocation) { - this.skipAllocation = skipAllocation; - } } } From 45da6a36fed9e055b77a828b7be26cf50c1f08be Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 1 Oct 2019 14:12:51 +0800 Subject: [PATCH 10/13] update test case --- .../indices/recovery/IndexRecoveryIT.java | 85 ++++++++----------- 1 file changed, 37 insertions(+), 48 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index dc4e3d26219ba..fcec57fd61b96 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -349,63 +349,52 @@ public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exceptio assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); // do sync flush to gen sync id - assertBusy(() -> 
assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0))); + assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0)); - logger.info("--> slowing down recoveries"); - slowDownRecovery(shardSize); - - logger.info("--> stop node B"); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeB)); - - logger.info("--> request recoveries"); - // nodeB down, peer recovery from nodeA to nodeC - RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); + // hold peer recovery on phase 2 after nodeB down + CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1); + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeA); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) { + try { + allowToCompletePhase1Latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); - List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); - List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); - assertThat(nodeARecoveryStates.size(), equalTo(1)); - List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); - assertThat(nodeCRecoveryStates.size(), equalTo(1)); + logger.info("--> restart node B"); + internalCluster().restartNode(nodeB, + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { - assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, - Stage.DONE, null, nodeA); - validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); + // nodeB stopped, peer recovery from nodeA to 
nodeC, it will be cancelled after nodeB get started. + RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeA, nodeC); - validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(1)); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); - logger.info("--> start node B again"); - // this will use the same data location as the stopped node - final String nodeB1 = internalCluster().startNode(nodeBDataPathSettings); - assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); + assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, + Stage.DONE, null, nodeA); + validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - logger.info("--> request recoveries"); - // peer recovery from nodeA to nodeC should be canceled, replica should be allocated to nodeB that has the data copy - assertBusy(() -> { - RecoveryResponse response1 = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - List recoveryStates1 = response1.shardRecoveryStates().get(INDEX_NAME); - - List nodeARecoveryStates1 = findRecoveriesForTargetNode(nodeA, recoveryStates1); - assertThat(nodeARecoveryStates1.size(), equalTo(1)); - List nodeBRecoveryStates1 = findRecoveriesForTargetNode(nodeB1, recoveryStates1); - assertThat(nodeBRecoveryStates1.size(), equalTo(1)); - List nodeCRecoveryStates1 = findRecoveriesForTargetNode(nodeC, recoveryStates1); - assertThat(nodeCRecoveryStates1.size(), equalTo(0)); - - 
assertRecoveryState(nodeARecoveryStates1.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, - Stage.DONE, null, nodeA); - validateIndexRecoveryState(nodeARecoveryStates1.get(0).getIndex()); - - assertRecoveryState(nodeBRecoveryStates1.get(0), 0, PeerRecoverySource.INSTANCE, false, - Stage.DONE, nodeA, nodeB1); - validateIndexRecoveryState(nodeBRecoveryStates1.get(0).getIndex()); - }); + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeA, nodeC); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); - logger.info("--> speeding up recoveries"); - restoreRecoverySpeed(); + return super.onNodeStopped(nodeName); + } + }); - // wait for it to be finished + // wait for peer recovering from nodeA to nodeB to be finished ensureGreen(); + allowToCompletePhase1Latch.countDown(); + transportService.clearAllRules(); } public void testRerouteRecovery() throws Exception { From 3d905089e81a576b5beee3740afbc867c396ff26 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 1 Oct 2019 14:20:53 +0800 Subject: [PATCH 11/13] fix compile issue --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 60181afd64fef..a1549c5e217a4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; import 
org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.AllocationDecision; From 0cff47a6daaeada8e2e4922f45602fb9ef593388 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 1 Oct 2019 15:13:57 +0800 Subject: [PATCH 12/13] update test case to wait for recovering --- .../indices/recovery/IndexRecoveryIT.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index fcec57fd61b96..d9d9dd2a80188 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -370,22 +370,24 @@ public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exceptio new InternalTestCluster.RestartCallback() { @Override public Settings onNodeStopped(String nodeName) throws Exception { - - // nodeB stopped, peer recovery from nodeA to nodeC, it will be cancelled after nodeB get started. 
- RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - - List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); - List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); - assertThat(nodeARecoveryStates.size(), equalTo(1)); - List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); - assertThat(nodeCRecoveryStates.size(), equalTo(1)); - - assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, - Stage.DONE, null, nodeA); - validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - - assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeA, nodeC); - validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); + assertBusy(() -> { + // nodeB stopped, peer recovery from nodeA to nodeC, it will be cancelled after nodeB get started. + RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); + + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(1)); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); + + assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + true, Stage.DONE, null, nodeA); + validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); + + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, + false, nodeA, nodeC); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); + }); return super.onNodeStopped(nodeName); } From f83dcb5374055b9a216331f6c24f7f35dcbf0247 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 1 Oct 2019 16:41:36 +0800 Subject: [PATCH 
13/13] enhance test case --- .../indices/recovery/IndexRecoveryIT.java | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index d9d9dd2a80188..7c0a92675320a 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -329,13 +330,12 @@ public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exceptio final String nodeA = internalCluster().startNode(); logger.info("--> create index on node: {}", nodeA); - ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT) + createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT) .getShards()[0].getStats().getStore().size(); logger.info("--> start node B"); // force a shard recovery from nodeA to nodeB final String nodeB = internalCluster().startNode(); - Settings nodeBDataPathSettings = internalCluster().dataPathSettings(nodeB); logger.info("--> add replica for {} on node: {}", INDEX_NAME, nodeB); assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME) @@ -346,16 +346,17 @@ public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exceptio logger.info("--> start node C"); final String nodeC = internalCluster().startNode(); - 
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); // do sync flush to gen sync id assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0)); // hold peer recovery on phase 2 after nodeB down + CountDownLatch phase1ReadyBlocked = new CountDownLatch(1); CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1); MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeA); transportService.addSendBehavior((connection, requestId, action, request, options) -> { if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) { + phase1ReadyBlocked.countDown(); try { allowToCompletePhase1Latch.await(); } catch (InterruptedException e) { @@ -370,33 +371,38 @@ public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exceptio new InternalTestCluster.RestartCallback() { @Override public Settings onNodeStopped(String nodeName) throws Exception { - assertBusy(() -> { - // nodeB stopped, peer recovery from nodeA to nodeC, it will be cancelled after nodeB get started. - RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); + phase1ReadyBlocked.await(); + // nodeB is stopped, so peer recovery runs from nodeA to nodeC; it will be cancelled after nodeB is started again.
+ RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); - List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); - assertThat(nodeARecoveryStates.size(), equalTo(1)); - List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); - assertThat(nodeCRecoveryStates.size(), equalTo(1)); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, - true, Stage.DONE, null, nodeA); - validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - - assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, - false, nodeA, nodeC); - validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); - }); + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, + false, nodeA, nodeC); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); return super.onNodeStopped(nodeName); } }); - // wait for peer recovering from nodeA to nodeB to be finished + // wait for peer recovery from nodeA to nodeB which is a no-op recovery so it skips the CLEAN_FILES stage and hence is not blocked ensureGreen(); allowToCompletePhase1Latch.countDown(); transportService.clearAllRules(); + + // make sure nodeA has primary and nodeB has replica + ClusterState state = client().admin().cluster().prepareState().get().getState(); + List startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED); + assertThat(startedShards.size(), equalTo(2)); + for (ShardRouting shardRouting : startedShards) { + if (shardRouting.primary()) { + assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), 
equalTo(nodeA)); + } else { + assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeB)); + } + } } public void testRerouteRecovery() throws Exception {