Skip to content

Commit

Permalink
Only ack cluster state updates successfully applied on all nodes (#30672
Browse files Browse the repository at this point in the history
)

The cluster state acking mechanism currently incorrectly acks cluster state updates that have not
successfully been applied on all nodes. In a situation, for example, where some of the nodes
disconnect during publishing, and don't acknowledge receiving the new cluster state, the user-facing
action (e.g. create index request) will still consider this as an ack.
  • Loading branch information
ywelsch authored May 23, 2018
1 parent 886db84 commit cceaa9a
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
public interface AckedClusterStateTaskListener extends ClusterStateTaskListener {

/**
* Called to determine which nodes the acknowledgement is expected from
* Called to determine which nodes the acknowledgement is expected from.
*
* As this method will be called multiple times to determine the set of acking nodes,
* it is crucial for it to return consistent results: Given the same listener instance
* and the same node parameter, the method implementation should return the same result.
*
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
* @param e optional error that might have been thrown
*/
public void onAllNodesAcked(@Nullable Exception e) {
listener.onResponse(newResponse(true));
listener.onResponse(newResponse(e == null));
}

protected abstract Response newResponse(boolean acknowledged);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {

@Override
public void onAllNodesAcked(@Nullable Exception e) {
listener.onResponse(new ClusterStateUpdateResponse(true));
listener.onResponse(new ClusterStateUpdateResponse(e == null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ private static class AckCountDownListener implements Discovery.AckListener {

private final AckedClusterStateTaskListener ackedTaskListener;
private final CountDown countDown;
private final DiscoveryNodes nodes;
private final DiscoveryNode masterNode;
private final long clusterStateVersion;
private final Future<?> ackTimeoutCallback;
private Exception lastFailure;
Expand All @@ -572,27 +572,23 @@ private static class AckCountDownListener implements Discovery.AckListener {
ThreadPool threadPool) {
this.ackedTaskListener = ackedTaskListener;
this.clusterStateVersion = clusterStateVersion;
this.nodes = nodes;
this.masterNode = nodes.getMasterNode();
int countDown = 0;
for (DiscoveryNode node : nodes) {
if (ackedTaskListener.mustAck(node)) {
//we always wait for at least the master node
if (node.equals(masterNode) || ackedTaskListener.mustAck(node)) {
countDown++;
}
}
//we always wait for at least 1 node (the master)
countDown = Math.max(1, countDown);
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
this.countDown = new CountDown(countDown);
this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, () -> onTimeout());
}

@Override
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
if (!ackedTaskListener.mustAck(node)) {
//we always wait for the master ack anyway
if (!node.equals(nodes.getMasterNode())) {
return;
}
if (node.equals(masterNode) == false && ackedTaskListener.mustAck(node) == false) {
return;
}
if (e == null) {
logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterStateVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -33,8 +34,16 @@
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Stream;

import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand All @@ -43,6 +52,11 @@
@ClusterScope(scope = TEST, minNumDataNodes = 2)
public class AckClusterUpdateSettingsIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand Down Expand Up @@ -156,4 +170,32 @@ public void testOpenIndexNoAcknowledgement() {
assertThat(openIndexResponse.isAcknowledged(), equalTo(false));
ensureGreen("test"); // make sure that recovery from disk has completed, so that check index doesn't fail.
}

public void testAckingFailsIfNotPublishedToAllNodes() {
String masterNode = internalCluster().getMasterName();
String nonMasterNode = Stream.of(internalCluster().getNodeNames())
.filter(node -> node.equals(masterNode) == false).findFirst().get();

MockTransportService masterTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
MockTransportService nonMasterTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode);

logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
if (randomBoolean() && internalCluster().numMasterNodes() != 2) {
masterTransportService.addFailToSendNoConnectRule(nonMasterTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
} else {
masterTransportService.addFailToSendNoConnectRule(nonMasterTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME);
}

CreateIndexResponse response = client().admin().indices().prepareCreate("test").get();
assertFalse(response.isAcknowledged());

logger.info("waiting for cluster to reform");
masterTransportService.clearRule(nonMasterTransportService);

ensureStableCluster(internalCluster().size());

assertAcked(client().admin().indices().prepareDelete("test"));
}
}

0 comments on commit cceaa9a

Please sign in to comment.