Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist sequence number checkpoints #18949

Merged
merged 12 commits into from
Jun 22, 2016
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.net.InetAddress;

public abstract class FieldStats<T> implements Writeable, ToXContent {

private final byte type;
private long maxDoc;
private long docCount;
Expand Down Expand Up @@ -628,4 +629,5 @@ private final static class Fields {
final static String MAX_VALUE = new String("max_value");
final static String MAX_VALUE_AS_STRING = new String("max_value_as_string");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
Expand All @@ -54,8 +55,11 @@
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.GlobalCheckpointService;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand Down Expand Up @@ -131,12 +135,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings());

mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try {
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
final long localCheckpoint = loadLocalCheckpointFromCommit(writer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe load a SeqNoStats with one method? this is probably how it's going to be?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 5032fa7.

final long globalCheckpoint = loadGlobalCheckpointFromCommit(writer);
final long maxSeqNo = loadMaxSeqNoFromCommit(writer);
if (logger.isTraceEnabled()) {
logger.trace(
"recovering local checkpoint: [{}], global checkpoint [{}], max sequence number [{}]",
localCheckpoint,
globalCheckpoint,
maxSeqNo);
}
seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings(), maxSeqNo, localCheckpoint, globalCheckpoint);
indexWriter = writer;
translog = openTranslog(engineConfig, writer);
assert translog.getGeneration() != null;
Expand Down Expand Up @@ -287,6 +302,35 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
return null;
}

private long loadLocalCheckpointFromCommit(IndexWriter writer) {
final Map<String, String> commitUserData = writer.getCommitData();
if (commitUserData.containsKey(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)) {
return Long.parseLong(commitUserData.get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY));
} else {
return SequenceNumbersService.NO_OPS_PERFORMED;
}
}

private long loadGlobalCheckpointFromCommit(IndexWriter writer) {
final Map<String, String> commitUserData = writer.getCommitData();
if (commitUserData.containsKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)) {
return Long.parseLong(commitUserData.get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY));
} else {
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

private long loadMaxSeqNoFromCommit(IndexWriter writer) throws IOException {
try (IndexReader reader = DirectoryReader.open(writer)) {
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
if (stats != null) {
return (long) stats.getMaxValue();
} else {
return SequenceNumbersService.NO_OPS_PERFORMED;
}
}
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
Expand Down Expand Up @@ -1132,13 +1176,22 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId);
Map<String, String> commitData = new HashMap<>(2);
final Map<String, String> commitData = new HashMap<>(5);

commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);

commitData.put(LocalCheckpointService.LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService.getLocalCheckpoint()));
commitData.put(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService.getGlobalCheckpoint()));

if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}

if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}

indexWriter.setCommitData(commitData);
writer.commit();
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Bits;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -108,6 +114,34 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) {
throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable");
}

@Override
public FieldStats stats(IndexReader reader) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a nocommit on this implementation, so it will be removed?
PS. Although we do this temporarily to achieve other goals, we will end up adding a different stats implementation once seq no is properly indexed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 689618b.

final List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}

long currentMin = Long.MAX_VALUE;
long currentMax = Long.MIN_VALUE;
boolean found = false;
for (int i = 0; i < leaves.size(); i++) {
final LeafReader leaf = leaves.get(i).reader();
final NumericDocValues values = leaf.getNumericDocValues(name());
if (values == null) continue;
found = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strictly speaking found needs to be in the inner loop - if all docs are deleted, we didn't find anything.

PS. That said for what we don't need to care about deletes in the current implementation - if fact, we can remove all the live docs logic (although it's strictly speaking correct from a stats perspective). I'm OK with removing live docs completely (instead of inlining the found or relying on min and max staying at their initial value).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a40112f.

final Bits bits = leaf.getLiveDocs();
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
if (bits == null || bits.get(docID)) {
final long value = values.get(docID);
currentMin = Math.min(currentMin, value);
currentMax = Math.max(currentMax, value);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cant think of any place where we would want a full top level scan like this. Can we return -1 or null or throw uoe? This acts O(n^2) in a near real time system. Do we know of any other loops like this in this stats code?

Copy link
Member Author

@jasontedor jasontedor Jun 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation will not be the permanent implementation, it is temporary as we iterate in this feature branch. This method is only used when starting an engine, and is needed for recovery (either locally or from a peer).

}

return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null;
}

}

public SeqNoFieldMapper(Settings indexSettings) {
Expand All @@ -129,7 +163,7 @@ protected void parseCreateField(ParseContext context, List<Field> fields) throws

@Override
public Mapper parse(ParseContext context) throws IOException {
// _seqno added in preparse
// _seq_no added in pre-parse
return null;
}

Expand Down Expand Up @@ -157,4 +191,5 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
protected void doMerge(Mapper mergeWith, boolean updateAllTypes) {
// nothing to do
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {

public static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be here or in the engine? it's used for commit persistence

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed f61b571.


/**
* This map holds the last known local checkpoint for every shard copy that's active.
* All shard copies in this map participate in determining the global checkpoint
Expand All @@ -67,16 +69,20 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
*/
final private ObjectLongMap<String> trackingLocalCheckpoint;

private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
private long globalCheckpoint;

public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only used in tests. Can we just remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a9d5f5e and 6e8f11a.

this(shardId, indexSettings, SequenceNumbersService.UNASSIGNED_SEQ_NO);
}

public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
this.globalCheckpoint = globalCheckpoint;
}


/**
* notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one,
* this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late
Expand Down Expand Up @@ -241,4 +247,5 @@ synchronized long getLocalCheckpointForAllocation(String allocationId) {
}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.SnapshotStatus;

import java.util.LinkedList;

Expand All @@ -32,33 +33,41 @@
*/
public class LocalCheckpointService extends AbstractIndexShardComponent {

public static String MAX_SEQ_NO = "max_seq_no";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about location of these. Also, they miss a final on them. (same goes for GLOBAL_CHECKPOINT_KEY)

Copy link
Member Author

@jasontedor jasontedor Jun 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed c851db6.

public static String LOCAL_CHECKPOINT_KEY = "local_checkpoint";

/**
* we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays
* allocating them on demand and cleaning up while completed. This setting controls the size of the arrays
*/
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024,
4, Setting.Property.IndexScope);


/**
* an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo}
* which marks the seqNo the fist bit in the first array corresponds to.
*/
final LinkedList<FixedBitSet> processedSeqNo;
final int bitArraysSize;
long firstProcessedSeqNo = 0;
long firstProcessedSeqNo;

/** the current local checkpoint, i.e., all seqNo lower (&lt;=) than this number have been completed */
volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED;
volatile long checkpoint;

/** the next available seqNo - used for seqNo generation */
volatile long nextSeqNo = 0;
volatile long nextSeqNo;

public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, only used in a single place. Just remove?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a5b99c6 and 6e8f11a.

this(shardId, indexSettings, SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.NO_OPS_PERFORMED);
}

public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) {
super(shardId, indexSettings);
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
processedSeqNo = new LinkedList<>();
firstProcessedSeqNo = checkpoint + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if checkpoint is set to NO_OPS_PERFORMED? I think you get a 0 which is fine, but I rather make it explicit rather than rely on NO_OPS_PERFORMED being -1.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 3eb20ef.

this.nextSeqNo = maxSeqNo + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is confusing - can we some java docs on constructor saying what maxSeqNo and checkpoint represent

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 3eb20ef.

this.checkpoint = checkpoint;
}

/**
Expand Down Expand Up @@ -130,19 +139,19 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
*/
private FixedBitSet getBitSetForSeqNo(long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo;
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the traces of a happy debugging session :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guilty as charged. 😄

int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize;
while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(bitArraysSize));
}
return processedSeqNo.get(bitSetOffset);
}


/** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */
private int seqNoToBitSetOffset(long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo;
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
final LocalCheckpointService localCheckpointService;
final GlobalCheckpointService globalCheckpointService;

public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) {
public SequenceNumbersService(
final ShardId shardId,
final IndexSettings indexSettings,
final long maxSeqNo,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some java docs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 3eb20ef.

final long localCheckpoint,
final long globalCheckpoint) {
super(shardId, indexSettings);
localCheckpointService = new LocalCheckpointService(shardId, indexSettings);
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings);
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
}

/**
Expand Down
Loading