-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persist sequence number checkpoints #18949
Changes from 1 commit
9799d71
689618b
f61b571
c851db6
a9d5f5e
a5b99c6
6e8f11a
5032fa7
a40112f
3eb20ef
331fceb
f223f19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,13 @@ | |
import org.apache.lucene.document.Field; | ||
import org.apache.lucene.document.NumericDocValuesField; | ||
import org.apache.lucene.index.DocValuesType; | ||
import org.apache.lucene.index.IndexReader; | ||
import org.apache.lucene.index.LeafReader; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.NumericDocValues; | ||
import org.apache.lucene.search.Query; | ||
import org.apache.lucene.util.Bits; | ||
import org.elasticsearch.action.fieldstats.FieldStats; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
|
@@ -108,6 +114,34 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) { | |
throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable"); | ||
} | ||
|
||
@Override | ||
public FieldStats stats(IndexReader reader) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add a nocommit on this implementation, so it will be removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 689618b. |
||
final List<LeafReaderContext> leaves = reader.leaves(); | ||
if (leaves.isEmpty()) { | ||
return null; | ||
} | ||
|
||
long currentMin = Long.MAX_VALUE; | ||
long currentMax = Long.MIN_VALUE; | ||
boolean found = false; | ||
for (int i = 0; i < leaves.size(); i++) { | ||
final LeafReader leaf = leaves.get(i).reader(); | ||
final NumericDocValues values = leaf.getNumericDocValues(name()); | ||
if (values == null) continue; | ||
found = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. strictly speaking found needs to be in the inner loop - if all docs are deleted, we didn't find anything. PS. That said for what we don't need to care about deletes in the current implementation - if fact, we can remove all the live docs logic (although it's strictly speaking correct from a stats perspective). I'm OK with removing live docs completely (instead of inlining the found or relying on min and max staying at their initial value). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed a40112f. |
||
final Bits bits = leaf.getLiveDocs(); | ||
for (int docID = 0; docID < leaf.maxDoc(); docID++) { | ||
if (bits == null || bits.get(docID)) { | ||
final long value = values.get(docID); | ||
currentMin = Math.min(currentMin, value); | ||
currentMax = Math.max(currentMax, value); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cant think of any place where we would want a full top level scan like this. Can we return -1 or null or throw uoe? This acts O(n^2) in a near real time system. Do we know of any other loops like this in this stats code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implementation will not be the permanent implementation, it is temporary as we iterate in this feature branch. This method is only used when starting an engine, and is needed for recovery (either locally or from a peer). |
||
} | ||
|
||
return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null; | ||
} | ||
|
||
} | ||
|
||
public SeqNoFieldMapper(Settings indexSettings) { | ||
|
@@ -129,7 +163,7 @@ protected void parseCreateField(ParseContext context, List<Field> fields) throws | |
|
||
@Override | ||
public Mapper parse(ParseContext context) throws IOException { | ||
// _seqno added in preparse | ||
// _seq_no added in pre-parse | ||
return null; | ||
} | ||
|
||
|
@@ -157,4 +191,5 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |
protected void doMerge(Mapper mergeWith, boolean updateAllTypes) { | ||
// nothing to do | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,8 @@ | |
*/ | ||
public class GlobalCheckpointService extends AbstractIndexShardComponent { | ||
|
||
public static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be here or in the engine? it's used for commit persistence There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed f61b571. |
||
|
||
/** | ||
* This map holds the last known local checkpoint for every shard copy that's active. | ||
* All shard copies in this map participate in determining the global checkpoint | ||
|
@@ -67,16 +69,20 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent { | |
*/ | ||
final private ObjectLongMap<String> trackingLocalCheckpoint; | ||
|
||
private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; | ||
private long globalCheckpoint; | ||
|
||
public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is only used in tests. Can we just remove it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
this(shardId, indexSettings, SequenceNumbersService.UNASSIGNED_SEQ_NO); | ||
} | ||
|
||
public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) { | ||
public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { | ||
super(shardId, indexSettings); | ||
activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); | ||
inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); | ||
trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); | ||
this.globalCheckpoint = globalCheckpoint; | ||
} | ||
|
||
|
||
/** | ||
* notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one, | ||
* this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late | ||
|
@@ -241,4 +247,5 @@ synchronized long getLocalCheckpointForAllocation(String allocationId) { | |
} | ||
return SequenceNumbersService.UNASSIGNED_SEQ_NO; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.shard.AbstractIndexShardComponent; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.shard.SnapshotStatus; | ||
|
||
import java.util.LinkedList; | ||
|
||
|
@@ -32,33 +33,41 @@ | |
*/ | ||
public class LocalCheckpointService extends AbstractIndexShardComponent { | ||
|
||
public static String MAX_SEQ_NO = "max_seq_no"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment about location of these. Also, they miss a final on them. (same goes for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed c851db6. |
||
public static String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; | ||
|
||
/** | ||
* we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays | ||
* allocating them on demand and cleaning up while completed. This setting controls the size of the arrays | ||
*/ | ||
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, | ||
4, Setting.Property.IndexScope); | ||
|
||
|
||
/** | ||
* an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo} | ||
* which marks the seqNo the fist bit in the first array corresponds to. | ||
*/ | ||
final LinkedList<FixedBitSet> processedSeqNo; | ||
final int bitArraysSize; | ||
long firstProcessedSeqNo = 0; | ||
long firstProcessedSeqNo; | ||
|
||
/** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ | ||
volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED; | ||
volatile long checkpoint; | ||
|
||
/** the next available seqNo - used for seqNo generation */ | ||
volatile long nextSeqNo = 0; | ||
volatile long nextSeqNo; | ||
|
||
public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here, only used in a single place. Just remove? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
this(shardId, indexSettings, SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.NO_OPS_PERFORMED); | ||
} | ||
|
||
public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { | ||
public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) { | ||
super(shardId, indexSettings); | ||
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); | ||
processedSeqNo = new LinkedList<>(); | ||
firstProcessedSeqNo = checkpoint + 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what happens if checkpoint is set to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 3eb20ef. |
||
this.nextSeqNo = maxSeqNo + 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is confusing - can we some java docs on constructor saying what maxSeqNo and checkpoint represent There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 3eb20ef. |
||
this.checkpoint = checkpoint; | ||
} | ||
|
||
/** | ||
|
@@ -130,19 +139,19 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) | |
*/ | ||
private FixedBitSet getBitSetForSeqNo(long seqNo) { | ||
assert Thread.holdsLock(this); | ||
assert seqNo >= firstProcessedSeqNo; | ||
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the traces of a happy debugging session :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guilty as charged. 😄 |
||
int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize; | ||
while (bitSetOffset >= processedSeqNo.size()) { | ||
processedSeqNo.add(new FixedBitSet(bitArraysSize)); | ||
} | ||
return processedSeqNo.get(bitSetOffset); | ||
} | ||
|
||
|
||
/** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ | ||
private int seqNoToBitSetOffset(long seqNo) { | ||
assert Thread.holdsLock(this); | ||
assert seqNo >= firstProcessedSeqNo; | ||
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,10 +34,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { | |
final LocalCheckpointService localCheckpointService; | ||
final GlobalCheckpointService globalCheckpointService; | ||
|
||
public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) { | ||
public SequenceNumbersService( | ||
final ShardId shardId, | ||
final IndexSettings indexSettings, | ||
final long maxSeqNo, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add some java docs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 3eb20ef. |
||
final long localCheckpoint, | ||
final long globalCheckpoint) { | ||
super(shardId, indexSettings); | ||
localCheckpointService = new LocalCheckpointService(shardId, indexSettings); | ||
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings); | ||
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint); | ||
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: maybe load a
SeqNoStats
with one method? this is probably how it's going to be?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 5032fa7.