-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use soft-deleted docs to resolve strategy for engine operation #35230
Changes from 11 commits
97815bd
92dde70
436cbeb
28a0a13
9ac1c39
a5e1fed
b4f858c
664ef55
1207822
ca67925
315c44a
b55e9cf
4260990
7646122
26993a3
33675c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,16 +140,34 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException { | |
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException { | ||
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) : | ||
"context's reader is not the same as the reader class was initialized on."; | ||
int docID = getDocID(id, context.reader().getLiveDocs()); | ||
if (docID != DocIdSetIterator.NO_MORE_DOCS) { | ||
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); | ||
long seqNo; | ||
if (seqNos != null && seqNos.advanceExact(docID)) { | ||
seqNo = seqNos.longValue(); | ||
} else { | ||
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
// termsEnum can possibly be null here if this leaf contains only no-ops. | ||
if (termsEnum != null && termsEnum.seekExact(id)) { | ||
docsEnum = termsEnum.postings(docsEnum, 0); | ||
final Bits liveDocs = context.reader().getLiveDocs(); | ||
DocIdAndSeqNo result = null; | ||
int docID = docsEnum.nextDoc(); | ||
if (docID != DocIdSetIterator.NO_MORE_DOCS) { | ||
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); | ||
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) { | ||
final long seqNo; | ||
if (seqNoDV != null && seqNoDV.advanceExact(docID)) { | ||
seqNo = seqNoDV.longValue(); | ||
} else { | ||
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
} | ||
final boolean isDeleted = (liveDocs != null && liveDocs.get(docID) == false); | ||
if (result == null || result.seqNo < seqNo) { | ||
assert result == null || result.isDeleted : "the live doc doesn't have the highest seq_no; " | ||
+ result.seqNo + " < " + seqNo + " id=" + id; | ||
result = new DocIdAndSeqNo(docID, seqNo, context, isDeleted); | ||
} else if (result.seqNo == seqNo && (result.isDeleted || isDeleted == false)) { | ||
// the extra guard "result.isDeleted || isDeleted == false" is to make sure that we pick a live doc instead of | ||
// a deleted doc in case the same index operation was indexed twice - one as live and another as soft-deleted. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when can this happen exactly? |
||
result = new DocIdAndSeqNo(docID, seqNo, context, isDeleted); | ||
} | ||
} | ||
} | ||
return new DocIdAndSeqNo(docID, seqNo, context); | ||
return result; | ||
} else { | ||
return null; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,11 +114,13 @@ public static class DocIdAndSeqNo { | |
public final int docId; | ||
public final long seqNo; | ||
public final LeafReaderContext context; | ||
public final boolean isDeleted; | ||
|
||
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) { | ||
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isDeleted) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we call it |
||
this.docId = docId; | ||
this.seqNo = seqNo; | ||
this.context = context; | ||
this.isDeleted = isDeleted; | ||
} | ||
} | ||
|
||
|
@@ -146,25 +148,32 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) | |
} | ||
|
||
/** | ||
* Load the internal doc ID and sequence number for the uid from the reader, returning<ul> | ||
* <li>null if the uid wasn't found, | ||
* <li>a doc ID and the associated seqNo otherwise | ||
* </ul> | ||
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader. | ||
* The flag {@link DocIdAndSeqNo#isDeleted} indicates whether the returned document is soft(deleted) or live. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you want (soft)deleted |
||
* Returns {@code null} if no document matches the given term uid. | ||
*/ | ||
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException { | ||
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field()); | ||
List<LeafReaderContext> leaves = reader.leaves(); | ||
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field()); | ||
final List<LeafReaderContext> leaves = reader.leaves(); | ||
DocIdAndSeqNo latest = null; | ||
// iterate backwards to optimize for the frequently updated documents | ||
// which are likely to be in the last segments | ||
for (int i = leaves.size() - 1; i >= 0; i--) { | ||
final LeafReaderContext leaf = leaves.get(i); | ||
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord]; | ||
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf); | ||
if (result != null) { | ||
return result; | ||
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord]; | ||
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf); | ||
if (result == null) { | ||
continue; | ||
} | ||
if (latest == null || latest.seqNo <= result.seqNo) { | ||
latest = result; | ||
} | ||
if (latest.isDeleted == false) { | ||
// The live document must always be the latest copy, thus we can early terminate here. | ||
break; | ||
} | ||
} | ||
return null; | ||
return latest; | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,6 +139,7 @@ | |
import java.util.Base64; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedHashMap; | ||
|
@@ -3681,6 +3682,72 @@ public void testSequenceIDs() throws Exception { | |
searchResult.close(); | ||
} | ||
|
||
public void testLookupSeqNoByIdInLucene() throws Exception { | ||
int numOps = between(10, 100); | ||
long seqNo = 0; | ||
List<Engine.Operation> operations = new ArrayList<>(numOps); | ||
Map<String, Long> history = new HashMap<>(); | ||
Set<String> liveIds = new HashSet<>(); | ||
for (int i = 0; i < numOps; i++) { | ||
String id = Integer.toString(between(1, 50)); | ||
boolean isIndexing = randomBoolean(); | ||
int copies = frequently() ? 1 : between(2, 4); | ||
for (int c = 0; c < copies; c++) { | ||
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null); | ||
if (isIndexing) { | ||
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(), | ||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true)); | ||
history.put(id, seqNo); | ||
liveIds.add(id); | ||
} else { | ||
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(), | ||
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis())); | ||
history.put(id, seqNo); | ||
liveIds.remove(id); | ||
} | ||
} | ||
seqNo++; | ||
if (rarely()) { | ||
seqNo++; | ||
} | ||
} | ||
Randomness.shuffle(operations); | ||
Settings.Builder settings = Settings.builder() | ||
.put(defaultSettings.getSettings()) | ||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); | ||
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); | ||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); | ||
try (Store store = createStore(); | ||
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) { | ||
for (Engine.Operation op : operations) { | ||
if (op instanceof Engine.Index) { | ||
engine.index((Engine.Index) op); | ||
} else if (op instanceof Engine.Delete) { | ||
engine.delete((Engine.Delete) op); | ||
} else { | ||
engine.noOp((Engine.NoOp) op); | ||
} | ||
if (randomInt(100) < 10) { | ||
engine.refresh("test"); | ||
} | ||
if (rarely()) { | ||
engine.flush(); | ||
} | ||
} | ||
engine.refresh("test"); | ||
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the test will be much stronger if you also do this after every refresh (while maintaining the last known seq no per id in a separate map). |
||
for (String id : history.keySet()) { | ||
String msg = "history=" + history + " liveIds=" + liveIds + " id=" + id; | ||
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id)); | ||
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(history.get(id))); | ||
assertThat(msg, docIdAndSeqNo.isDeleted, equalTo(liveIds.contains(id) == false)); | ||
} | ||
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), newUid("not-" + between(1, 10))), nullValue()); | ||
} | ||
assertThat(getDocIds(engine, true).stream().map(d -> d.getId()).collect(Collectors.toSet()), equalTo(liveIds)); | ||
} | ||
} | ||
|
||
/** | ||
* A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the | ||
* referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of | ||
|
@@ -5164,6 +5231,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception { | |
commits.add(new ArrayList<>()); | ||
try (Store store = createStore()) { | ||
EngineConfig config = config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get); | ||
final List<DocIdSeqNoAndTerm> docs; | ||
try (InternalEngine engine = createEngine(config)) { | ||
List<Engine.Operation> flushedOperations = new ArrayList<>(); | ||
for (Engine.Operation op : operations) { | ||
|
@@ -5186,6 +5254,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception { | |
} | ||
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); | ||
engine.syncTranslog(); | ||
docs = getDocIds(engine, true); | ||
} | ||
trimUnsafeCommits(config); | ||
List<Engine.Operation> safeCommit = null; | ||
|
@@ -5202,6 +5271,9 @@ public void testRebuildLocalCheckpointTracker() throws Exception { | |
assertThat("seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(), | ||
tracker.contains(op.seqNo()), equalTo(safeCommit.contains(op))); | ||
} | ||
engine.initializeMaxSeqNoOfUpdatesOrDeletes(); | ||
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); | ||
assertThat(getDocIds(engine, true), equalTo(docs)); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should have an optimization (and maybe test it somehow) so that when we hit a live doc we stop and return. This is important to stop iterating quickly when people are updating an existing doc (which is a common pattern imo).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, let's return if we hit a live doc and otherwise use the highest seqNo. Then we don't need the extra guard below, or it becomes implicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++