diff --git a/build.gradle b/build.gradle index cfc8401a934e0..7b1e517a8586b 100644 --- a/build.gradle +++ b/build.gradle @@ -186,7 +186,7 @@ task verifyVersions { * after the backport of the backcompat code is complete. */ allprojects { - ext.bwc_tests_enabled = true + ext.bwc_tests_enabled = false } task verifyBwcTestsEnabled { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 516554d92e8cd..77f7ff1d4460a 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -20,7 +20,9 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -55,6 +57,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -108,12 +111,26 @@ public abstract class TransportReplicationAction< protected final String transportReplicaAction; protected final String transportPrimaryAction; + private final boolean syncGlobalCheckpointAfterOperation; + protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor) { + this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, + indexNameExpressionResolver, request, replicaRequest, executor, false); + } + + + protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, + ThreadPool threadPool, ShardStateAction shardStateAction, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor, + boolean syncGlobalCheckpointAfterOperation) { super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); this.transportService = transportService; this.clusterService = clusterService; @@ -126,6 +143,8 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); this.transportOptions = transportOptions(); + + this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, @@ -150,7 +169,7 @@ protected void doExecute(Task task, Request request, ActionListener li new ReroutePhase((ReplicationTask) task, request, listener).run(); } - protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { + protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { return new ReplicasProxy(primaryTerm); } @@ -359,6 +378,17 @@ private ActionListener createResponseListener(final PrimaryShardRefere return new ActionListener() { @Override public void onResponse(Response response) { + if (syncGlobalCheckpointAfterOperation) { + try { + primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation"); + } catch (final Exception e) { + // only log non-closed exceptions + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + logger.info("post-operation global checkpoint sync failed", e); + // intentionally swallow, a missed global checkpoint sync should not fail this operation + } + } + } primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); try { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 31c72108ecf65..ec3dcd94d3084 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -71,7 +71,7 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor); + indexNameExpressionResolver, request, replicaRequest, executor, true); } /** Syncs operation result to the translog or throws a shard not available failure */ diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index 630fe11e0a811..f806c210f0014 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -22,7 +22,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -40,6 +39,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.BM25SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityService; diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index a4d03929cbb57..ae5ea432855af 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -25,11 +25,15 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -82,6 +86,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -109,10 +114,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean deleted = new AtomicBoolean(false); private final IndexSettings indexSettings; - private final List indexingOperationListeners; private final List searchOperationListeners; + private final List indexingOperationListeners; private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; + private volatile AsyncGlobalCheckpointTask globalCheckpointTask; // don't convert to Setting<> and register... we only set this in tests and register via a plugin private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; @@ -182,11 +188,12 @@ public IndexService( this.engineFactory = engineFactory; // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE this.searcherWrapper = wrapperFactory.newWrapper(this); - this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); + this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); // kick off async ops for the first shard in this index this.refreshTask = new AsyncRefreshTask(this); this.trimTranslogTask = new AsyncTrimTranslogTask(this); + this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); rescheduleFsyncTask(indexSettings.getTranslogDurability()); } @@ -268,7 +275,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc } } } finally { - IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask); + IOUtils.close( + bitsetFilterCache, + indexCache, + indexFieldData, + mapperService, + refreshTask, + fsyncTask, + trimTranslogTask, + globalCheckpointTask); } } } @@ -293,8 +308,7 @@ private long getAvgShardSizeInBytes() throws IOException { } } - public synchronized IndexShard createShard(ShardRouting routing) throws IOException { - final boolean primary = routing.primary(); + public synchronized IndexShard createShard(ShardRouting routing, Consumer globalCheckpointSyncer) throws IOException { /* * TODO: we execute this in parallel but it's a synced method. Yet, we might * be able to serialize the execution via the cluster state in the future. for now we just @@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, indexCache, mapperService, similarityService, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, - searchOperationListeners, indexingOperationListeners); + searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId)); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); @@ -710,6 +724,44 @@ private void maybeTrimTranslog() { } } + private void maybeSyncGlobalCheckpoints() { + for (final IndexShard shard : this.shards.values()) { + if (shard.routingEntry().active() && shard.routingEntry().primary()) { + switch (shard.state()) { + case CLOSED: + case CREATED: + case RECOVERING: + case RELOCATED: + continue; + case POST_RECOVERY: + assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active"; + continue; + case STARTED: + try { + shard.acquirePrimaryOperationPermit( + ActionListener.wrap( + releasable -> { + try (Releasable ignored = releasable) { + shard.maybeSyncGlobalCheckpoint("background"); + } + }, + e -> { + if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) { + logger.info("failed to execute background global checkpoint sync", e); + } + }), + ThreadPool.Names.SAME); + } catch (final AlreadyClosedException | IndexShardClosedException e) { + // the shard was closed concurrently, continue + } + continue; + default: + throw new IllegalStateException("unknown state [" + shard.state() + "]"); + } + } + } + } + abstract static class BaseAsyncTask implements Runnable, Closeable { protected final IndexService indexService; protected final ThreadPool threadPool; @@ -877,6 +929,41 @@ public String toString() { } } + // this setting is intentionally not registered, it is only used in tests + public static final Setting GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING = + Setting.timeSetting( + "index.global_checkpoint_sync.interval", + new TimeValue(30, TimeUnit.SECONDS), + new TimeValue(0, TimeUnit.MILLISECONDS), + Property.Dynamic, + Property.IndexScope); + + /** + * Background task that syncs the global checkpoint to replicas. + */ + final class AsyncGlobalCheckpointTask extends BaseAsyncTask { + + AsyncGlobalCheckpointTask(final IndexService indexService) { + // index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests + super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); + } + + @Override + protected void runInternal() { + indexService.maybeSyncGlobalCheckpoints(); + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + + @Override + public String toString() { + return "global_checkpoint_sync"; + } + } + AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } @@ -885,6 +972,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } + AsyncGlobalCheckpointTask getGlobalCheckpointTask() { + return globalCheckpointTask; + } + /** * Clears the caches for the given shard id if the shard is still allocated on this node */ diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index e6d8b8e8d3ff8..60b61ccefa51c 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -19,7 +19,10 @@ package org.elasticsearch.index.seqno; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; @@ -34,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -47,7 +51,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction< GlobalCheckpointSyncAction.Request, GlobalCheckpointSyncAction.Request, - ReplicationResponse> implements IndexEventListener { + ReplicationResponse> { public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync"; @@ -73,7 +77,17 @@ public GlobalCheckpointSyncAction( indexNameExpressionResolver, Request::new, Request::new, - ThreadPool.Names.SAME); + ThreadPool.Names.MANAGEMENT); + } + + public void updateGlobalCheckpointForShard(final ShardId shardId) { + execute( + new Request(shardId), + ActionListener.wrap(r -> {}, e -> { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + logger.info(shardId + " global checkpoint sync failed", e); + } + })); } @Override @@ -94,11 +108,6 @@ protected void sendReplicaRequest( } } - @Override - public void onShardInactive(final IndexShard indexShard) { - execute(new Request(indexShard.shardId())); - } - @Override protected PrimaryResult shardOperationOnPrimary( final Request request, final IndexShard indexShard) throws Exception { diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index 34c731edaaf8b..d2b53aac1a045 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -209,13 +209,20 @@ public int hashCode() { } } - synchronized ObjectLongMap getGlobalCheckpoints() { + /** + * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. + * + * @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID + */ + synchronized ObjectLongMap getInSyncGlobalCheckpoints() { assert primaryMode; assert handoffInProgress == false; - final ObjectLongMap globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); - for (final Map.Entry cps : checkpoints.entrySet()) { - globalCheckpoints.put(cps.getKey(), cps.getValue().globalCheckpoint); - } + final ObjectLongMap globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size + checkpoints + .entrySet() + .stream() + .filter(e -> e.getValue().inSync) + .forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint)); return globalCheckpoints; } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 760fbe0a5fc07..1c8911a0cd886 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -138,8 +138,13 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpointTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } - public ObjectLongMap getGlobalCheckpoints() { - return globalCheckpointTracker.getGlobalCheckpoints(); + /** + * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. + * + * @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID + */ + public ObjectLongMap getInSyncGlobalCheckpoints() { + return globalCheckpointTracker.getInSyncGlobalCheckpoints(); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f4a771a3b3f4f..edd37aa5c1739 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -156,6 +156,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.index.mapper.SourceToParse.source; @@ -197,6 +198,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; + private final Runnable globalCheckpointSyncer; + + Runnable getGlobalCheckpointSyncer() { + return globalCheckpointSyncer; + } @Nullable private RecoveryState recoveryState; @@ -233,11 +239,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ private final RefreshListeners refreshListeners; - public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, - Supplier indexSortSupplier, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, - @Nullable EngineFactory engineFactory, - IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays, - Engine.Warmer warmer, List searchOperationListener, List listeners) throws IOException { + public IndexShard( + ShardRouting shardRouting, + IndexSettings indexSettings, + ShardPath path, + Store store, + Supplier indexSortSupplier, + IndexCache indexCache, + MapperService mapperService, + SimilarityService similarityService, + @Nullable EngineFactory engineFactory, + IndexEventListener indexEventListener, + IndexSearcherWrapper indexSearcherWrapper, + ThreadPool threadPool, + BigArrays bigArrays, + Engine.Warmer warmer, + List searchOperationListener, + List listeners, + Runnable globalCheckpointSyncer) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -257,6 +276,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP final List listenersList = new ArrayList<>(listeners); listenersList.add(internalIndexingStats); this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); + this.globalCheckpointSyncer = globalCheckpointSyncer; final List searchListenersList = new ArrayList<>(searchOperationListener); searchListenersList.add(searchStats); this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger); @@ -1723,11 +1743,6 @@ public void initiateTracking(final String allocationId) { public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { verifyPrimary(); getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); - /* - * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to - * the replica; mark our self as active to force a future background sync. - */ - active.compareAndSet(false, true); } /** @@ -1748,10 +1763,44 @@ public long getGlobalCheckpoint() { return getEngine().seqNoService().getGlobalCheckpoint(); } - public ObjectLongMap getGlobalCheckpoints() { + /** + * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. + * + * @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID + */ + public ObjectLongMap getInSyncGlobalCheckpoints() { + verifyPrimary(); + verifyNotClosed(); + return getEngine().seqNoService().getInSyncGlobalCheckpoints(); + } + + /** + * Syncs the global checkpoint to the replicas if the global checkpoint on at least one replica is behind the global checkpoint on the + * primary. + */ + public void maybeSyncGlobalCheckpoint(final String reason) { verifyPrimary(); verifyNotClosed(); - return getEngine().seqNoService().getGlobalCheckpoints(); + if (state == IndexShardState.RELOCATED) { + return; + } + // only sync if there are not operations in flight + final SeqNoStats stats = getEngine().seqNoService().stats(); + if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) { + final ObjectLongMap globalCheckpoints = getInSyncGlobalCheckpoints(); + final String allocationId = routingEntry().allocationId().getId(); + assert globalCheckpoints.containsKey(allocationId); + final long globalCheckpoint = globalCheckpoints.get(allocationId); + final boolean syncNeeded = + StreamSupport + .stream(globalCheckpoints.values().spliterator(), false) + .anyMatch(v -> v.value < globalCheckpoint); + // only sync if there is a shard lagging the primary + if (syncNeeded) { + logger.trace("syncing global checkpoint for [{}]", reason); + globalCheckpointSyncer.run(); + } + } } /** diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index 3039305c42a3d..caffa1b7befda 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -375,12 +374,15 @@ public IndexService indexServiceSafe(Index index) { /** * Creates a new {@link IndexService} for the given metadata. - * @param indexMetaData the index metadata to create the index for - * @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners + * + * @param indexMetaData the index metadata to create the index for + * @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the + * per-index listeners * @throws ResourceAlreadyExistsException if the index already exists. */ @Override - public synchronized IndexService createIndex(IndexMetaData indexMetaData, List builtInListeners) throws IOException { + public synchronized IndexService createIndex( + final IndexMetaData indexMetaData, final List builtInListeners) throws IOException { ensureChangesAllowed(); if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) { throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]"); @@ -399,13 +401,13 @@ public void onStoreClosed(ShardId shardId) { finalListeners.add(onStoreClose); finalListeners.add(oldShardsStats); final IndexService indexService = - createIndexService( - "create index", - indexMetaData, - indicesQueryCache, - indicesFieldDataCache, - finalListeners, - indexingMemoryController); + createIndexService( + "create index", + indexMetaData, + indicesQueryCache, + indicesFieldDataCache, + finalListeners, + indexingMemoryController); boolean success = false; try { indexService.getIndexEventListener().afterIndexCreated(indexService); @@ -423,7 +425,8 @@ public void onStoreClosed(ShardId shardId) { * This creates a new IndexService without registering it */ private synchronized IndexService createIndexService(final String reason, - IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, + IndexMetaData indexMetaData, + IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException { @@ -454,7 +457,8 @@ private synchronized IndexService createIndexService(final String reason, indicesQueryCache, mapperRegistry, indicesFieldDataCache, - namedWriteableRegistry); + namedWriteableRegistry + ); } /** @@ -499,10 +503,11 @@ public synchronized void verifyIndexMetadata(IndexMetaData metaData, IndexMetaDa @Override public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure) throws IOException { + Consumer onShardFailure, + Consumer globalCheckpointSyncer) throws IOException { ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); - IndexShard indexShard = indexService.createShard(shardRouting); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 3c1ee5b841293..5aa8b5f3ee1b3 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -118,35 +118,44 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final boolean sendRefreshMapping; private final List buildInIndexListener; private final PrimaryReplicaSyncer primaryReplicaSyncer; + private final Consumer globalCheckpointSyncer; @Inject - public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService, - ThreadPool threadPool, PeerRecoveryTargetService recoveryTargetService, + public IndicesClusterStateService(Settings settings, + IndicesService indicesService, + ClusterService clusterService, + ThreadPool threadPool, + PeerRecoveryTargetService recoveryTargetService, ShardStateAction shardStateAction, NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, - SearchService searchService, SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction, - PrimaryReplicaSyncer primaryReplicaSyncer) { + SearchService searchService, + SyncedFlushService syncedFlushService, + PeerRecoverySourceService peerRecoverySourceService, + SnapshotShardsService snapshotShardsService, + PrimaryReplicaSyncer primaryReplicaSyncer, + GlobalCheckpointSyncAction globalCheckpointSyncAction) { this(settings, (AllocatedIndices>) indicesService, clusterService, threadPool, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService, - snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer); + snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard); } // for tests IndicesClusterStateService(Settings settings, AllocatedIndices> indicesService, ClusterService clusterService, - ThreadPool threadPool, PeerRecoveryTargetService recoveryTargetService, + ThreadPool threadPool, + PeerRecoveryTargetService recoveryTargetService, ShardStateAction shardStateAction, NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, - SearchService searchService, SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction, - PrimaryReplicaSyncer primaryReplicaSyncer) { + SearchService searchService, + SyncedFlushService syncedFlushService, + PeerRecoverySourceService peerRecoverySourceService, + SnapshotShardsService snapshotShardsService, + PrimaryReplicaSyncer primaryReplicaSyncer, + Consumer globalCheckpointSyncer) { super(settings); this.buildInIndexListener = Arrays.asList( @@ -154,8 +163,7 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi recoveryTargetService, searchService, syncedFlushService, - snapshotShardsService, - globalCheckpointSyncAction); + snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; @@ -164,6 +172,7 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; this.primaryReplicaSyncer = primaryReplicaSyncer; + this.globalCheckpointSyncer = globalCheckpointSyncer; this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); } @@ -541,7 +550,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR logger.debug("{} creating shard", shardRouting.shardId()); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), - repositoriesService, failedShardHandler); + repositoriesService, failedShardHandler, globalCheckpointSyncer); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } @@ -830,7 +839,8 @@ U createIndex(IndexMetaData indexMetaData, */ T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure) throws IOException; + Consumer onShardFailure, + Consumer globalCheckpointSyncer) throws IOException; /** * Returns shard for the specified id if it exists otherwise returns null. diff --git a/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 38c9bcb72459f..0701d9d10e3ac 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -215,6 +215,8 @@ public void testAckedIndexing() throws Exception { } }, 30, TimeUnit.SECONDS); + assertSeqNos(); + logger.info("done validating (iteration [{}])", iter); } } finally { diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 3faa2da7b4ffe..7e8949cd15fbf 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -141,7 +141,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting)); + primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}); replicas = new ArrayList<>(); this.indexMetaData = indexMetaData; updateAllocationIDsOnPrimary(); @@ -238,7 +238,7 @@ public void startPrimary() throws IOException { public IndexShard addReplica() throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); final IndexShard replica = - newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting)); + newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting), () -> {}); addReplica(replica); return replica; } @@ -259,8 +259,8 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP false, ShardRoutingState.INITIALIZING, RecoverySource.PeerRecoverySource.INSTANCE); - final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, - getEngineFactory(shardRouting)); + final IndexShard newReplica = + newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}); replicas.add(newReplica); updateAllocationIDsOnPrimary(); return newReplica; diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java new file mode 100644 index 0000000000000..b2c828cb73f0c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java @@ -0,0 +1,210 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; + +public class GlobalCheckpointSyncIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class)) + .collect(Collectors.toList()); + } + + public void testPostOperationGlobalCheckpointSync() throws Exception { + // set the sync interval high so it does not execute during this test + runGlobalCheckpointSyncTest(TimeValue.timeValueHours(24), client -> {}, client -> {}); + } + + /* + * This test swallows the post-operation global checkpoint syncs, and then restores the ability to send these requests at the end of the + * test so that a background sync can fire and sync the global checkpoint. + */ + public void testBackgroundGlobalCheckpointSync() throws Exception { + runGlobalCheckpointSyncTest( + TimeValue.timeValueSeconds(randomIntBetween(1, 3)), + client -> { + // prevent global checkpoint syncs between all nodes + final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes(); + for (final DiscoveryNode node : nodes) { + for (final DiscoveryNode other : nodes) { + if (node == other) { + continue; + } + final MockTransportService senderTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); + final MockTransportService receiverTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, other.getName()); + + senderTransportService.addDelegate(receiverTransportService, + new MockTransportService.DelegateTransport(senderTransportService.original()) { + @Override + protected void sendRequest( + final Connection connection, + final long requestId, + final String action, + final TransportRequest request, + final TransportRequestOptions options) throws IOException { + if ("indices:admin/seq_no/global_checkpoint_sync[r]".equals(action)) { + throw new IllegalStateException("blocking indices:admin/seq_no/global_checkpoint_sync[r]"); + } else { + super.sendRequest(connection, requestId, action, request, options); + } + } + }); + } + } + }, + client -> { + // restore global checkpoint syncs between all nodes + final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes(); + for (final DiscoveryNode node : nodes) { + for (final DiscoveryNode other : nodes) { + if (node == other) { + continue; + } + final MockTransportService senderTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); + final MockTransportService receiverTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, other.getName()); + senderTransportService.clearRule(receiverTransportService); + } + } + }); + } + + private void runGlobalCheckpointSyncTest( + final TimeValue globalCheckpointSyncInterval, + final Consumer beforeIndexing, + final Consumer afterIndexing) throws Exception { + final int numberOfReplicas = randomIntBetween(1, 4); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + prepareCreate( + "test", + Settings.builder() + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), globalCheckpointSyncInterval) + .put("index.number_of_replicas", numberOfReplicas)) + .get(); + if (randomBoolean()) { + ensureGreen(); + } + + beforeIndexing.accept(client()); + + final int numberOfDocuments = randomIntBetween(0, 256); + + final int numberOfThreads = randomIntBetween(1, 4); + final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + + // start concurrent indexing threads + final List threads = new ArrayList<>(numberOfThreads); + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < numberOfDocuments; j++) { + final String id = Integer.toString(index * numberOfDocuments + j); + client().prepareIndex("test", "test", id).setSource("{\"foo\": " + id + "}", XContentType.JSON).get(); + } + try { + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + threads.add(thread); + thread.start(); + } + + // synchronize the start of the threads + barrier.await(); + + // wait for the threads to finish + barrier.await(); + + afterIndexing.accept(client()); + + assertBusy(() -> { + final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); + final IndexStats indexStats = stats.getIndex("test"); + for (final IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { + Optional maybePrimary = + Stream.of(indexShardStats.getShards()) + .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) + .findFirst(); + if (!maybePrimary.isPresent()) { + continue; + } + final ShardStats primary = maybePrimary.get(); + final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); + for (final ShardStats shardStats : indexShardStats) { + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); + if (seqNoStats == null) { + // the shard is initializing + continue; + } + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); + } + } + }); + + for (final Thread thread : threads) { + thread.join(); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2346ba290ae4e..ed5f31f4ff3da 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -539,7 +539,7 @@ public static final IndexShard newIndexShard(IndexService indexService, IndexSha IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)); + indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), () -> {}); return newShard; } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b275da702b083..c2864ea68b8e7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -136,6 +136,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.max; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; @@ -711,6 +712,66 @@ private void finish() { closeShards(indexShard); } + public void testGlobalCheckpointSync() throws IOException { + // create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = + TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE); + final Settings settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + final IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, null, () -> { synced.set(true); }); + // add a replicas + recoverShardFromStore(primaryShard); + final IndexShard replicaShard = newShard(shardId, false); + recoverReplica(replicaShard, primaryShard); + final int maxSeqNo = randomIntBetween(0, 128); + for (int i = 0; i < maxSeqNo; i++) { + primaryShard.getEngine().seqNoService().generateSeqNo(); + } + final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo; + + // set up local checkpoints on the shard copies + primaryShard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint); + final int replicaLocalCheckpoint = randomIntBetween(0, Math.toIntExact(checkpoint)); + final String replicaAllocationId = replicaShard.routingEntry().allocationId().getId(); + primaryShard.updateLocalCheckpointForShard(replicaAllocationId, replicaLocalCheckpoint); + + // initialize the local knowledge on the primary of the global checkpoint on the replica shards + final int replicaGlobalCheckpoint = Math.toIntExact(primaryShard.getGlobalCheckpoint()); + primaryShard.updateGlobalCheckpointForShard( + replicaAllocationId, + randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), replicaGlobalCheckpoint)); + + // simulate a background maybe sync; it should only run if the knowledge on the replica of the global checkpoint lags the primary + primaryShard.maybeSyncGlobalCheckpoint("test"); + assertThat( + synced.get(), + equalTo(maxSeqNo == primaryShard.getGlobalCheckpoint() && (replicaGlobalCheckpoint < checkpoint))); + + // simulate that the background sync advanced the global checkpoint on the replica + primaryShard.updateGlobalCheckpointForShard(replicaAllocationId, primaryShard.getGlobalCheckpoint()); + + // reset our boolean so that we can assert after another simulated maybe sync + synced.set(false); + + primaryShard.maybeSyncGlobalCheckpoint("test"); + + // this time there should not be a sync since all the replica copies are caught up with the primary + assertFalse(synced.get()); + + closeShards(replicaShard, primaryShard); + } + public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); @@ -1678,7 +1739,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { closeShards(shard); IndexShard newShard = newShard( ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}); recoverShardFromStore(newShard); @@ -1824,7 +1885,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { closeShards(shard); IndexShard newShard = newShard( ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}); recoverShardFromStore(newShard); diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0dc760d63bfe7..787c6c815dc52 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -130,7 +130,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting); + IndexShard shard = index.createShard(newRouting, s -> {}); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 208e7443c7daf..35bbc497838f2 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -226,7 +226,8 @@ public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recov PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure) throws IOException { + Consumer onShardFailure, + Consumer globalCheckpointSyncer) throws IOException { failRandomly(); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); MockIndexShard indexShard = indexService.createShard(shardRouting); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index a356693213f35..bc5a5b95b958a 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -410,20 +410,20 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod final ShardStateAction shardStateAction = mock(ShardStateAction.class); final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class); return new IndicesClusterStateService( - settings, - indicesService, - clusterService, - threadPool, - recoveryTargetService, - shardStateAction, - null, - repositoriesService, - null, - null, - null, - null, - null, - primaryReplicaSyncer); + settings, + indicesService, + clusterService, + threadPool, + recoveryTargetService, + shardStateAction, + null, + repositoriesService, + null, + null, + null, + null, + primaryReplicaSyncer, + s -> {}); } private class RecordingIndicesService extends MockIndicesService { diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 04e1a846bd64f..1c7032fa02e87 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -20,22 +20,16 @@ package org.elasticsearch.recovery; import com.carrotsearch.hppc.IntHashSet; -import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.procedures.IntProcedure; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.util.English; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -47,13 +41,10 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.plugins.Plugin; @@ -63,6 +54,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.MockIndexEventListener; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; @@ -81,7 +73,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; @@ -104,48 +95,13 @@ public class RelocationIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class); + return Arrays.asList(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class); } @Override protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); - assertBusy(() -> { - IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); - for (IndexStats indexStats : stats.getIndices().values()) { - for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { - Optional maybePrimary = Stream.of(indexShardStats.getShards()) - .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) - .findFirst(); - if (maybePrimary.isPresent() == false) { - continue; - } - ShardStats primary = maybePrimary.get(); - final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); - final ShardRouting primaryShardRouting = primary.getShardRouting(); - assertThat(primaryShardRouting + " should have set the global checkpoint", - primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); - final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); - final IndicesService indicesService = - internalCluster().getInstance(IndicesService.class, node.getName()); - final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); - final ObjectLongMap globalCheckpoints = indexShard.getGlobalCheckpoints(); - for (ShardStats shardStats : indexShardStats) { - final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); - assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", - seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); - assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", - seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); - assertThat(shardStats.getShardRouting() + " max seq no mismatch", - seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); - // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard - assertThat( - seqNoStats.getGlobalCheckpoint(), - equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); - } - } - } - }); + assertSeqNos(); } public void testSimpleRelocationNoIndexing() { @@ -301,11 +257,14 @@ public void testRelocationWhileRefreshing() throws Exception { nodes[0] = internalCluster().startNode(); logger.info("--> creating test index ..."); - prepareCreate("test", Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", numberOfReplicas) - .put("index.refresh_interval", -1) // we want to control refreshes c - ).get(); + prepareCreate( + "test", + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .put("index.refresh_interval", -1) // we want to control refreshes + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")) + .get(); for (int i = 1; i < numberOfNodes; i++) { logger.info("--> starting [node_{}] ...", i); @@ -383,9 +342,6 @@ public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardSt } - // refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down - client().admin().indices().prepareRefresh("test").get(); - } public void testCancellationCleansTempFiles() throws Exception { @@ -481,11 +437,12 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr logger.info("red nodes: {}", (Object)redNodes); ensureStableCluster(halfNodes * 2); - assertAcked(prepareCreate("test", Settings.builder() - .put("index.routing.allocation.exclude.color", "blue") - .put(indexSettings()) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) - )); + final Settings.Builder settings = Settings.builder() + .put("index.routing.allocation.exclude.color", "blue") + .put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms"); + assertAcked(prepareCreate("test", settings)); assertAllShardsOnNodes("test", redNodes); int numDocs = randomIntBetween(100, 150); ArrayList ids = new ArrayList<>(); @@ -526,8 +483,6 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()])); } - // refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down - client().admin().indices().prepareRefresh("test").get(); } class RecoveryCorruption extends MockTransportService.DelegateTransport { diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 28594301949df..b5076868f04f0 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -213,18 +213,17 @@ public void testSeqNoCheckpoints() throws Exception { final int numberOfInitialDocs = 1 + randomInt(5); logger.info("indexing [{}] docs initially", numberOfInitialDocs); numDocs += indexDocs(index, numDocs, numberOfInitialDocs); - assertOK(client().performRequest("POST", index + "/_refresh")); // this forces a global checkpoint sync assertSeqNoOnShards(index, nodes, numDocs, newNodeClient); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); for (final String bwcName : bwcNamesList) { assertCount(index, "_only_nodes:" + bwcName, numDocs); } final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5); logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes); numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes); - assertOK(client().performRequest("POST", index + "/_refresh")); // this forces a global checkpoint sync assertSeqNoOnShards(index, nodes, numDocs, newNodeClient); Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); @@ -235,7 +234,6 @@ public void testSeqNoCheckpoints() throws Exception { logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary); numDocs += numberOfDocsAfterMovingPrimary; - assertOK(client().performRequest("POST", index + "/_refresh")); // this forces a global checkpoint sync assertSeqNoOnShards(index, nodes, numDocs, newNodeClient); /* * Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 71fb310fa2cd5..d463fdbd17bdd 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -160,7 +160,9 @@ protected IndexShard newShard(boolean primary) throws IOException { * @param shardRouting the {@link ShardRouting} to use for this shard * @param listeners an optional set of listeners to add to the shard */ - protected IndexShard newShard(ShardRouting shardRouting, IndexingOperationListener... listeners) throws IOException { + protected IndexShard newShard( + final ShardRouting shardRouting, + final IndexingOperationListener... listeners) throws IOException { assert shardRouting.initializing() : shardRouting; Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) @@ -197,9 +199,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, IndexingOperatio */ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper searcherWrapper) throws IOException { - ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, - primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, null); + return newShard(shardId, primary, nodeId, indexMetaData, searcherWrapper, () -> {}); } /** @@ -211,11 +211,10 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I * (ready to recover from another shard) */ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData, - Runnable globalCheckpointSyncer, - @Nullable IndexSearcherWrapper searcherWrapper) throws IOException { + @Nullable IndexSearcherWrapper searcherWrapper, Runnable globalCheckpointSyncer) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, null); + return newShard(shardRouting, indexMetaData, searcherWrapper, null, globalCheckpointSyncer); } @@ -229,40 +228,45 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I */ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) throws IOException { - return newShard(routing, indexMetaData, null, null, listeners); + return newShard(routing, indexMetaData, null, null, () -> {}, listeners); } /** * creates a new initializing shard. The shard will will be put in its proper path under the * current node id the shard is assigned to. - * @param routing shard routing to use + * @param routing shard routing to use * @param indexMetaData indexMetaData for the shard, including any mapping * @param indexSearcherWrapper an optional wrapper to be used during searchers + * @param globalCheckpointSyncer callback for syncing global checkpoints * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, + Runnable globalCheckpointSyncer, IndexingOperationListener... listeners) throws IOException { // add node id as name to settings for proper logging final ShardId shardId = routing.shardId(); final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, listeners); + return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, listeners); } /** * creates a new initializing shard. - * @param routing shard routing to use - * @param shardPath path to use for shard data - * @param indexMetaData indexMetaData for the shard, including any mapping - * @param indexSearcherWrapper an optional wrapper to be used during searchers - * @param listeners an optional set of listeners to add to the shard + * + * @param routing shard routing to use + * @param shardPath path to use for shard data + * @param indexMetaData indexMetaData for the shard, including any mapping + * @param indexSearcherWrapper an optional wrapper to be used during searchers + * @param globalCheckpointSyncer callback for syncing global checkpoints + * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, + Runnable globalCheckpointSyncer, IndexingOperationListener... listeners) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); @@ -279,9 +283,9 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe }; final Engine.Warmer warmer = searcher -> { }; - indexShard = new IndexShard(routing, indexSettings, shardPath, store, () ->null, indexCache, mapperService, similarityService, + indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService, engineFactory, indexEventListener, indexSearcherWrapper, threadPool, - BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners)); + BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer); success = true; } finally { if (success == false) { @@ -311,7 +315,14 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener.. */ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException { closeShards(current); - return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null, current.engineFactory, listeners); + return newShard( + routing, + current.shardPath(), + current.indexSettings().getIndexMetaData(), + null, + current.engineFactory, + current.getGlobalCheckpointSyncer(), + listeners); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 2753e4013c181..ffa3cd6bed082 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.test; +import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.annotations.TestGroup; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; @@ -49,6 +50,10 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -69,6 +74,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -114,6 +120,9 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -161,6 +170,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; @@ -191,6 +201,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; @@ -2194,4 +2205,44 @@ public static Index resolveIndex(String index) { String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID); return new Index(index, uuid); } + + protected void assertSeqNos() throws Exception { + assertBusy(() -> { + IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); + for (IndexStats indexStats : stats.getIndices().values()) { + for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { + Optional maybePrimary = Stream.of(indexShardStats.getShards()) + .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) + .findFirst(); + if (maybePrimary.isPresent() == false) { + continue; + } + ShardStats primary = maybePrimary.get(); + final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); + final ShardRouting primaryShardRouting = primary.getShardRouting(); + assertThat(primaryShardRouting + " should have set the global checkpoint", + primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); + final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); + final IndicesService indicesService = + internalCluster().getInstance(IndicesService.class, node.getName()); + final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); + final ObjectLongMap globalCheckpoints = indexShard.getInSyncGlobalCheckpoints(); + for (ShardStats shardStats : indexShardStats) { + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); + assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", + seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); + assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", + seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); + assertThat(shardStats.getShardRouting() + " max seq no mismatch", + seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); + // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard + assertThat( + seqNoStats.getGlobalCheckpoint(), + equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); + } + } + } + }); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index 12920a5f1504e..e1c555b811064 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.plugins.Plugin; import java.util.Arrays; @@ -44,7 +45,12 @@ public final class InternalSettingsPlugin extends Plugin { @Override public List> getSettings() { - return Arrays.asList(VERSION_CREATED, MERGE_ENABLED, - INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING); + return Arrays.asList( + VERSION_CREATED, + MERGE_ENABLED, + INDEX_CREATION_DATE_SETTING, + PROVIDED_NAME_SETTING, + TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, + IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING); } }