Skip to content

Commit

Permalink
#1903: supplemental change (#2666)
Browse files Browse the repository at this point in the history
  • Loading branch information
beiwei30 authored Oct 21, 2018
1 parent b723837 commit c805c1d
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
byte status = header[3];
res.setStatus(status);
try {
ObjectInput in = deserialize(channel, is, proto);
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
Expand Down Expand Up @@ -179,7 +179,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
ObjectInput in = deserialize(channel, is, proto);
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -198,11 +198,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException {
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
return s.deserialize(channel.getUrl(), is);
}

protected Object getRequestData(long id) {
DefaultFuture future = DefaultFuture.getFuture(id);
if (future == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.ObjectInput;
import org.apache.dubbo.common.serialize.Serialization;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -75,4 +77,8 @@ public static Serialization getSerialization(URL url, Byte id) throws IOExceptio
return serialization;
}

public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
Serialization s = getSerialization(url, proto);
return s.deserialize(url, is);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
byte status = header[3];
res.setStatus(status);
try {
ObjectInput in = deserialize(channel, is, proto);
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
Expand Down Expand Up @@ -170,7 +170,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
ObjectInput in = deserialize(channel, is, proto);
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -189,11 +189,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException {
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
return s.deserialize(channel.getUrl(), is);
}

protected Object getRequestData(long id) {
DefaultFuture future = DefaultFuture.getFuture(id);
if (future == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package org.apache.dubbo.rpc.protocol.dubbo;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.io.Bytes;
import org.apache.dubbo.common.io.UnsafeByteArrayInputStream;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.ObjectInput;
import org.apache.dubbo.common.serialize.ObjectOutput;
import org.apache.dubbo.common.serialize.Serialization;
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
Expand Down Expand Up @@ -64,7 +62,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
Expand All @@ -76,13 +73,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
data = decodeEventData(channel, in);
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Expand All @@ -99,15 +97,15 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = result;
}
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
} else {
res.setErrorMessage(in.readUTF());
}
} else {
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
Expand All @@ -120,10 +118,11 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
try {
Object data;
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
data = decodeEventData(channel, in);
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Expand All @@ -150,11 +149,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
throws IOException {
return serialization.deserialize(url, is);
}

private byte[] readMessageData(InputStream is) throws IOException {
if (is.available() > 0) {
byte[] result = new byte[is.available()];
Expand Down

0 comments on commit c805c1d

Please sign in to comment.