Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Streamable to Writeable for recovery package #37380

Merged
merged 9 commits into from
Jan 25, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public RecoveryRequest() {
this(Strings.EMPTY_ARRAY);
}

public RecoveryRequest(StreamInput in) throws IOException {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
super(in);
detailed = in.readBoolean();
activeOnly = in.readBoolean();
}

/**
* Constructs a request for recovery information for all shards for the given indices
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards,

@Override
protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
final RecoveryRequest recoveryRequest = new RecoveryRequest();
recoveryRequest.readFrom(in);
return recoveryRequest;
return new RecoveryRequest(in);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends
public BroadcastRequest() {
}

public BroadcastRequest(StreamInput in) throws IOException {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
super(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

protected BroadcastRequest(String[] indices) {
this.indices = indices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
Expand All @@ -46,7 +47,7 @@
/**
* Keeps track of state related to shard recovery.
*/
public class RecoveryState implements ToXContentFragment, Streamable {
public class RecoveryState implements ToXContentFragment, Streamable, Writeable {

public enum Stage {
INIT((byte) 0),
Expand Down Expand Up @@ -102,19 +103,29 @@ public static Stage fromId(byte id) {

private Stage stage;

private final Index index = new Index();
private final Translog translog = new Translog();
private final VerifyIndex verifyIndex = new VerifyIndex();
private final Timer timer = new Timer();
private final Index index;
private final Translog translog;
private final VerifyIndex verifyIndex;
private final Timer timer;

private RecoverySource recoverySource;
private ShardId shardId;
@Nullable
private DiscoveryNode sourceNode;
private DiscoveryNode targetNode;
private boolean primary = false;
private boolean primary;

private RecoveryState() {
public RecoveryState(StreamInput in) throws IOException {
timer = new Timer(in);
stage = Stage.fromId(in.readByte());
shardId = ShardId.readShardId(in);
recoverySource = RecoverySource.readFrom(in);
targetNode = new DiscoveryNode(in);
sourceNode = in.readOptionalWriteable(DiscoveryNode::new);
index = new Index(in);
translog = new Translog(in);
verifyIndex = new VerifyIndex(in);
primary = in.readBoolean();
}

public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) {
Expand All @@ -128,6 +139,10 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla
this.sourceNode = sourceNode;
this.targetNode = targetNode;
stage = Stage.INIT;
index = new Index();
translog = new Translog();
verifyIndex = new VerifyIndex();
timer = new Timer();
timer.start();
}

Expand Down Expand Up @@ -223,23 +238,12 @@ public boolean getPrimary() {
}

public static RecoveryState readRecoveryState(StreamInput in) throws IOException {
RecoveryState recoveryState = new RecoveryState();
recoveryState.readFrom(in);
return recoveryState;
return new RecoveryState(in);
}

@Override
public synchronized void readFrom(StreamInput in) throws IOException {
timer.readFrom(in);
stage = Stage.fromId(in.readByte());
shardId = ShardId.readShardId(in);
recoverySource = RecoverySource.readFrom(in);
targetNode = new DiscoveryNode(in);
sourceNode = in.readOptionalWriteable(DiscoveryNode::new);
index.readFrom(in);
translog.readFrom(in);
verifyIndex.readFrom(in);
primary = in.readBoolean();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down Expand Up @@ -347,12 +351,22 @@ static final class Fields {
static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis";
}

public static class Timer implements Streamable {
public static class Timer implements Streamable, Writeable {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
protected long startTime = 0;
protected long startNanoTime = 0;
protected long time = -1;
protected long stopTime = 0;

public Timer() {
}

public Timer(StreamInput in) throws IOException {
startTime = in.readVLong();
startNanoTime = in.readVLong();
stopTime = in.readVLong();
time = in.readVLong();
}

public synchronized void start() {
assert startTime == 0 : "already started";
startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -394,13 +408,9 @@ public synchronized void reset() {
stopTime = 0;
}


@Override
public synchronized void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
startNanoTime = in.readVLong();
stopTime = in.readVLong();
time = in.readVLong();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand All @@ -414,9 +424,16 @@ public synchronized void writeTo(StreamOutput out) throws IOException {

}

public static class VerifyIndex extends Timer implements ToXContentFragment, Streamable {
public static class VerifyIndex extends Timer implements ToXContentFragment, Streamable, Writeable {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
private volatile long checkIndexTime;

public VerifyIndex() {
}

public VerifyIndex(StreamInput in) throws IOException {
super(in);
checkIndexTime = in.readVLong();
}

public void reset() {
super.reset();
Expand All @@ -433,8 +450,7 @@ public void checkIndexTime(long checkIndexTime) {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
checkIndexTime = in.readVLong();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand All @@ -451,13 +467,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}

public static class Translog extends Timer implements ToXContentFragment, Streamable {
public static class Translog extends Timer implements ToXContentFragment, Streamable, Writeable {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
public static final int UNKNOWN = -1;

private int recovered;
private int total = UNKNOWN;
private int totalOnStart = UNKNOWN;

public Translog() {
}

public Translog(StreamInput in) throws IOException {
super(in);
recovered = in.readVInt();
total = in.readVInt();
totalOnStart = in.readVInt();
}

public synchronized void reset() {
super.reset();
recovered = 0;
Expand Down Expand Up @@ -535,10 +561,7 @@ public synchronized float recoveredPercent() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recovered = in.readVInt();
total = in.readVInt();
totalOnStart = in.readVInt();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand All @@ -560,7 +583,7 @@ public synchronized XContentBuilder toXContent(XContentBuilder builder, Params p
}
}

public static class File implements ToXContentObject, Streamable {
public static class File implements ToXContentObject, Streamable, Writeable {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
private String name;
private long length;
private long recovered;
Expand All @@ -576,6 +599,13 @@ public File(String name, long length, boolean reused) {
this.reused = reused;
}

public File(StreamInput in) throws IOException {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
reused = in.readBoolean();
}

void addRecoveredBytes(long bytes) {
assert reused == false : "file is marked as reused, can't update recovered bytes";
assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]";
Expand Down Expand Up @@ -614,18 +644,9 @@ boolean fullyRecovered() {
return reused == false && length == recovered;
}

public static File readFile(StreamInput in) throws IOException {
File file = new File();
file.readFrom(in);
return file;
}

@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
reused = in.readBoolean();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down Expand Up @@ -671,7 +692,7 @@ public String toString() {
}
}

public static class Index extends Timer implements ToXContentFragment, Streamable {
public static class Index extends Timer implements ToXContentFragment, Streamable, Writeable {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved

private Map<String, File> fileDetails = new HashMap<>();

Expand All @@ -681,6 +702,20 @@ public static class Index extends Timer implements ToXContentFragment, Streamabl
private long sourceThrottlingInNanos = UNKNOWN;
private long targetThrottleTimeInNanos = UNKNOWN;

public Index() {
}

public Index(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
File file = new File(in);
fileDetails.put(file.name, file);
}
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
}

public synchronized List<File> fileDetails() {
return Collections.unmodifiableList(new ArrayList<>(fileDetails.values()));
}
Expand Down Expand Up @@ -885,14 +920,7 @@ public synchronized void updateVersion(long version) {

@Override
public synchronized void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
File file = File.readFile(in);
fileDetails.put(file.name, file);
}
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Loading