Skip to content

Commit

Permalink
Migrate Streamable to Writeable for WatchStatus (#37390)
Browse files Browse the repository at this point in the history
  • Loading branch information
Like authored and jdconrad committed Mar 27, 2019
1 parent 22e688a commit d57b7a2
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public WatchStatus getStatus() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
status = in.readBoolean() ? WatchStatus.read(in) : null;
status = in.readBoolean() ? new WatchStatus(in) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public WatchStatus getStatus() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
status = in.readBoolean() ? WatchStatus.read(in) : null;
status = in.readBoolean() ? new WatchStatus(in) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void readFrom(StreamInput in) throws IOException {
id = in.readString();
found = in.readBoolean();
if (found) {
status = WatchStatus.read(in);
status = new WatchStatus(in);
source = XContentSource.readFrom(in);
version = in.readZLong();
seqNo = in.readZLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -36,7 +37,7 @@
import static org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils.writeDate;
import static org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils.writeOptionalDate;

public class WatchStatus implements ToXContentObject, Streamable {
public class WatchStatus implements ToXContentObject, Streamable, Writeable {

public static final String INCLUDE_STATE = "include_state";

Expand All @@ -49,8 +50,26 @@ public class WatchStatus implements ToXContentObject, Streamable {
@Nullable private Map<String, String> headers;
private Map<String, ActionStatus> actions;

// for serialization
private WatchStatus() {
public WatchStatus(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
int count = in.readInt();
Map<String, ActionStatus> actions = new HashMap<>(count);
for (int i = 0; i < count; i++) {
actions.put(in.readString(), ActionStatus.readFrom(in));
}
this.actions = unmodifiableMap(actions);
state = new State(in.readBoolean(), Instant.ofEpochMilli(in.readLong()).atZone(ZoneOffset.UTC));
boolean executionStateExists = in.readBoolean();
if (executionStateExists) {
executionState = ExecutionState.resolve(in.readString());
}
if (in.readBoolean()) {
headers = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
headers = Collections.emptyMap();
}
}

public WatchStatus(ZonedDateTime now, Map<String, ActionStatus> actions) {
Expand Down Expand Up @@ -222,31 +241,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
int count = in.readInt();
Map<String, ActionStatus> actions = new HashMap<>(count);
for (int i = 0; i < count; i++) {
actions.put(in.readString(), ActionStatus.readFrom(in));
}
this.actions = unmodifiableMap(actions);
state = new State(in.readBoolean(), Instant.ofEpochMilli(in.readLong()).atZone(ZoneOffset.UTC));
boolean executionStateExists = in.readBoolean();
if (executionStateExists) {
executionState = ExecutionState.resolve(in.readString());
}
if (in.readBoolean()) {
headers = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
headers = Collections.emptyMap();
}
}

public static WatchStatus read(StreamInput in) throws IOException {
WatchStatus status = new WatchStatus();
status.readFrom(in);
return status;
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testHeadersSerialization() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
status.writeTo(out);
BytesReference bytesReference = out.bytes();
WatchStatus readStatus = WatchStatus.read(bytesReference.streamInput());
WatchStatus readStatus = new WatchStatus(bytesReference.streamInput());
assertThat(readStatus, is(status));
assertThat(readStatus.getHeaders(), is(headers));

Expand Down

0 comments on commit d57b7a2

Please sign in to comment.