Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Jan 30, 2019
1 parent 0895a5c commit 72e2e7b
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private volatile long operationPrimaryTerm;

/**
* Returns the current operation primary term.
*
* @return the primary term
*/
public long getOperationPrimaryTerm() {
return operationPrimaryTerm;
}

/**
* Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That
* is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance.
*
* @param operationPrimaryTerm the new operation primary term
*/
public void setOperationPrimaryTerm(final long operationPrimaryTerm) {
this.operationPrimaryTerm = operationPrimaryTerm;
}

/**
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff}
* and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the
Expand Down Expand Up @@ -434,6 +415,25 @@ public boolean isPrimaryMode() {
return primaryMode;
}

/**
* Returns the current operation primary term.
*
* @return the primary term
*/
public long getOperationPrimaryTerm() {
return operationPrimaryTerm;
}

/**
* Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That
* is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance.
*
* @param operationPrimaryTerm the new operation primary term
*/
public void setOperationPrimaryTerm(final long operationPrimaryTerm) {
this.operationPrimaryTerm = operationPrimaryTerm;
}

/**
* Returns whether the replication tracker has relocated away to another shard copy.
*/
Expand Down Expand Up @@ -553,13 +553,15 @@ private static long inSyncCheckpointStates(
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param operationPrimaryTerm the current primary term
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
*/
public ReplicationTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long operationPrimaryTerm,
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
Expand All @@ -568,6 +570,7 @@ public ReplicationTracker(
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
this.primaryMode = false;
this.operationPrimaryTerm = operationPrimaryTerm;
this.handoffInProgress = false;
this.appliedClusterStateVersion = -1L;
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,16 @@ public IndexShard(
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
final String aId = shardRouting.allocationId().getId();
final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
this.globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
final ReplicationTracker replicationTracker =
new ReplicationTracker(
shardId,
aId,
indexSettings,
primaryTerm,
UNASSIGNED_SEQ_NO,
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis,
Expand All @@ -337,9 +340,6 @@ public boolean shouldCache(Query query) {
}
indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
searcherWrapper = indexSearcherWrapper;
final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
replicationTracker.setOperationPrimaryTerm(primaryTerm);
refreshListeners = buildRefreshListeners();
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
persistMetadata(path, indexSettings, shardRouting, null, logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void testAddOrRenewRetentionLease() {
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
randomNonNegativeLong(),
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
Expand Down Expand Up @@ -88,6 +89,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() {
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
randomNonNegativeLong(),
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
Expand Down Expand Up @@ -143,6 +145,7 @@ private void runExpirationTest(final boolean primaryMode) {
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", settings),
randomNonNegativeLong(),
UNASSIGNED_SEQ_NO,
value -> {},
currentTimeMillis::get,
Expand Down Expand Up @@ -215,6 +218,7 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", settings),
randomNonNegativeLong(),
UNASSIGNED_SEQ_NO,
value -> {},
currentTimeMillis::get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ReplicationTracker newTracker(
new ShardId("test", "_na_", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
randomNonNegativeLong(),
UNASSIGNED_SEQ_NO,
updatedGlobalCheckpoint,
currentTimeMillisSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,15 +683,16 @@ public void testPrimaryContextHandoff() throws IOException {
final ShardId shardId = new ShardId("test", "_na_", 0);

FakeClusterState clusterState = initialState();
final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
final AllocationId aId = clusterState.routingTable.primaryShard().allocationId();
final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
final long primaryTerm = randomNonNegativeLong();
final long globalCheckpoint = UNASSIGNED_SEQ_NO;
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease =
(leases, listener) -> {};
ReplicationTracker oldPrimary = new ReplicationTracker(
shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
ReplicationTracker newPrimary = new ReplicationTracker(
shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);

Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ public EngineConfig config(
shardId,
allocationId.getId(),
indexSettings,
randomNonNegativeLong(),
SequenceNumbers.NO_OPS_PERFORMED,
update -> {},
() -> 0L,
Expand Down

0 comments on commit 72e2e7b

Please sign in to comment.