Skip to content

Commit

Permalink
[Dubbo- support tag router feature] Add a new Router implement -- Tag…
Browse files Browse the repository at this point in the history
…Router (#2228)

* tagRouter feature

* update dubbo.xsd

* remove reference router param

* add Unit Test

* rollback pom.xml for merge

* rollback pom.xml for merge

* fix checkstyle

* fix checkstyle

* fix unit test

* format import style

* add license&remove author info

* trigger again
  • Loading branch information
lexburner authored and beiwei30 committed Aug 22, 2018
1 parent 7500033 commit fcd1af8
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.tag;


import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Router;

import java.util.ArrayList;
import java.util.List;

/**
* TagRouter
*/
public class TagRouter implements Router, Comparable<Router> {

private static final Logger logger = LoggerFactory.getLogger(TagRouter.class);

private final int priority;
private final URL url;

public static final URL ROUTER_URL = new URL("tag", Constants.ANYHOST_VALUE, 0, Constants.ANY_VALUE).addParameters(Constants.RUNTIME_KEY, "true");

public TagRouter(URL url) {
this.url = url;
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
}

public TagRouter() {
this.url = ROUTER_URL;
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
}

@Override
public URL getUrl() {
return url;
}

@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
// filter
List<Invoker<T>> result = new ArrayList<>();
try {
// Dynamic param
String tag = RpcContext.getContext().getAttachment(Constants.REQUEST_TAG_KEY);
// Tag request
if (!StringUtils.isEmpty(tag)) {
// Select tag invokers first
for (Invoker<T> invoker : invokers) {
if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);
}
}
// If no invoker be selected, downgrade to normal invokers
if (result.isEmpty()) {
for (Invoker<T> invoker : invokers) {
if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);
}
}
}
// Normal request
} else {
for (Invoker<T> invoker : invokers) {
// Can't access tag invoker,only normal invoker should be selected
if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);
}
}
}
return result;
} catch (Exception e) {
logger.error("Route by tag error,return all invokers.", e);
}
// Downgrade to all invokers
return invokers;
}

@Override
public int compareTo(Router o) {
if (o == null || o.getClass() != TagRouter.class) {
return 1;
}
TagRouter c = (TagRouter) o;
return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.tag;


import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.RouterFactory;

public class TagRouterFactory implements RouterFactory {

public static final String NAME = "tag";

@Override
public Router getRouter(URL url) {
return new TagRouter(url);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory
script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory
condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.router.tag;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.RouterFactory;
import org.apache.dubbo.rpc.cluster.router.MockInvoker;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class TagRouterTest {

private URL tagUrl = new URL("tag"
, Constants.ANYHOST_VALUE, 0
, Constants.ANY_VALUE)
.addParameters(
Constants.RUNTIME_KEY, "true"
);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
}

@Before
public void setUp() throws Exception {
}

@Test
public void testRoute_matchTag() {

RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "red");

List<Invoker<String>> invokers = new ArrayList<>();
Invoker<String> redInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
Invoker<String> yellowInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
Invoker<String> blueInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));
Invoker<String> defaultInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.4:20880/com.foo.BarService"));

invokers.add(redInvoker);
invokers.add(yellowInvoker);
invokers.add(blueInvoker);
invokers.add(defaultInvoker);

Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
List<Invoker<String>> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
Assert.assertTrue(filteredInvokers.contains(redInvoker));
Assert.assertFalse(filteredInvokers.contains(yellowInvoker));
Assert.assertFalse(filteredInvokers.contains(blueInvoker));
Assert.assertFalse(filteredInvokers.contains(defaultInvoker));
}

@Test
public void testRoute_matchDefault() {

RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "");

List<Invoker<String>> invokers = new ArrayList<>();
Invoker<String> redInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
Invoker<String> yellowInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
Invoker<String> blueInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));
Invoker<String> defaultInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.4:20880/com.foo.BarService"));

invokers.add(redInvoker);
invokers.add(yellowInvoker);
invokers.add(blueInvoker);
invokers.add(defaultInvoker);

Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
List<Invoker<String>> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
Assert.assertTrue(filteredInvokers.contains(defaultInvoker));
Assert.assertFalse(filteredInvokers.contains(yellowInvoker));
Assert.assertFalse(filteredInvokers.contains(blueInvoker));
Assert.assertFalse(filteredInvokers.contains(redInvoker));
}

@Test
public void testRoute_requestWithTag_shouldDowngrade() {

RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "black");

List<Invoker<String>> invokers = new ArrayList<>();
Invoker<String> redInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
Invoker<String> yellowInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
Invoker<String> blueInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));
Invoker<String> defaultInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.4:20880/com.foo.BarService"));

invokers.add(redInvoker);
invokers.add(yellowInvoker);
invokers.add(blueInvoker);
invokers.add(defaultInvoker);

Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
List<Invoker<String>> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
Assert.assertTrue(filteredInvokers.contains(defaultInvoker));
Assert.assertFalse(filteredInvokers.contains(yellowInvoker));
Assert.assertFalse(filteredInvokers.contains(blueInvoker));
Assert.assertFalse(filteredInvokers.contains(redInvoker));
}

@Test
public void testRoute_requestWithoutTag_shouldNotDowngrade() {

RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY, "");

List<Invoker<String>> invokers = new ArrayList<>();
Invoker<String> redInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.1:20880/com.foo.BarService?tag=red"));
Invoker<String> yellowInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.2:20880/com.foo.BarService?tag=yellow"));
Invoker<String> blueInvoker = new MockInvoker<>(URL.valueOf(
"dubbo://10.20.3.3:20880/com.foo.BarService?tag=blue"));

invokers.add(redInvoker);
invokers.add(yellowInvoker);
invokers.add(blueInvoker);

Router tagRouter = new TagRouterFactory().getRouter(tagUrl);
List<Invoker<String>> filteredInvokers = tagRouter.route(invokers, URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation());
Assert.assertEquals(0, filteredInvokers.size());
}

@Test
public void testRoute_createBySpi() {
URL zkProvider = URL.valueOf("zookeeper://10.20.3.1:20880/com.foo.BarService?router=tag");
String parameter = zkProvider.getParameter(Constants.ROUTER_KEY);
RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(parameter);
Router tagRouter = routerFactory.getRouter(zkProvider);
Assert.assertTrue(tagRouter instanceof TagRouter);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,10 @@ public class Constants {

public static final String MULTICAST = "multicast";

public static final String TAG_KEY = "tag";

public static final String REQUEST_TAG_KEY = "request.tag";

/*
* private Constants(){ }
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,5 +522,4 @@ public String getScope() {
public void setScope(String scope) {
this.scope = scope;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -197,5 +197,4 @@ public void setGroup(String group) {
checkKey("group", group);
this.group = group;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public abstract class AbstractServiceConfig extends AbstractInterfaceConfig {
// serialization
private String serialization;

// provider tag
protected String tag;

public String getVersion() {
return version;
}
Expand Down Expand Up @@ -240,4 +243,11 @@ public void setSerialization(String serialization) {
this.serialization = serialization;
}

public String getTag() {
return tag;
}

public void setTag(String tag) {
this.tag = tag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,9 @@
* Registry spring bean name
*/
String[] registry() default {};

/**
* Service tag name
*/
String tag() default "";
}
Loading

0 comments on commit fcd1af8

Please sign in to comment.