Skip to content

Commit

Permalink
Decouple TimeValue from Elasticsearch server classes (#29454)
Browse files Browse the repository at this point in the history
* Decouple TimeValue from Elasticsearch server classes

This commit decouples the `TimeValue` class from the other server classes. This
is in preperation to move `TimeValue` into the `elasticsearch-core` jar,
allowing us to use it from projects that cannot depend on the elasticsearch-core
library.

Relates to #28504
  • Loading branch information
dakrone committed Apr 11, 2018
1 parent 0c95b2d commit 1690178
Show file tree
Hide file tree
Showing 27 changed files with 179 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void readFrom(StreamInput in) throws IOException {
indices[i] = in.readString();
}
}
timeout = new TimeValue(in);
timeout = in.readTimeValue();
if (in.readBoolean()) {
waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
}
Expand All @@ -227,7 +227,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
}
}
timeout.writeTo(out);
out.writeTimeValue(timeout);
if (waitForStatus == null) {
out.writeBoolean(false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void readFrom(StreamInput in) throws IOException {
timedOut = in.readBoolean();
numberOfInFlightFetch = in.readInt();
delayedUnassignedShards= in.readInt();
taskMaxWaitingTime = new TimeValue(in);
taskMaxWaitingTime = in.readTimeValue();
}

@Override
Expand All @@ -197,7 +197,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(timedOut);
out.writeInt(numberOfInFlightFetch);
out.writeInt(delayedUnassignedShards);
taskMaxWaitingTime.writeTo(out);
out.writeTimeValue(taskMaxWaitingTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void readFrom(StreamInput in) throws IOException {
threads = in.readInt();
ignoreIdleThreads = in.readBoolean();
type = in.readString();
interval = new TimeValue(in);
interval = in.readTimeValue();
snapshots = in.readInt();
}

Expand All @@ -109,7 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(threads);
out.writeBoolean(ignoreIdleThreads);
out.writeString(type);
interval.writeTo(out);
out.writeTimeValue(interval);
out.writeInt(snapshots);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ public ActionRequestValidationException validate() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = TaskId.readFromStream(in);
timeout = in.readOptionalWriteable(TimeValue::new);
timeout = in.readOptionalTimeValue();
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
taskId.writeTo(out);
out.writeOptionalWriteable(timeout);
out.writeOptionalTimeValue(timeout);
out.writeBoolean(waitForCompletion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ public void readFrom(StreamInput in) throws IOException {
requests.add(DocWriteRequest.readDocumentRequest(in));
}
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = new TimeValue(in);
timeout = in.readTimeValue();
}

@Override
Expand All @@ -605,7 +605,7 @@ public void writeTo(StreamOutput out) throws IOException {
DocWriteRequest.writeDocumentRequest(out, request);
}
refreshPolicy.writeTo(out);
timeout.writeTo(out);
out.writeTimeValue(timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ public void readFrom(StreamInput in) throws IOException {
parent = in.readOptionalString();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readOptionalString(); // timestamp
in.readOptionalWriteable(TimeValue::new); // ttl
in.readOptionalTimeValue(); // ttl
}
source = in.readBytesReference();
opType = OpType.fromId(in.readByte());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ public TimeValue ackTimeout() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
timeout = new TimeValue(in);
timeout = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
timeout.writeTo(out);
out.writeTimeValue(timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ public final TimeValue masterNodeTimeout() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
masterNodeTimeout = new TimeValue(in);
masterNodeTimeout = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
masterNodeTimeout.writeTo(out);
out.writeTimeValue(masterNodeTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodesIds = in.readStringArray();
concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new);
timeout = in.readOptionalWriteable(TimeValue::new);
timeout = in.readOptionalTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(nodesIds);
out.writeOptionalArray(concreteNodes);
out.writeOptionalWriteable(timeout);
out.writeOptionalTimeValue(timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void readFrom(StreamInput in) throws IOException {
shardId = null;
}
waitForActiveShards = ActiveShardCount.readFrom(in);
timeout = new TimeValue(in);
timeout = in.readTimeValue();
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
}
Expand All @@ -202,7 +202,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(false);
}
waitForActiveShards.writeTo(out);
timeout.writeTo(out);
out.writeTimeValue(timeout);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void readFrom(StreamInput in) throws IOException {
} else {
shardId = null;
}
timeout = new TimeValue(in);
timeout = in.readTimeValue();
concreteIndex = in.readOptionalString();
}

Expand All @@ -127,7 +127,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeOptionalStreamable(shardId);
timeout.writeTo(out);
out.writeTimeValue(timeout);
out.writeOptionalString(concreteIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void readFrom(StreamInput in) throws IOException {
parentTaskId = TaskId.readFromStream(in);
nodes = in.readStringArray();
actions = in.readStringArray();
timeout = in.readOptionalWriteable(TimeValue::new);
timeout = in.readOptionalTimeValue();
}

@Override
Expand All @@ -154,7 +154,7 @@ public void writeTo(StreamOutput out) throws IOException {
parentTaskId.writeTo(out);
out.writeStringArrayNullable(nodes);
out.writeStringArrayNullable(actions);
out.writeOptionalWriteable(timeout);
out.writeOptionalTimeValue(timeout);
}

public boolean match(Task task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import java.util.function.Supplier;

Expand All @@ -82,6 +84,26 @@
* on {@link StreamInput}.
*/
public abstract class StreamInput extends InputStream {

private static final Map<Byte, TimeUnit> BYTE_TIME_UNIT_MAP;

static {
final Map<Byte, TimeUnit> byteTimeUnitMap = new HashMap<>();
byteTimeUnitMap.put((byte)0, TimeUnit.NANOSECONDS);
byteTimeUnitMap.put((byte)1, TimeUnit.MICROSECONDS);
byteTimeUnitMap.put((byte)2, TimeUnit.MILLISECONDS);
byteTimeUnitMap.put((byte)3, TimeUnit.SECONDS);
byteTimeUnitMap.put((byte)4, TimeUnit.MINUTES);
byteTimeUnitMap.put((byte)5, TimeUnit.HOURS);
byteTimeUnitMap.put((byte)6, TimeUnit.DAYS);

for (TimeUnit value : TimeUnit.values()) {
assert byteTimeUnitMap.containsValue(value) : value;
}

BYTE_TIME_UNIT_MAP = Collections.unmodifiableMap(byteTimeUnitMap);
}

private Version version = Version.CURRENT;

/**
Expand Down Expand Up @@ -976,4 +998,24 @@ private int readArraySize() throws IOException {
* be a no-op depending on the underlying implementation if the information of the remaining bytes is not present.
*/
protected abstract void ensureCanReadBytes(int length) throws EOFException;

/**
* Read a {@link TimeValue} from the stream
*/
public TimeValue readTimeValue() throws IOException {
long duration = readZLong();
TimeUnit timeUnit = BYTE_TIME_UNIT_MAP.get(readByte());
return new TimeValue(duration, timeUnit);
}

/**
* Read an optional {@link TimeValue} from the stream, returning null if no TimeValue was written.
*/
public @Nullable TimeValue readOptionalTimeValue() throws IOException {
if (readBoolean()) {
return readTimeValue();
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Writer;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
Expand All @@ -54,11 +55,13 @@
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

/**
Expand All @@ -74,6 +77,25 @@
*/
public abstract class StreamOutput extends OutputStream {

private static final Map<TimeUnit, Byte> TIME_UNIT_BYTE_MAP;

static {
final Map<TimeUnit, Byte> timeUnitByteMap = new EnumMap<>(TimeUnit.class);
timeUnitByteMap.put(TimeUnit.NANOSECONDS, (byte)0);
timeUnitByteMap.put(TimeUnit.MICROSECONDS, (byte)1);
timeUnitByteMap.put(TimeUnit.MILLISECONDS, (byte)2);
timeUnitByteMap.put(TimeUnit.SECONDS, (byte)3);
timeUnitByteMap.put(TimeUnit.MINUTES, (byte)4);
timeUnitByteMap.put(TimeUnit.HOURS, (byte)5);
timeUnitByteMap.put(TimeUnit.DAYS, (byte)6);

for (TimeUnit value : TimeUnit.values()) {
assert timeUnitByteMap.containsKey(value) : value;
}

TIME_UNIT_BYTE_MAP = Collections.unmodifiableMap(timeUnitByteMap);
}

private Version version = Version.CURRENT;

/**
Expand Down Expand Up @@ -987,4 +1009,24 @@ public <E extends Enum<E>> void writeEnum(E enumValue) throws IOException {
writeVInt(enumValue.ordinal());
}

/**
* Write a {@link TimeValue} to the stream
*/
public void writeTimeValue(TimeValue timeValue) throws IOException {
writeZLong(timeValue.duration());
writeByte(TIME_UNIT_BYTE_MAP.get(timeValue.timeUnit()));
}

/**
* Write an optional {@link TimeValue} to the stream.
*/
public void writeOptionalTimeValue(@Nullable TimeValue timeValue) throws IOException {
if (timeValue == null) {
writeBoolean(false);
} else {
writeBoolean(true);
writeTimeValue(timeValue);
}
}

}
Loading

0 comments on commit 1690178

Please sign in to comment.