diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 758363117f..6b41d5e7d7 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -89,11 +89,17 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { return perrors.Errorf("opts[0] is not of type *Client") } - pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) - if !ok { - return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + if p.Header.Type&hessian.PackageRequest != 0x00 { + // size of this array must be '7' + // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 + p.Body = make([]interface{}, 7) + } else { + pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) + if !ok { + return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) + } + p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } - p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} } // read body diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index df9ab28e0e..2e4b3999df 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -85,11 +85,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { } if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) - if p.Err != nil { - logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + if p.Header.Type&hessian.PackageResponse != 0x00 { + logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) + if p.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) + } + h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) + } else { + logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) + p.Header.ResponseStatus = hessian.Response_OK + reply(session, p, hessian.PackageHeartbeat) } - h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) return } logger.Debugf("get rpc response{header: %#v, body: %#v}", p.Header, p.Body) @@ -199,7 +205,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { // heartbeat if p.Header.Type&hessian.PackageHeartbeat != 0x00 { logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - h.reply(session, p, hessian.PackageHeartbeat) + reply(session, p, hessian.PackageHeartbeat) return } @@ -226,7 +232,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { if !twoway { return } - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) } }() @@ -241,7 +247,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { logger.Errorf(err.Error()) p.Header.ResponseStatus = hessian.Response_OK p.Body = err - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) return } invoker := exporter.(protocol.Exporter).GetInvoker() @@ -266,7 +272,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { if !twoway { return } - h.reply(session, p, hessian.PackageResponse) + reply(session, p, hessian.PackageResponse) } func (h *RpcServerHandler) OnCron(session getty.Session) { @@ -294,7 +300,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } -func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { +func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp := &DubboPackage{ Header: hessian.DubboHeader{ SerialID: req.Header.SerialID, diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 930382cca8..e9dff1cfc7 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -62,8 +62,10 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface return nil, 0, perrors.WithStack(err) } - pkg.Err = pkg.Body.(*hessian.Response).Exception - pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + if pkg.Header.Type&hessian.PackageRequest == 0x00 { + pkg.Err = pkg.Body.(*hessian.Response).Exception + pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) + } return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil }