-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add global checkpoint tracking on the primary #26666
Add global checkpoint tracking on the primary #26666
Conversation
This commit adds local tracking of the global checkpoints on all shard copies when a global checkpoint tracker is operating in primary mode. With this, we relay the global checkpoint on a shard copy back to the primary shard during replication operations. This serves as another step towards adding a background sync of the global checkpoint to the shard copies.
@ywelsch I want to write more tests for this (and I'm open to any suggestions that you have) but I want to start review cycles early to keep moving. |
@@ -385,12 +395,24 @@ void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable | |||
} | |||
|
|||
/** | |||
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them | |||
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯
/* | ||
* A replica should always know its own local checkpoint so this should always be a valid sequence number or the pre-6.0 local | ||
* A replica should always know its own local checkpoints so this should always be valida sequence number or the pre-6.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
valida?
assert !primaryMode | ||
|| getGlobalCheckpoint() <= inSyncCheckpointStates(checkpoints, CheckpointState::getLocalCheckpoint, LongStream::min); | ||
|
||
// when in primary mode, the local knowledge of the global checkpoints on shard copies is bounded by the global checkpoint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
logger.trace("updating global checkpoint from [{}] to [{}] due to [{}]", this.globalCheckpoint, globalCheckpoint, reason); | ||
this.globalCheckpoint = globalCheckpoint; | ||
if (getGlobalCheckpoint() <= globalCheckpoint) { | ||
logger.trace("updating global checkpoint from [{}] to [{}] due to [{}]", getGlobalCheckpoint(), globalCheckpoint, reason); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
store getGlobalCheckpoint()
in a variable so that we don't have to do the map lookup twice (it's called in the line above as well, and updateGlobalCheckpointOnReplica method is called quite often)?
logger.trace("updating global checkpoint from [{}] to [{}] due to [{}]", getGlobalCheckpoint(), globalCheckpoint, reason); | ||
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
final CheckpointState cps = | ||
checkpoints.computeIfAbsent(allocationId, k -> new CheckpointState(unassignedSeqNo, unassignedSeqNo, true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkpoints.get(allocationId) is always non-null, so no need for the computeIfAbsent. Also this is a third map lookup.
Maybe better to write this method
final CheckpointState cps = checkpoints.get(allocationId);
if (cps.globalCheckpoint <= globalCheckpoint) {
logger.trace(...);
cps.globalCheckpoint = globalCheckpoint;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can even do code sharing with updateGlobalCheckpointForShard
@@ -50,7 +57,7 @@ | |||
*/ | |||
public class GlobalCheckpointTracker extends AbstractIndexShardComponent { | |||
|
|||
private final String allocationId; | |||
final String allocationId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some methods in this class also have allocationId as parameter, which is a bit unfortunate. Maybe rename this (I don't have a good suggestion right now)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now I renamed it to shardAllocationId
. Let me know if you prefer something else.
if (lcps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && | ||
lcps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT) { | ||
lcps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
// forget all checkpoint information except for current shard (should we forget local checkpoint for current shard as well?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this comment to say "forget all checkpoint information except for global checkpoint of current shard"
routingTable = IndexShardRoutingTable.Builder.readFrom(in); | ||
} | ||
|
||
public long clusterStateVersion() { | ||
return clusterStateVersion; | ||
} | ||
|
||
public Map<String, LocalCheckpointState> getLocalCheckpoints() { | ||
public Map<String, CheckpointState> getLocalCheckpoints() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename this method
@@ -2028,7 +2028,7 @@ public void testSeqNoAndCheckpoints() throws IOException { | |||
final Set<String> indexedIds = new HashSet<>(); | |||
long localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; | |||
long replicaLocalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; | |||
long globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; | |||
long globalCheckpoint; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAICS this can be made final
builder.addShard(primaryShard); | ||
|
||
if (primaryShard.relocating()) { | ||
// builder.addShard(primaryShard.getTargetRelocatingShard()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RoutingTable has never the relocation target shard, it's auto-included with the relocating shard. Just remove these 2 lines
* master: fix testSniffNodes to use the new error message Add check for invalid index in WildcardExpressionResolver (elastic#26409) Docs: Use single-node discovery.type for dev example Filter unsupported relation for range query builder (elastic#26620) Fix kuromoji default stoptags (elastic#26600) [Docs] Add description for missing fields in Reindex/Update/Delete By Query (elastic#26618) [Docs] Update ingest.asciidoc (elastic#26599) Better message text for ResponseException [DOCS] Remove edit link from ML node enable bwc testing fix StartRecoveryRequestTests.testSerialization Add bad_request to the rest-api-spec catch params (elastic#26539) Introduce a History UUID as a requirement for ops based recovery (elastic#26577) Add missing catch arguments to the rest api spec (elastic#26536)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
assert primaryMode; | ||
assert handoffInProgress == false; | ||
assert invariant(); | ||
updateGlobalCheckpoint(allocationId, globalCheckpoint, current -> {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also add trace logging when updating the global checkpoint knowledge of another shard copy?
final IndicesService indicesService = | ||
internalCluster().getInstance(IndicesService.class, node.getName()); | ||
final IndexService indexService = indicesService.indexService(primaryShardRouting.index()); | ||
final IndexShard indexShard = indexService.getShardOrNull(primaryShardRouting.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shorter: indicesService.getShardOrNull(primaryShardRouting.shardId())
This commit adds local tracking of the global checkpoints on all shard copies when a global checkpoint tracker is operating in primary mode. With this, we relay the global checkpoint on a shard copy back to the primary shard during replication operations. This serves as another step towards adding a background sync of the global checkpoint to the shard copies. Relates #26666
This commit adds local tracking of the global checkpoints on all shard copies when a global checkpoint tracker is operating in primary mode. With this, we relay the global checkpoint on a shard copy back to the primary shard during replication operations. This serves as another step towards adding a background sync of the global checkpoint to the shard copies. Relates #26666
This commit reenables the BWC tests after they were disabled for backporting the change to track global checkpoints of shard copies on the primary. Relates #26666
When checking that the global checkpoint on the primary is consistent with the local checkpoints of the in-sync shards, we have to filter pre-6.0 nodes from the check or the invariant will trivially trip. This commit filters these nodes out when checking this invariant. Relates #26666
When checking that the global checkpoint on the primary is consistent with the local checkpoints of the in-sync shards, we have to filter pre-6.0 nodes from the check or the invariant will trivially trip. This commit filters these nodes out when checking this invariant. Relates #26666
This commit reenables the BWC tests after they were disabled for backporting the change to track global checkpoints of shard copies on the primary. Relates #26666
When checking that the global checkpoint on the primary is consistent with the local checkpoints of the in-sync shards, we have to filter pre-6.0 nodes from the check or the invariant will trivially trip. This commit filters these nodes out when checking this invariant. Relates #26666
This commit reenables the BWC tests after they were disabled for backporting the change to track global checkpoints of shard copies on the primary. Relates #26666
Thanks @ywelsch. Safely backported and BWC tests reenabled. |
After recovery completes from a primary, we now update the local knowledge on the primary of the global checkpoint on the recovery target. However if this occurs concurrently with a relocation, an assertion could trip that we are no longer in primary mode. As this local knowledge should only be tracked when we are in primary mode, updating this local knowledge should be done under a permit. This commit causes that to be the case. Relates #26666
After recovery completes from a primary, we now update the local knowledge on the primary of the global checkpoint on the recovery target. However if this occurs concurrently with a relocation, an assertion could trip that we are no longer in primary mode. As this local knowledge should only be tracked when we are in primary mode, updating this local knowledge should be done under a permit. This commit causes that to be the case. Relates #26666
After recovery completes from a primary, we now update the local knowledge on the primary of the global checkpoint on the recovery target. However if this occurs concurrently with a relocation, an assertion could trip that we are no longer in primary mode. As this local knowledge should only be tracked when we are in primary mode, updating this local knowledge should be done under a permit. This commit causes that to be the case. Relates #26666
This commit adds local tracking of the global checkpoints on all shard copies when a global checkpoint tracker is operating in primary mode. With this, we relay the global checkpoint on a shard copy back to the primary shard during replication operations. This serves as another step towards adding a background sync of the global checkpoint to the shard copies.
Relates #26591