Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

type and id are lost upon serialization of Translog.Delete. #24586

Merged
merged 2 commits into from
Jun 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1102,8 +1102,8 @@ public static class Delete extends Operation {
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
this.type = type;
this.id = id;
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
}

public Delete(String type, String id, Term uid) {
Expand Down
51 changes: 37 additions & 14 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;
Expand All @@ -58,6 +59,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -919,8 +921,8 @@ public static class Index implements Operation {
private final String id;
private final long autoGeneratedIdTimestamp;
private final String type;
private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
private long primaryTerm = 0;
private final long seqNo;
private final long primaryTerm;
private final long version;
private final VersionType versionType;
private final BytesReference source;
Expand Down Expand Up @@ -950,6 +952,9 @@ public Index(StreamInput in) throws IOException {
if (format >= FORMAT_SEQ_NO) {
seqNo = in.readLong();
primaryTerm = in.readLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
primaryTerm = 0;
}
}

Expand All @@ -976,6 +981,7 @@ public Index(String type, String id, long seqNo, long version, VersionType versi
this.id = id;
this.source = new BytesArray(source);
this.seqNo = seqNo;
this.primaryTerm = 0;
this.version = version;
this.versionType = versionType;
this.routing = routing;
Expand Down Expand Up @@ -1113,27 +1119,42 @@ public long getAutoGeneratedIdTimestamp() {

public static class Delete implements Operation {

private static final int FORMAT_5_X = 2;
private static final int FORMAT_SEQ_NO = FORMAT_5_X + 1;
public static final int FORMAT_5_0 = 2; // 5.0 - 5.5
private static final int FORMAT_SINGLE_TYPE = FORMAT_5_0 + 1; // 5.5 - 6.0
private static final int FORMAT_SEQ_NO = FORMAT_SINGLE_TYPE + 1; // 6.0 - *
public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO;

private String type, id;
private Term uid;
private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
private long primaryTerm = 0;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private final String type, id;
private final Term uid;
private final long seqNo;
private final long primaryTerm;
private final long version;
private final VersionType versionType;

public Delete(StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
assert format >= FORMAT_5_X : "format was: " + format;
uid = new Term(in.readString(), in.readString());
assert format >= FORMAT_5_0 : "format was: " + format;
if (format >= FORMAT_SINGLE_TYPE) {
type = in.readString();
id = in.readString();
uid = new Term(in.readString(), in.readString());
} else {
uid = new Term(in.readString(), in.readString());
// the uid was constructed from the type and id so we can
// extract them back
Uid uidObject = Uid.createUid(uid.text());
type = uidObject.type();
id = uidObject.id();
}
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version);
if (format >= FORMAT_SEQ_NO) {
seqNo = in.readLong();
primaryTerm = in.readLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
primaryTerm = 0;
}
}

Expand All @@ -1147,8 +1168,8 @@ public Delete(String type, String id, long seqNo, Term uid) {
}

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
this.type = type;
this.id = id;
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
Expand Down Expand Up @@ -1204,6 +1225,8 @@ public Source getSource() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(type);
out.writeString(id);
out.writeString(uid.field());
out.writeString(uid.text());
out.writeLong(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1939,7 +1939,7 @@ public void testBasicCreatedFlag() throws IOException {
indexResult = engine.index(index);
assertFalse(indexResult.isCreated());

engine.delete(new Engine.Delete(null, "1", newUid(doc)));
engine.delete(new Engine.Delete("doc", "1", newUid(doc)));

index = indexForDoc(doc);
indexResult = engine.index(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,24 +354,24 @@ public void testStats() throws IOException {
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(2L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(139L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(146L));
}

translog.add(new Translog.Delete("test", "3", 2, newUid("3")));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(3L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(181L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(195L));
}

translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(223L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(237L));
}

final long expectedSizeInBytes = 266L;
final long expectedSizeInBytes = 280L;
translog.rollGeneration();
{
final TranslogStats stats = stats();
Expand Down Expand Up @@ -2263,6 +2263,20 @@ public void testTranslogOpSerialization() throws Exception {
in = out.bytes().streamInput();
Translog.Delete serializedDelete = new Translog.Delete(in);
assertEquals(delete, serializedDelete);

// simulate legacy delete serialization
out = new BytesStreamOutput();
out.writeVInt(Translog.Delete.FORMAT_5_0);
out.writeString(UidFieldMapper.NAME);
out.writeString("my_type#my_id");
out.writeLong(3); // version
out.writeByte(VersionType.INTERNAL.getValue());
out.writeLong(2); // seq no
out.writeLong(0); // primary term
in = out.bytes().streamInput();
serializedDelete = new Translog.Delete(in);
assertEquals("my_type", serializedDelete.type());
assertEquals("my_id", serializedDelete.id());
}

public void testRollGeneration() throws IOException {
Expand Down