Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Streamable to Writeable for WatchStatus #37390

Merged
merged 2 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public WatchStatus getStatus() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
status = in.readBoolean() ? WatchStatus.read(in) : null;
status = in.readBoolean() ? new WatchStatus(in) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public WatchStatus getStatus() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
status = in.readBoolean() ? WatchStatus.read(in) : null;
status = in.readBoolean() ? new WatchStatus(in) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void readFrom(StreamInput in) throws IOException {
id = in.readString();
found = in.readBoolean();
if (found) {
status = WatchStatus.read(in);
status = new WatchStatus(in);
source = XContentSource.readFrom(in);
version = in.readZLong();
seqNo = in.readZLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -36,7 +37,7 @@
import static org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils.writeDate;
import static org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils.writeOptionalDate;

public class WatchStatus implements ToXContentObject, Streamable {
public class WatchStatus implements ToXContentObject, Streamable, Writeable {

public static final String INCLUDE_STATE = "include_state";

Expand All @@ -49,8 +50,26 @@ public class WatchStatus implements ToXContentObject, Streamable {
@Nullable private Map<String, String> headers;
private Map<String, ActionStatus> actions;

// for serialization
private WatchStatus() {
public WatchStatus(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
int count = in.readInt();
Map<String, ActionStatus> actions = new HashMap<>(count);
for (int i = 0; i < count; i++) {
actions.put(in.readString(), ActionStatus.readFrom(in));
}
this.actions = unmodifiableMap(actions);
state = new State(in.readBoolean(), Instant.ofEpochMilli(in.readLong()).atZone(ZoneOffset.UTC));
boolean executionStateExists = in.readBoolean();
if (executionStateExists) {
executionState = ExecutionState.resolve(in.readString());
}
if (in.readBoolean()) {
headers = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
headers = Collections.emptyMap();
}
}

public WatchStatus(ZonedDateTime now, Map<String, ActionStatus> actions) {
Expand Down Expand Up @@ -222,31 +241,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
int count = in.readInt();
Map<String, ActionStatus> actions = new HashMap<>(count);
for (int i = 0; i < count; i++) {
actions.put(in.readString(), ActionStatus.readFrom(in));
}
this.actions = unmodifiableMap(actions);
state = new State(in.readBoolean(), Instant.ofEpochMilli(in.readLong()).atZone(ZoneOffset.UTC));
boolean executionStateExists = in.readBoolean();
if (executionStateExists) {
executionState = ExecutionState.resolve(in.readString());
}
if (in.readBoolean()) {
headers = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
headers = Collections.emptyMap();
}
}

public static WatchStatus read(StreamInput in) throws IOException {
WatchStatus status = new WatchStatus();
status.readFrom(in);
return status;
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testHeadersSerialization() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
status.writeTo(out);
BytesReference bytesReference = out.bytes();
WatchStatus readStatus = WatchStatus.read(bytesReference.streamInput());
WatchStatus readStatus = new WatchStatus(bytesReference.streamInput());
assertThat(readStatus, is(status));
assertThat(readStatus.getHeaders(), is(headers));

Expand Down