diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java index e6d8b07dbd4..3c8d6dc29d6 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java @@ -151,7 +151,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro byte status = header[3]; res.setStatus(status); try { - ObjectInput in = deserialize(channel, is, proto); + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); if (status == Response.OK) { Object data; if (res.isHeartbeat()) { @@ -179,7 +179,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro req.setEvent(Request.HEARTBEAT_EVENT); } try { - ObjectInput in = deserialize(channel, is, proto); + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, in); @@ -198,11 +198,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro } } - private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException { - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); - return s.deserialize(channel.getUrl(), is); - } - protected Object getRequestData(long id) { DefaultFuture future = DefaultFuture.getFuture(id); if (future == null) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java index 9ca1c2e0542..07da9a38823 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java @@ -22,9 +22,11 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.serialize.ObjectInput; import org.apache.dubbo.common.serialize.Serialization; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -75,4 +77,8 @@ public static Serialization getSerialization(URL url, Byte id) throws IOExceptio return serialization; } + public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException { + Serialization s = getSerialization(url, proto); + return s.deserialize(url, is); + } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java index 41b6cc7b027..1dce90c8158 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java @@ -142,7 +142,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro byte status = header[3]; res.setStatus(status); try { - ObjectInput in = deserialize(channel, is, proto); + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); if (status == Response.OK) { Object data; if (res.isHeartbeat()) { @@ -170,7 +170,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro req.setEvent(Request.HEARTBEAT_EVENT); } try { - ObjectInput in = deserialize(channel, is, proto); + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, in); @@ -189,11 +189,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro } } - private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException { - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); - return s.deserialize(channel.getUrl(), is); - } - protected Object getRequestData(long id) { DefaultFuture future = DefaultFuture.getFuture(id); if (future == null) diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java index 7523392c1e7..070085038e2 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -17,7 +17,6 @@ package org.apache.dubbo.rpc.protocol.dubbo; import org.apache.dubbo.common.Constants; -import org.apache.dubbo.common.URL; import org.apache.dubbo.common.Version; import org.apache.dubbo.common.io.Bytes; import org.apache.dubbo.common.io.UnsafeByteArrayInputStream; @@ -25,7 +24,6 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.serialize.ObjectInput; import org.apache.dubbo.common.serialize.ObjectOutput; -import org.apache.dubbo.common.serialize.Serialization; import org.apache.dubbo.common.utils.ReflectUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.Channel; @@ -64,7 +62,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // get request id. long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) { @@ -76,13 +73,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro // get status. byte status = header[3]; res.setStatus(status); - if (status == Response.OK) { - try { + try { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); + if (status == Response.OK) { Object data; if (res.isHeartbeat()) { - data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeHeartbeatData(channel, in); } else if (res.isEvent()) { - data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeEventData(channel, in); } else { DecodeableRpcResult result; if (channel.getUrl().getParameter( @@ -99,15 +97,15 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro data = result; } res.setResult(data); - } catch (Throwable t) { - if (log.isWarnEnabled()) { - log.warn("Decode response failed: " + t.getMessage(), t); - } - res.setStatus(Response.CLIENT_ERROR); - res.setErrorMessage(StringUtils.toString(t)); + } else { + res.setErrorMessage(in.readUTF()); } - } else { - res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); + } catch (Throwable t) { + if (log.isWarnEnabled()) { + log.warn("Decode response failed: " + t.getMessage(), t); + } + res.setStatus(Response.CLIENT_ERROR); + res.setErrorMessage(StringUtils.toString(t)); } return res; } else { @@ -120,10 +118,11 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro } try { Object data; + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); if (req.isHeartbeat()) { - data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeHeartbeatData(channel, in); } else if (req.isEvent()) { - data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeEventData(channel, in); } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( @@ -150,11 +149,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro } } - private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) - throws IOException { - return serialization.deserialize(url, is); - } - private byte[] readMessageData(InputStream is) throws IOException { if (is.available() > 0) { byte[] result = new byte[is.available()];