From 64adb5ad5bdeb68490f6664c6907bfe30cf02d78 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 24 Jan 2019 11:39:46 +0100 Subject: [PATCH] Set acking timeout to 0 on dynamic mapping update (#31140) As acking can fail for any reason (unrelated node being too slow, node disconnecting), it should not be required for acking to succeed in order for index requests with dynamic mapping updates to successfully complete. Relates to #30672 and Closes #30844 --- .../action/index/MappingUpdatedAction.java | 7 ++--- .../master/IndexingMasterFailoverIT.java | 2 +- .../cluster/routing/PrimaryAllocationIT.java | 31 +++++++++++++++++++ .../indices/state/RareClusterStateIT.java | 22 +++++++++++++ 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 770c6bca26b2f..c34a4196bb524 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.action.index; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -67,7 +66,7 @@ private PutMappingRequestBuilder updateMappingRequest(Index index, String type, throw new IllegalArgumentException("_default_ mapping should not be updated"); } return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON) - .setMasterNodeTimeout(timeout).setTimeout(timeout); + .setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO); } /** @@ -84,8 +83,6 @@ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdat * been applied to the master node and propagated to data nodes. */ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) { - if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) { - throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]"); - } + updateMappingRequest(index, type, mappingUpdate, timeout).get(); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 2865201f0f9d0..461c92d69f444 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -66,13 +66,13 @@ protected Settings nodeSettings(int nodeOrdinal) { * This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario. */ @TestLogging("_root:DEBUG") - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30844") public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable { logger.info("--> start 4 nodes, 3 master, 1 data"); final Settings sharedSettings = Settings.builder() .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly + .put(TestZenDiscovery.USE_ZEN2.getKey(), false) .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index b106944e97065..a64f509363854 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -20,8 +20,10 @@ */ import com.carrotsearch.hppc.cursors.IntObjectCursor; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; @@ -30,6 +32,7 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; @@ -91,6 +94,34 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build(); } + public void testBulkWeirdScenario() throws Exception { + String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); + internalCluster().startDataOnlyNodes(2); + + assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get()); + ensureGreen(); + + BulkResponse bulkResponse = client().prepareBulk() + .add(client().prepareIndex().setIndex("test").setType("_doc").setId("1").setSource("field1", "value1")) + .add(client().prepareUpdate().setIndex("test").setType("_doc").setId("1").setDoc("field2", "value2")) + .execute().actionGet(); + + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(2)); + + logger.info(Strings.toString(bulkResponse, true, true)); + + internalCluster().assertSeqNos(); + + assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1")); + assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L)); + assertThat(bulkResponse.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("1")); + assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L)); + assertThat(bulkResponse.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.UPDATED)); + } + private void createStaleReplicaScenario(String master) throws Exception { client().prepareIndex("test", "type1").setSource(jsonBuilder() .startObject().field("field", "value1").endObject()).get(); diff --git a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index 4302549e2f1fd..a34312b847e3b 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -57,6 +58,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.action.DocWriteResponse.Result.CREATED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -397,6 +399,24 @@ public void onFailure(Exception e) { assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists())); + // index another document, this time using dynamic mappings. + // The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even + // if the dynamic mapping update is not applied on the replica yet. + ActionFuture dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute(); + + // ...and wait for second mapping to be available on master + assertBusy(() -> { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master); + final IndexService indexService = indicesService.indexServiceSafe(index); + assertNotNull(indexService); + final MapperService mapperService = indexService.mapperService(); + DocumentMapper mapper = mapperService.documentMapper("type"); + assertNotNull(mapper); + assertNotNull(mapper.mappers().getMapper("field2")); + }); + + assertBusy(() -> assertTrue(client().prepareGet("index", "type", "2").get().isExists())); + // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled // and not just because it takes time to replicate the indexing request to the replica @@ -415,6 +435,8 @@ public void onFailure(Exception e) { assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded }); + + assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED)); } }