diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 4a4b4648776b1..9afc57309cccc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -63,6 +63,9 @@ public TransportBulkShardOperationsAction( @Override protected WritePrimaryResult shardOperationOnPrimary( final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry()); + } return shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); } @@ -134,6 +137,10 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( // replicated to replicas but with the existing primary term (not the current primary term) in order // to guarantee the consistency between the primary and replicas, and between translog and Lucene index. final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + if (logger.isTraceEnabled()) { + logger.trace("operation [{}] was processed before on following primary shard {} with existing term {}", + targetOp, primary.routingEntry(), failure.getExistingPrimaryTerm()); + } assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo(); if (failure.getExistingPrimaryTerm().isPresent()) { appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong())); @@ -156,6 +163,9 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( @Override protected WriteReplicaResult shardOperationOnReplica( final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry()); + } return shardOperationOnReplica(request, replica, logger); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index b7086ed876db7..23157c177816f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -72,6 +72,9 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; if (hasBeenProcessedBefore(index)) { + if (logger.isTraceEnabled()) { + logger.trace("index operation [id={} seq_no={} origin={}] was processed before", index.id(), index.seqNo(), index.origin()); + } if (index.origin() == Operation.Origin.PRIMARY) { /* * The existing operation in this engine was probably assigned the term of the previous primary shard which is different diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 2dccc0e96b7a2..942413d31c109 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -444,6 +444,13 @@ public static ResumeFollowAction.Request resumeFollow(String followerIndex) { * on the follower equal the leader's; then verifies the existing pairs of (docId, seqNo) on the follower also equal the leader. */ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String followerIndex) throws Exception { + logger.info("--> asserting <> between {} and {}", leaderIndex, followerIndex); + assertBusy(() -> { + Map> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex); + logger.info("--> docs on the follower {}", docsOnFollower); + assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex))); + }, 120, TimeUnit.SECONDS); + logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex); assertBusy(() -> { Map leaderStats = new HashMap<>(); @@ -460,13 +467,8 @@ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String f } followerStats.put(shardStat.getShardRouting().shardId().id(), shardStat.getSeqNoStats()); } - assertThat(leaderStats, equalTo(followerStats)); - }, 60, TimeUnit.SECONDS); - logger.info("--> asserting <> between {} and {}", leaderIndex, followerIndex); - assertBusy(() -> { - assertThat(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex), - equalTo(getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex))); - }, 60, TimeUnit.SECONDS); + assertThat(followerStats, equalTo(leaderStats)); + }, 120, TimeUnit.SECONDS); } private Map> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index 73cc94b4703a9..440a5fbc37e1e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -44,6 +45,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG") public class FollowerFailOverIT extends CcrIntegTestCase { @Override