From d0f16d20762b281a08a37fc90e9fdf928f5ad7d3 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 2 Dec 2019 18:50:06 +0800 Subject: [PATCH 01/11] add async callback --- common/proxy/proxy_factory.go | 2 +- common/proxy/proxy_factory/default.go | 4 ++-- common/proxy/proxy_factory/default_test.go | 2 +- common/rpc_service.go | 21 +++++++++++++++++++++ config/reference_config.go | 8 +++++--- config/reference_config_test.go | 1 + config/service.go | 8 ++++++++ protocol/dubbo/client.go | 9 +++------ protocol/dubbo/codec.go | 5 +++-- protocol/dubbo/dubbo_invoker.go | 2 +- 10 files changed, 46 insertions(+), 16 deletions(-) diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index 2567e0ee09..cd6ef3d1db 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -23,7 +23,7 @@ import ( ) type ProxyFactory interface { - GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy GetInvoker(url common.URL) protocol.Invoker } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index bafba60b40..bf87e6380d 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -54,11 +54,11 @@ type DefaultProxyFactory struct { func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { return &DefaultProxyFactory{} } -func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { +func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { //create proxy attachments := map[string]string{} attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") - return proxy.NewProxy(invoker, nil, attachments) + return proxy.NewProxy(invoker, callBack, attachments) } func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { return &ProxyInvoker{ diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go index b6a6b675ba..cba7757e4b 100644 --- a/common/proxy/proxy_factory/default_test.go +++ b/common/proxy/proxy_factory/default_test.go @@ -33,7 +33,7 @@ import ( func Test_GetProxy(t *testing.T) { proxyFactory := NewDefaultProxyFactory() url := common.NewURLWithOptions() - proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), nil, url) assert.NotNil(t, proxy) } diff --git a/common/rpc_service.go b/common/rpc_service.go index 4741a6fa3c..a642ea0ae9 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -22,6 +22,7 @@ import ( "reflect" "strings" "sync" + "time" "unicode" "unicode/utf8" ) @@ -39,6 +40,26 @@ type RPCService interface { Reference() string // rpc service id or reference id } +// callback interface for async +type AsyncCallbackService interface { + CallBack(response CallResponse) // callback +} + +type Options struct { + // connect timeout + ConnectTimeout time.Duration + // request timeout + RequestTimeout time.Duration +} + +type CallResponse struct { + Opts Options + Cause error + Start time.Time // invoke(call) start time == write start time + ReadStart time.Time // read start time, write duration = ReadStart - Start + Reply interface{} +} + // for lowercase func // func MethodMapper() map[string][string] { // return map[string][string]{} diff --git a/config/reference_config.go b/config/reference_config.go index 8703c459ba..1e469d037c 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -55,7 +55,7 @@ type ReferenceConfig struct { Group string `yaml:"group" json:"group,omitempty" property:"group"` Version string `yaml:"version" json:"version,omitempty" property:"version"` Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - async bool `yaml:"async" json:"async,omitempty" property:"async"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` invoker protocol.Invoker urls []*common.URL @@ -140,8 +140,10 @@ func (refconfig *ReferenceConfig) Refer() { } } + callback := GetCallback(refconfig.id) + //create proxy - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, callback, url) } // @v is service provider implemented RPCService @@ -169,7 +171,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) //getty invoke async or sync - urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) + urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) //application info urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name) diff --git a/config/reference_config_test.go b/config/reference_config_test.go index a81dbf06ce..2cb9793c18 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -93,6 +93,7 @@ func doInitConsumer() { Retries: "3", Group: "huadong_idc", Version: "1.0.0", + Async: true, Methods: []*MethodConfig{ { Name: "GetUser", diff --git a/config/service.go b/config/service.go index 2bceac4a8c..ec21bac883 100644 --- a/config/service.go +++ b/config/service.go @@ -43,3 +43,11 @@ func GetConsumerService(name string) common.RPCService { func GetProviderService(name string) common.RPCService { return proServices[name] } + +func GetCallback(name string) func(common.CallResponse) { + service := GetConsumerService(name) + if sv, ok := service.(common.AsyncCallbackService); ok { + return sv.CallBack + } + return nil +} diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index ba74d86c0c..1e00f699b2 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -107,6 +107,7 @@ func setClientGrpool() { } type Options struct { + common.Options // connect timeout ConnectTimeout time.Duration // request timeout @@ -114,14 +115,10 @@ type Options struct { } type CallResponse struct { - Opts Options - Cause error - Start time.Time // invoke(call) start time == write start time - ReadStart time.Time // read start time, write duration = ReadStart - Start - Reply interface{} + common.CallResponse } -type AsyncCallback func(response CallResponse) +type AsyncCallback func(response common.CallResponse) type Client struct { opts Options diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index a878ffd91e..1874b94c47 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -26,6 +26,7 @@ import ( import ( "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go/common" perrors "github.com/pkg/errors" ) @@ -122,8 +123,8 @@ func NewPendingResponse() *PendingResponse { } } -func (r PendingResponse) GetCallResponse() CallResponse { - return CallResponse{ +func (r PendingResponse) GetCallResponse() common.CallResponse { + return common.CallResponse{ Cause: r.err, Start: r.start, ReadStart: r.readStart, diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index bc321a97a4..df27da4e88 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } response := NewResponse(inv.Reply(), nil) if async { - if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { + if callBack, ok := inv.CallBack().(func(response common.CallResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) From 5bf5fc1ccc52b93582ed8e0230283e39a209028c Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 3 Dec 2019 10:05:57 +0800 Subject: [PATCH 02/11] fix case --- protocol/dubbo/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index eb1f15c862..90dbf9c5f3 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -144,7 +144,7 @@ func TestClient_AsyncCall(t *testing.T) { user := &User{} lock := sync.Mutex{} lock.Lock() - err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response CallResponse) { + err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallResponse) { assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() }, NewResponse(user, nil)) From ae50c05610f1155a8a8b606a5124855f9b111e22 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 3 Dec 2019 10:41:13 +0800 Subject: [PATCH 03/11] fix case --- protocol/dubbo/dubbo_invoker_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 0a765356f7..d2553884d5 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -21,6 +21,8 @@ import ( "sync" "testing" "time" + + "github.com/apache/dubbo-go/common" ) import ( @@ -65,7 +67,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { // AsyncCall lock := sync.Mutex{} lock.Lock() - inv.SetCallBack(func(response CallResponse) { + inv.SetCallBack(func(response common.CallResponse) { assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() }) From 4a49f8c51bd60fa253e9ce5e63e4bbbab9ace493 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 2 Dec 2019 18:50:06 +0800 Subject: [PATCH 04/11] add async callback --- common/proxy/proxy_factory.go | 2 +- common/proxy/proxy_factory/default.go | 4 ++-- common/proxy/proxy_factory/default_test.go | 2 +- common/rpc_service.go | 21 +++++++++++++++++++++ config/reference_config.go | 8 +++++--- config/reference_config_test.go | 1 + config/service.go | 8 ++++++++ protocol/dubbo/client.go | 9 +++------ protocol/dubbo/codec.go | 5 +++-- protocol/dubbo/dubbo_invoker.go | 2 +- 10 files changed, 46 insertions(+), 16 deletions(-) diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index 2567e0ee09..cd6ef3d1db 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -23,7 +23,7 @@ import ( ) type ProxyFactory interface { - GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy GetInvoker(url common.URL) protocol.Invoker } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index bafba60b40..bf87e6380d 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -54,11 +54,11 @@ type DefaultProxyFactory struct { func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { return &DefaultProxyFactory{} } -func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { +func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { //create proxy attachments := map[string]string{} attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") - return proxy.NewProxy(invoker, nil, attachments) + return proxy.NewProxy(invoker, callBack, attachments) } func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { return &ProxyInvoker{ diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go index b6a6b675ba..cba7757e4b 100644 --- a/common/proxy/proxy_factory/default_test.go +++ b/common/proxy/proxy_factory/default_test.go @@ -33,7 +33,7 @@ import ( func Test_GetProxy(t *testing.T) { proxyFactory := NewDefaultProxyFactory() url := common.NewURLWithOptions() - proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), nil, url) assert.NotNil(t, proxy) } diff --git a/common/rpc_service.go b/common/rpc_service.go index 4741a6fa3c..a642ea0ae9 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -22,6 +22,7 @@ import ( "reflect" "strings" "sync" + "time" "unicode" "unicode/utf8" ) @@ -39,6 +40,26 @@ type RPCService interface { Reference() string // rpc service id or reference id } +// callback interface for async +type AsyncCallbackService interface { + CallBack(response CallResponse) // callback +} + +type Options struct { + // connect timeout + ConnectTimeout time.Duration + // request timeout + RequestTimeout time.Duration +} + +type CallResponse struct { + Opts Options + Cause error + Start time.Time // invoke(call) start time == write start time + ReadStart time.Time // read start time, write duration = ReadStart - Start + Reply interface{} +} + // for lowercase func // func MethodMapper() map[string][string] { // return map[string][string]{} diff --git a/config/reference_config.go b/config/reference_config.go index 8703c459ba..1e469d037c 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -55,7 +55,7 @@ type ReferenceConfig struct { Group string `yaml:"group" json:"group,omitempty" property:"group"` Version string `yaml:"version" json:"version,omitempty" property:"version"` Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - async bool `yaml:"async" json:"async,omitempty" property:"async"` + Async bool `yaml:"async" json:"async,omitempty" property:"async"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` invoker protocol.Invoker urls []*common.URL @@ -140,8 +140,10 @@ func (refconfig *ReferenceConfig) Refer() { } } + callback := GetCallback(refconfig.id) + //create proxy - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, callback, url) } // @v is service provider implemented RPCService @@ -169,7 +171,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) //getty invoke async or sync - urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) + urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async)) //application info urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name) diff --git a/config/reference_config_test.go b/config/reference_config_test.go index a81dbf06ce..2cb9793c18 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -93,6 +93,7 @@ func doInitConsumer() { Retries: "3", Group: "huadong_idc", Version: "1.0.0", + Async: true, Methods: []*MethodConfig{ { Name: "GetUser", diff --git a/config/service.go b/config/service.go index 2bceac4a8c..ec21bac883 100644 --- a/config/service.go +++ b/config/service.go @@ -43,3 +43,11 @@ func GetConsumerService(name string) common.RPCService { func GetProviderService(name string) common.RPCService { return proServices[name] } + +func GetCallback(name string) func(common.CallResponse) { + service := GetConsumerService(name) + if sv, ok := service.(common.AsyncCallbackService); ok { + return sv.CallBack + } + return nil +} diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index ba74d86c0c..1e00f699b2 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -107,6 +107,7 @@ func setClientGrpool() { } type Options struct { + common.Options // connect timeout ConnectTimeout time.Duration // request timeout @@ -114,14 +115,10 @@ type Options struct { } type CallResponse struct { - Opts Options - Cause error - Start time.Time // invoke(call) start time == write start time - ReadStart time.Time // read start time, write duration = ReadStart - Start - Reply interface{} + common.CallResponse } -type AsyncCallback func(response CallResponse) +type AsyncCallback func(response common.CallResponse) type Client struct { opts Options diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index a878ffd91e..1874b94c47 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -26,6 +26,7 @@ import ( import ( "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go/common" perrors "github.com/pkg/errors" ) @@ -122,8 +123,8 @@ func NewPendingResponse() *PendingResponse { } } -func (r PendingResponse) GetCallResponse() CallResponse { - return CallResponse{ +func (r PendingResponse) GetCallResponse() common.CallResponse { + return common.CallResponse{ Cause: r.err, Start: r.start, ReadStart: r.readStart, diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index bc321a97a4..df27da4e88 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } response := NewResponse(inv.Reply(), nil) if async { - if callBack, ok := inv.CallBack().(func(response CallResponse)); ok { + if callBack, ok := inv.CallBack().(func(response common.CallResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) From 64815e81b9b7d13bb0535e4559dd15f894b36a9b Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 3 Dec 2019 10:05:57 +0800 Subject: [PATCH 05/11] fix case --- protocol/dubbo/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index eb1f15c862..90dbf9c5f3 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -144,7 +144,7 @@ func TestClient_AsyncCall(t *testing.T) { user := &User{} lock := sync.Mutex{} lock.Lock() - err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response CallResponse) { + err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallResponse) { assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() }, NewResponse(user, nil)) From 0020ac4b9ce18087a4c681fa323afc6ba0da4111 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 3 Dec 2019 10:41:13 +0800 Subject: [PATCH 06/11] fix case --- protocol/dubbo/dubbo_invoker_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 0a765356f7..d2553884d5 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -21,6 +21,8 @@ import ( "sync" "testing" "time" + + "github.com/apache/dubbo-go/common" ) import ( @@ -65,7 +67,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { // AsyncCall lock := sync.Mutex{} lock.Lock() - inv.SetCallBack(func(response CallResponse) { + inv.SetCallBack(func(response common.CallResponse) { assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) lock.Unlock() }) From 498453c7c2d5556734d3dce43d7351eadb06f804 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 4 Dec 2019 17:02:05 +0800 Subject: [PATCH 07/11] modify async callback --- common/rpc_service.go | 21 ++++++--------------- config/service.go | 2 +- protocol/dubbo/client.go | 17 ++++++++++------- protocol/dubbo/client_test.go | 5 +++-- protocol/dubbo/codec.go | 6 +++--- protocol/dubbo/dubbo_invoker.go | 2 +- protocol/dubbo/dubbo_invoker_test.go | 5 +++-- 7 files changed, 27 insertions(+), 31 deletions(-) diff --git a/common/rpc_service.go b/common/rpc_service.go index a642ea0ae9..4c9f083dd0 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -22,7 +22,6 @@ import ( "reflect" "strings" "sync" - "time" "unicode" "unicode/utf8" ) @@ -40,25 +39,17 @@ type RPCService interface { Reference() string // rpc service id or reference id } -// callback interface for async +//AsyncCallbackService callback interface for async type AsyncCallbackService interface { - CallBack(response CallResponse) // callback + CallBack(response CallbackResponse) // callback } -type Options struct { - // connect timeout - ConnectTimeout time.Duration - // request timeout - RequestTimeout time.Duration +//CallbackResponse for different protocol +type CallbackResponse interface { } -type CallResponse struct { - Opts Options - Cause error - Start time.Time // invoke(call) start time == write start time - ReadStart time.Time // read start time, write duration = ReadStart - Start - Reply interface{} -} +//AsyncCallback async callback method +type AsyncCallback func(response CallbackResponse) // for lowercase func // func MethodMapper() map[string][string] { diff --git a/config/service.go b/config/service.go index ec21bac883..f1b51790ca 100644 --- a/config/service.go +++ b/config/service.go @@ -44,7 +44,7 @@ func GetProviderService(name string) common.RPCService { return proServices[name] } -func GetCallback(name string) func(common.CallResponse) { +func GetCallback(name string) func(response common.CallbackResponse) { service := GetConsumerService(name) if sv, ok := service.(common.AsyncCallbackService); ok { return sv.CallBack diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 1e00f699b2..1365838f3b 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -107,19 +107,22 @@ func setClientGrpool() { } type Options struct { - common.Options // connect timeout ConnectTimeout time.Duration // request timeout RequestTimeout time.Duration } -type CallResponse struct { - common.CallResponse +//AsyncCallbackResponse async response for dubbo +type AsyncCallbackResponse struct { + common.CallbackResponse + Opts Options + Cause error + Start time.Time // invoke(call) start time == write start time + ReadStart time.Time // read start time, write duration = ReadStart - Start + Reply interface{} } -type AsyncCallback func(response common.CallResponse) - type Client struct { opts Options conf ClientConfig @@ -196,12 +199,12 @@ func (c *Client) Call(request *Request, response *Response) error { return perrors.WithStack(c.call(ct, request, response, nil)) } -func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error { +func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error { return perrors.WithStack(c.call(CT_TwoWay, request, response, callback)) } -func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error { +func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error { p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index 90dbf9c5f3..3f8a8ee98c 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -144,8 +144,9 @@ func TestClient_AsyncCall(t *testing.T) { user := &User{} lock := sync.Mutex{} lock.Lock() - err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) + err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) { + r := response.(AsyncCallbackResponse) + assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }, NewResponse(user, nil)) assert.NoError(t, err) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 1874b94c47..758363117f 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -110,7 +110,7 @@ type PendingResponse struct { err error start time.Time readStart time.Time - callback AsyncCallback + callback common.AsyncCallback response *Response done chan struct{} } @@ -123,8 +123,8 @@ func NewPendingResponse() *PendingResponse { } } -func (r PendingResponse) GetCallResponse() common.CallResponse { - return common.CallResponse{ +func (r PendingResponse) GetCallResponse() common.CallbackResponse { + return AsyncCallbackResponse{ Cause: r.err, Start: r.start, ReadStart: r.readStart, diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index df27da4e88..da12126103 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } response := NewResponse(inv.Reply(), nil) if async { - if callBack, ok := inv.CallBack().(func(response common.CallResponse)); ok { + if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index d2553884d5..e22903c1f2 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -67,8 +67,9 @@ func TestDubboInvoker_Invoke(t *testing.T) { // AsyncCall lock := sync.Mutex{} lock.Lock() - inv.SetCallBack(func(response common.CallResponse) { - assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User)) + inv.SetCallBack(func(response common.CallbackResponse) { + r := response.(AsyncCallbackResponse) + assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }) res = invoker.Invoke(inv) From f89f642f8c009cdc629659c6687ac3d01cf8a347 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 4 Dec 2019 17:13:23 +0800 Subject: [PATCH 08/11] fix case --- protocol/dubbo/dubbo_invoker_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index e22903c1f2..7d60090e2d 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -21,8 +21,6 @@ import ( "sync" "testing" "time" - - "github.com/apache/dubbo-go/common" ) import ( @@ -30,6 +28,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol/invocation" ) From ce6235927df907f6be91d22f1eb8c2803d2d0530 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 4 Dec 2019 17:20:30 +0800 Subject: [PATCH 09/11] fix case --- config/reference_config_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 2cb9793c18..a81dbf06ce 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -93,7 +93,6 @@ func doInitConsumer() { Retries: "3", Group: "huadong_idc", Version: "1.0.0", - Async: true, Methods: []*MethodConfig{ { Name: "GetUser", From 13f15ea5aff8bc00ff0e036c7e007d142060adf8 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 11 Dec 2019 17:19:19 +0800 Subject: [PATCH 10/11] fix review problems --- common/proxy/proxy_factory.go | 3 ++- common/proxy/proxy_factory/default.go | 7 ++++++- common/proxy/proxy_factory/default_test.go | 18 +++++++++++++++++- config/reference_config.go | 9 ++++++--- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/common/proxy/proxy_factory.go b/common/proxy/proxy_factory.go index cd6ef3d1db..116cfe0669 100644 --- a/common/proxy/proxy_factory.go +++ b/common/proxy/proxy_factory.go @@ -23,7 +23,8 @@ import ( ) type ProxyFactory interface { - GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy + GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy + GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy GetInvoker(url common.URL) protocol.Invoker } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index bf87e6380d..06824fdc1e 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -54,12 +54,17 @@ type DefaultProxyFactory struct { func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory { return &DefaultProxyFactory{} } -func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { +func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy { + return factory.GetAsyncProxy(invoker, nil, url) +} + +func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy { //create proxy attachments := map[string]string{} attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false") return proxy.NewProxy(invoker, callBack, attachments) } + func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker { return &ProxyInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), diff --git a/common/proxy/proxy_factory/default_test.go b/common/proxy/proxy_factory/default_test.go index cba7757e4b..7159b4b00e 100644 --- a/common/proxy/proxy_factory/default_test.go +++ b/common/proxy/proxy_factory/default_test.go @@ -18,6 +18,7 @@ package proxy_factory import ( + "fmt" "testing" ) @@ -33,7 +34,22 @@ import ( func Test_GetProxy(t *testing.T) { proxyFactory := NewDefaultProxyFactory() url := common.NewURLWithOptions() - proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), nil, url) + proxy := proxyFactory.GetProxy(protocol.NewBaseInvoker(*url), url) + assert.NotNil(t, proxy) +} + +type TestAsync struct { +} + +func (u *TestAsync) CallBack(res common.CallbackResponse) { + fmt.Println("CallBack res:", res) +} + +func Test_GetAsyncProxy(t *testing.T) { + proxyFactory := NewDefaultProxyFactory() + url := common.NewURLWithOptions() + async := &TestAsync{} + proxy := proxyFactory.GetAsyncProxy(protocol.NewBaseInvoker(*url), async.CallBack, url) assert.NotNil(t, proxy) } diff --git a/config/reference_config.go b/config/reference_config.go index 1e469d037c..eec2c0fcd7 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -140,10 +140,13 @@ func (refconfig *ReferenceConfig) Refer() { } } - callback := GetCallback(refconfig.id) - //create proxy - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, callback, url) + if refconfig.Async { + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) + } else { + callback := GetCallback(refconfig.id) + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(refconfig.invoker, callback, url) + } } // @v is service provider implemented RPCService From 25d2c237e9ed750eb32c1bd5c13ffb8828d892b9 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 17 Dec 2019 23:34:01 +0800 Subject: [PATCH 11/11] add test case for async call --- common/proxy/proxy.go | 4 ++++ config/reference_config.go | 4 ++-- config/reference_config_test.go | 37 +++++++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index 1c079f6bca..d13646dba8 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -181,3 +181,7 @@ func (p *Proxy) Implement(v common.RPCService) { func (p *Proxy) Get() common.RPCService { return p.rpc } + +func (p *Proxy) GetCallback() interface{} { + return p.callBack +} diff --git a/config/reference_config.go b/config/reference_config.go index eec2c0fcd7..6b34f55359 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -142,10 +142,10 @@ func (refconfig *ReferenceConfig) Refer() { //create proxy if refconfig.Async { - refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) - } else { callback := GetCallback(refconfig.id) refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(refconfig.invoker, callback, url) + } else { + refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url) } } diff --git a/config/reference_config_test.go b/config/reference_config_test.go index a81dbf06ce..a7af925cab 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -81,6 +81,7 @@ func doInitConsumer() { }, References: map[string]*ReferenceConfig{ "MockService": { + id: "MockProvider", Params: map[string]string{ "serviceid": "soa.mock", "forks": "5", @@ -110,6 +111,26 @@ func doInitConsumer() { } } +var mockProvider = new(MockProvider) + +type MockProvider struct { +} + +func (m *MockProvider) Reference() string { + return "MockProvider" +} + +func (m *MockProvider) CallBack(res common.CallbackResponse) { +} + +func doInitConsumerAsync() { + doInitConsumer() + SetConsumerService(mockProvider) + for _, v := range consumerConfig.References { + v.Async = true + } +} + func doInitConsumerWithSingleRegistry() { consumerConfig = &ConsumerConfig{ ApplicationConfig: &ApplicationConfig{ @@ -181,6 +202,22 @@ func Test_Refer(t *testing.T) { } consumerConfig = nil } + +func Test_ReferAsync(t *testing.T) { + doInitConsumerAsync() + extension.SetProtocol("registry", GetProtocol) + extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + + for _, reference := range consumerConfig.References { + reference.Refer() + assert.Equal(t, "soa.mock", reference.Params["serviceid"]) + assert.NotNil(t, reference.invoker) + assert.NotNil(t, reference.pxy) + assert.NotNil(t, reference.pxy.GetCallback()) + } + consumerConfig = nil +} + func Test_ReferP2P(t *testing.T) { doInitConsumer() extension.SetProtocol("dubbo", GetProtocol)