Skip to content

Commit

Permalink
Add raw sort values to SearchSortValues transport serialization
Browse files Browse the repository at this point in the history
In order for CCS alternate execution mode (see elastic#32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present but they are formatted according to the provided `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one.

This commit adds such information to the SearchSortValues and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.
  • Loading branch information
javanna committed Dec 14, 2018
1 parent bb3ae18 commit 10c7b61
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 112 deletions.
18 changes: 18 additions & 0 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,17 @@ public static FieldDoc readFieldDoc(StreamInput in) throws IOException {
return new FieldDoc(in.readVInt(), in.readFloat(), cFields);
}

public static Object[] readSortValues(StreamInput in) throws IOException {
int size = in.readVInt();
Object[] values = new Object[size];
if (size > 0) {
for (int i = 0; i < size; i++) {
values[i] = readSortValue(in);
}
}
return values;
}

public static Comparable readSortValue(StreamInput in) throws IOException {
byte type = in.readByte();
if (type == 0) {
Expand Down Expand Up @@ -489,6 +500,13 @@ private static Object readMissingValue(StreamInput in) throws IOException {
}
}

public static void writeSortValues(StreamOutput out, Object[] values) throws IOException {
out.writeVInt(values.length);
for (Object value : values) {
writeSortValue(out, value);
}
}

public static void writeSortValue(StreamOutput out, Object field) throws IOException {
if (field == null) {
out.writeByte((byte) 0);
Expand Down
33 changes: 20 additions & 13 deletions server/src/main/java/org/elasticsearch/search/SearchHit.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,6 @@

package org.elasticsearch.search;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.lucene.search.Explanation;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.OriginalIndices;
Expand Down Expand Up @@ -61,6 +51,16 @@
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.transport.RemoteClusterAware;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
Expand Down Expand Up @@ -306,15 +306,22 @@ public void sortValues(Object[] sortValues, DocValueFormat[] sortValueFormats) {
sortValues(new SearchSortValues(sortValues, sortValueFormats));
}

public void sortValues(SearchSortValues sortValues) {
void sortValues(SearchSortValues sortValues) {
this.sortValues = sortValues;
}

/**
* An array of the sort values used.
* An array of the (formatted) sort values used.
*/
public Object[] getSortValues() {
return sortValues.sortValues();
return sortValues.getFormattedSortValues();
}

/**
* An array of the (raw) sort values used.
*/
public Object[] getRawSortValues() {
return sortValues.getRawSortValues();
}

/**
Expand Down
134 changes: 52 additions & 82 deletions server/src/main/java/org/elasticsearch/search/SearchSortValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.elasticsearch.search;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -35,101 +37,56 @@

public class SearchSortValues implements ToXContentFragment, Writeable {

static final SearchSortValues EMPTY = new SearchSortValues(new Object[0]);
private final Object[] sortValues;
private static final Object[] EMPTY_ARRAY = new Object[0];
static final SearchSortValues EMPTY = new SearchSortValues(EMPTY_ARRAY);

private final Object[] formattedSortValues;
private final Object[] rawSortValues;

SearchSortValues(Object[] sortValues) {
this.sortValues = Objects.requireNonNull(sortValues, "sort values must not be empty");
this.formattedSortValues = Objects.requireNonNull(sortValues, "sort values must not be empty");
this.rawSortValues = EMPTY_ARRAY;
}

public SearchSortValues(Object[] sortValues, DocValueFormat[] sortValueFormats) {
Objects.requireNonNull(sortValues);
SearchSortValues(Object[] rawSortValues, DocValueFormat[] sortValueFormats) {
Objects.requireNonNull(rawSortValues);
Objects.requireNonNull(sortValueFormats);
this.sortValues = Arrays.copyOf(sortValues, sortValues.length);
for (int i = 0; i < sortValues.length; ++i) {
if (this.sortValues[i] instanceof BytesRef) {
this.sortValues[i] = sortValueFormats[i].format((BytesRef) sortValues[i]);
if (rawSortValues.length != sortValueFormats.length) {
throw new IllegalArgumentException("formattedSortValues and sortValueFormats must hold the same number of items");
}
this.rawSortValues = rawSortValues;
this.formattedSortValues = Arrays.copyOf(rawSortValues, rawSortValues.length);
for (int i = 0; i < rawSortValues.length; ++i) {
//we currently format only BytesRef but we may want to change that in the future
Object sortValue = rawSortValues[i];
if (sortValue instanceof BytesRef) {
this.formattedSortValues[i] = sortValueFormats[i].format((BytesRef) sortValue);
}
}
}

public SearchSortValues(StreamInput in) throws IOException {
int size = in.readVInt();
if (size > 0) {
sortValues = new Object[size];
for (int i = 0; i < sortValues.length; i++) {
byte type = in.readByte();
if (type == 0) {
sortValues[i] = null;
} else if (type == 1) {
sortValues[i] = in.readString();
} else if (type == 2) {
sortValues[i] = in.readInt();
} else if (type == 3) {
sortValues[i] = in.readLong();
} else if (type == 4) {
sortValues[i] = in.readFloat();
} else if (type == 5) {
sortValues[i] = in.readDouble();
} else if (type == 6) {
sortValues[i] = in.readByte();
} else if (type == 7) {
sortValues[i] = in.readShort();
} else if (type == 8) {
sortValues[i] = in.readBoolean();
} else {
throw new IOException("Can't match type [" + type + "]");
}
}
SearchSortValues(StreamInput in) throws IOException {
this.formattedSortValues = Lucene.readSortValues(in);
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
this.rawSortValues = Lucene.readSortValues(in);
} else {
sortValues = new Object[0];
this.rawSortValues = EMPTY_ARRAY;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(sortValues.length);
for (Object sortValue : sortValues) {
if (sortValue == null) {
out.writeByte((byte) 0);
} else {
Class type = sortValue.getClass();
if (type == String.class) {
out.writeByte((byte) 1);
out.writeString((String) sortValue);
} else if (type == Integer.class) {
out.writeByte((byte) 2);
out.writeInt((Integer) sortValue);
} else if (type == Long.class) {
out.writeByte((byte) 3);
out.writeLong((Long) sortValue);
} else if (type == Float.class) {
out.writeByte((byte) 4);
out.writeFloat((Float) sortValue);
} else if (type == Double.class) {
out.writeByte((byte) 5);
out.writeDouble((Double) sortValue);
} else if (type == Byte.class) {
out.writeByte((byte) 6);
out.writeByte((Byte) sortValue);
} else if (type == Short.class) {
out.writeByte((byte) 7);
out.writeShort((Short) sortValue);
} else if (type == Boolean.class) {
out.writeByte((byte) 8);
out.writeBoolean((Boolean) sortValue);
} else {
throw new IOException("Can't handle sort field value of type [" + type + "]");
}
}
Lucene.writeSortValues(out, this.formattedSortValues);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
Lucene.writeSortValues(out, this.rawSortValues);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (sortValues.length > 0) {
if (formattedSortValues.length > 0) {
builder.startArray(Fields.SORT);
for (Object sortValue : sortValues) {
for (Object sortValue : formattedSortValues) {
builder.value(sortValue);
}
builder.endArray();
Expand All @@ -142,24 +99,37 @@ public static SearchSortValues fromXContent(XContentParser parser) throws IOExce
return new SearchSortValues(parser.list().toArray());
}

public Object[] sortValues() {
return sortValues;
/**
* Returns the formatted version of the values that sorting was performed against
*/
public Object[] getFormattedSortValues() {
return formattedSortValues;
}

/**
* Returns the raw version of the values that sorting was performed against
*/
public Object[] getRawSortValues() {
return rawSortValues;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
if (o == null || getClass() != o.getClass()) {
return false;
}
SearchSortValues other = (SearchSortValues) obj;
return Arrays.equals(sortValues, other.sortValues);
SearchSortValues that = (SearchSortValues) o;
return Arrays.equals(formattedSortValues, that.formattedSortValues) &&
Arrays.equals(rawSortValues, that.rawSortValues);
}

@Override
public int hashCode() {
return Arrays.hashCode(sortValues);
int result = Arrays.hashCode(formattedSortValues);
result = 31 * result + Arrays.hashCode(rawSortValues);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.LuceneTests;
import org.elasticsearch.common.xcontent.ToXContent;
Expand All @@ -31,23 +32,36 @@
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.RandomObjects;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;

public class SearchSortValuesTests extends AbstractSerializingTestCase<SearchSortValues> {

public static SearchSortValues createTestItem(XContentType xContentType, boolean transportSerialization) {
int size = randomIntBetween(1, 20);
Object[] values = new Object[size];
DocValueFormat[] sortValueFormats = new DocValueFormat[size];
for (int i = 0; i < size; i++) {
Object sortValue = randomSortValue(xContentType, transportSerialization);
values[i] = sortValue;
//make sure that for BytesRef, we provide a specific doc value format that overrides format(BytesRef)
sortValueFormats[i] = sortValue instanceof BytesRef ? DocValueFormat.RAW : randomDocValueFormat();
if (transportSerialization) {
DocValueFormat[] sortValueFormats = new DocValueFormat[size];
for (int i = 0; i < size; i++) {
Object sortValue = randomSortValue(xContentType, transportSerialization);
values[i] = sortValue;
//make sure that for BytesRef, we provide a specific doc value format that overrides format(BytesRef)
sortValueFormats[i] = sortValue instanceof BytesRef ? DocValueFormat.RAW : randomDocValueFormat();
}
return new SearchSortValues(values, sortValueFormats);
} else {
//xcontent serialization doesn't write/parse the raw sort values, only the formatted ones
for (int i = 0; i < size; i++) {
Object sortValue = randomSortValue(xContentType, transportSerialization);
//make sure that BytesRef are not provided as formatted values
sortValue = sortValue instanceof BytesRef ? DocValueFormat.RAW.format((BytesRef)sortValue) : sortValue;
values[i] = sortValue;
}
return new SearchSortValues(values);
}
return new SearchSortValues(values, sortValueFormats);
}

private static Object randomSortValue(XContentType xContentType, boolean transportSerialization) {
Expand Down Expand Up @@ -79,7 +93,7 @@ protected SearchSortValues createXContextTestInstance(XContentType xContentType)

@Override
protected SearchSortValues createTestInstance() {
return createTestItem(randomFrom(XContentType.values()), true);
return createTestItem(randomFrom(XContentType.values()), randomBoolean());
}

@Override
Expand Down Expand Up @@ -113,20 +127,32 @@ public void testToXContent() throws IOException {

@Override
protected SearchSortValues mutateInstance(SearchSortValues instance) {
Object[] sortValues = instance.sortValues();
if (sortValues.length == 0) {
return createTestInstance();
}
Object[] sortValues = instance.getFormattedSortValues();
if (randomBoolean()) {
return new SearchSortValues(new Object[0]);
}
Object[] values = Arrays.copyOf(sortValues, sortValues.length + 1);
values[sortValues.length] = randomSortValue(randomFrom(XContentType.values()), true);
values[sortValues.length] = randomSortValue(randomFrom(XContentType.values()), randomBoolean());
return new SearchSortValues(values);
}

@Override
protected SearchSortValues copyInstance(SearchSortValues instance, Version version) {
return new SearchSortValues(Arrays.copyOf(instance.sortValues(), instance.sortValues().length));
//TODO rename and update version after backport
public void testSerializationPre70() throws IOException {
Version version = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0));
SearchSortValues original = createTestInstance();
SearchSortValues deserialized = copyInstance(original, version);
assertArrayEquals(original.getFormattedSortValues(), deserialized.getFormattedSortValues());
assertEquals(0, deserialized.getRawSortValues().length);
}

//TODO rename method and adapt versions after backport
public void testReadFromPre70() throws IOException {
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode("AwIAAAABAQEyBUAIAAAAAAAAAAAAAAAA"))) {
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
SearchSortValues deserialized = new SearchSortValues(in);
SearchSortValues expected = new SearchSortValues(new Object[]{1, "2", 3d});
assertEquals(expected, deserialized);
assertEquals(0, deserialized.getRawSortValues().length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public abstract class AbstractWireSerializingTestCase<T extends Writeable> exten

@Override
protected T copyInstance(T instance, Version version) throws IOException {
return copyWriteable(instance, getNamedWriteableRegistry(), instanceReader());
return copyWriteable(instance, getNamedWriteableRegistry(), instanceReader(), version);
}
}

0 comments on commit 10c7b61

Please sign in to comment.