Introduce Local checkpoints #15111
Conversation
Every shard group in Elasticsearch has a selected copy called a primary. When a primary shard fails, a new primary is selected from the existing replica copies. This PR introduces `primary terms` to track the number of times this has happened. This will allow us, as follow-up work and among other things, to identify operations that come from old, stale primaries. It is also the first step on the road towards sequence numbers. Relates to elastic#10708 Closes elastic#14062
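The primary-term enforcement described above can be sketched as follows. This is a hypothetical illustration, not the actual Elasticsearch code: `ShardWriteGuard`, its fields, and its method names are invented for the sketch.

```java
// Hypothetical sketch of primary-term enforcement: the shard remembers the
// highest primary term it has seen and rejects writes stamped with an older
// term, i.e. writes coming from a stale, failed-over primary.
public class ShardWriteGuard {
    private long primaryTerm;

    public ShardWriteGuard(long initialTerm) {
        this.primaryTerm = initialTerm;
    }

    /** Returns true if the operation is accepted, false if it comes from a stale primary. */
    public synchronized boolean acceptWrite(long operationTerm) {
        if (operationTerm < primaryTerm) {
            return false; // the operation originates from an old primary
        }
        primaryTerm = operationTerm; // a higher term means a new primary was selected
        return true;
    }

    public synchronized long currentTerm() {
        return primaryTerm;
    }
}
```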
Adds a counter to each write operation on a shard. This sequence number is indexed into Lucene using doc values, for now (we will probably require indexing to support range searches in the future). On top of this, primary term semantics are enforced and shards will refuse write operations coming from an older primary. Other notes: - The added SequenceServiceNumber is just a skeleton and will be replaced with a much heavier one, once we have all the building blocks (i.e., checkpoints). - I completely ignored recovery - for this we will need checkpoints as well. - A new base class is introduced for all single-doc write operations. This is handy to unify common logic (like toXContent). - For now, we don't use seq# as versioning. We could in the future. Relates to elastic#10708 Closes elastic#14651
* This is a temporary fix until a more permanent fix is done on master. * During primary relocation, some operations can be performed on the source primary but reach the target primary only after the relocation is completed. At the moment the new primary will have a new primary term and as such it will reject the operations from the old one, causing data loss. This changes relocations to move the source primary to a relocated state, preventing any new operations from happening on it and waiting for ongoing operations to complete. Long term, we may also consider not incrementing the primary term on relocation.
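The relocation hand-off above can be sketched with a simple in-flight-operation counter. This is an invented illustration, not the PR's actual code: `RelocatableShard` and its methods are hypothetical.

```java
// Hypothetical sketch of the relocation hand-off: once the shard enters the
// relocated state it refuses new operations, and the hand-off completes only
// after all in-flight operations have drained.
public class RelocatableShard {
    private boolean relocated = false;
    private int inFlightOps = 0;

    /** Try to start an operation; fails once the shard has been relocated. */
    public synchronized boolean startOperation() {
        if (relocated) {
            return false; // new operations must go to the target primary
        }
        inFlightOps++;
        return true;
    }

    public synchronized void finishOperation() {
        inFlightOps--;
        notifyAll();
    }

    /** Move to the relocated state and wait for ongoing operations to complete. */
    public synchronized void moveToRelocated() {
        relocated = true;
        while (inFlightOps > 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```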
/**
 * Serializes a potentially null value.
 */
public <T extends StreamableReader<T>> T readOptionalStreamableReader(StreamableReader<T> streamableReader) throws IOException {
Why not just make SeqNoStats implement Streamable and use the existing readOptionalStreamable and writeOptionalStreamable? That's consistent with most (all?) of the existing stats objects.
We are trying to move to using Writeable and StreamableReader, which allow us to construct the object while reading from the stream and use final members. That said, I think with Java-8-only changes we can have something like readOptionalStreamableReader here, where we pass a "factory" method which takes the stream as an input and returns an object. This can typically be a public constructor. That means we can decouple writing from reading and have Writeable not inherit from StreamableReader, which will save a useless method and a prototype. @s1monw what are your thoughts here?
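The factory-style reading proposed in this comment can be sketched as follows, using plain `java.nio.ByteBuffer` instead of Elasticsearch's StreamInput/StreamOutput. The field names and method signatures here are illustrative, not the actual SeqNoStats class.

```java
import java.nio.ByteBuffer;

// Illustrative sketch: a stats object whose members are final because the
// object is constructed while reading from the stream, with a static
// "factory" reader that decouples reading from writing.
public class SeqNoStats {
    private final long maxSeqNo;
    private final long localCheckpoint;

    public SeqNoStats(long maxSeqNo, long localCheckpoint) {
        this.maxSeqNo = maxSeqNo;
        this.localCheckpoint = localCheckpoint;
    }

    /** Factory reader: takes the stream as input and returns a fully built object. */
    public static SeqNoStats readFrom(ByteBuffer in) {
        return new SeqNoStats(in.getLong(), in.getLong());
    }

    public void writeTo(ByteBuffer out) {
        out.putLong(maxSeqNo);
        out.putLong(localCheckpoint);
    }

    public long maxSeqNo() { return maxSeqNo; }
    public long localCheckpoint() { return localCheckpoint; }
}
```

With this shape, `SeqNoStats::readFrom` can be passed around as a method reference, so a generic `readOptional…` helper never needs a prototype instance.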
public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
    super(shardId, indexSettings);
    indexLagThreshold = indexSettings.getSettings().getAsInt(SETTINGS_INDEX_LAG_THRESHOLD, DEFAULT_INDEX_LAG_THRESHOLD);
    indexLagMaxWait = indexSettings.getSettings().getAsTime(SETTINGS_INDEX_LAG_MAX_WAIT, DEFAULT_INDEX_LAG_MAX_WAIT);
Validate these settings?
Change IndexShard counters for the new simplified ReplicationAction
do {
    // clear the flag as we are making it free for future operations. do so before we expose it
    // by moving the checkpoint
    processedSeqNo.clear(offset);
Maybe just clear a range of bits with FixedBitSet#clear(int, int) instead of clearing bit by bit? The implementation looks to be more efficient and would just require care around the offset wrapping.
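The suggestion above, sketched with `java.util.BitSet` (Lucene's FixedBitSet has an analogous `clear(int, int)` range method): a single bulk call replaces the loop that clears one bit per iteration, and the two approaches produce the same result.

```java
import java.util.BitSet;

// Demonstrates clearing a range of bits in one bulk call versus the
// bit-by-bit loop the review comment wants to replace.
public class RangeClearDemo {
    /** Clear bits in [from, to) one by one, as the original loop does. */
    public static void clearBitByBit(BitSet bits, int from, int to) {
        for (int i = from; i < to; i++) {
            bits.clear(i);
        }
    }

    /** Clear the same range with a single bulk call. */
    public static void clearRange(BitSet bits, int from, int to) {
        bits.clear(from, to); // fromIndex inclusive, toIndex exclusive
    }
}
```

Care around offset wrapping would mean splitting one logical range into two `clear` calls when it wraps past the end of the fixed array.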
@jasontedor I pushed a new approach. Can you take another look?
closing... github made this a mess.
This PR introduces the notion of a local checkpoint on the shard level. A local checkpoint is defined as the highest sequence number for which all previous operations (i.e., with a lower seq#) have been processed.
The current implementation is based on a fixed-size in-memory bit array which is used in a round-robin fashion. This introduces a limit to the spread between in-flight indexing operations. We are still discussing options to work around this, but I think we should move forward toward a working system and optimize from there (and either remove this limitation or better understand its implications).
relates to #10708
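The checkpoint semantics described above can be sketched as follows. This is a hypothetical illustration under the stated assumptions (fixed bit array used round-robin, with the array size bounding the spread between in-flight operations), not the PR's actual LocalCheckpointService.

```java
import java.util.BitSet;

// Hypothetical sketch of a local checkpoint tracker: completed seq# are
// marked in a fixed bit array used round-robin, and the checkpoint advances
// while the next seq# in order has been processed.
public class LocalCheckpointSketch {
    private final BitSet processed;
    private final int capacity;   // bounds the spread between in-flight ops
    private long checkpoint = -1; // highest seq# with all lower seq# processed

    public LocalCheckpointSketch(int capacity) {
        this.capacity = capacity;
        this.processed = new BitSet(capacity);
    }

    public synchronized void markSeqNoAsCompleted(long seqNo) {
        if (seqNo - checkpoint > capacity) {
            throw new IllegalStateException("seq# too far ahead of the checkpoint");
        }
        processed.set((int) (seqNo % capacity)); // round-robin slot
        // advance the checkpoint over every contiguous completed seq#
        while (processed.get((int) ((checkpoint + 1) % capacity))) {
            checkpoint++;
            processed.clear((int) (checkpoint % capacity)); // free the slot for reuse
        }
    }

    public synchronized long getCheckpoint() {
        return checkpoint;
    }
}
```

Note how out-of-order completion (seq# 2 before seq# 1) leaves the checkpoint behind until the gap is filled, which is exactly the "all previous operations have been processed" definition above.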