Skip to content

Commit

Permalink
[Enhancement]: RestProtocol (#3480)
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhenxu94 authored and lixiaojiee committed Feb 17, 2019
1 parent 2b12c16 commit 813fed5
Showing 1 changed file with 20 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,13 @@

import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.jboss.resteasy.client.jaxrs.ResteasyClient;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
Expand Down Expand Up @@ -65,7 +62,7 @@ public class RestProtocol extends AbstractProxyProtocol {
private static final int HTTPCLIENTCONNECTIONMANAGER_CLOSEWAITTIME_MS = 1000;
private static final int HTTPCLIENTCONNECTIONMANAGER_CLOSEIDLETIME_S = 30;

private final Map<String, RestServer> servers = new ConcurrentHashMap<String, RestServer>();
private final Map<String, RestServer> servers = new ConcurrentHashMap<>();

private final RestServerFactory serverFactory = new RestServerFactory();

Expand All @@ -91,12 +88,11 @@ public int getDefaultPort() {
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
Class implClass = ApplicationModel.getProviderModel(url.getServiceKey()).getServiceInstance().getClass();
RestServer server = servers.get(addr);
if (server == null) {
server = serverFactory.createServer(url.getParameter(Constants.SERVER_KEY, DEFAULT_SERVER));
server.start(url);
servers.put(addr, server);
}
RestServer server = servers.computeIfAbsent(addr, restServer -> {
RestServer s = serverFactory.createServer(url.getParameter(Constants.SERVER_KEY, DEFAULT_SERVER));
s.start(url);
return s;
});

String contextPath = getContextPath(url);
if ("servlet".equalsIgnoreCase(url.getParameter(Constants.SERVER_KEY, DEFAULT_SERVER))) {
Expand Down Expand Up @@ -124,13 +120,10 @@ protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcExcept
server.deploy(resourceDef, impl, contextPath);

final RestServer s = server;
return new Runnable() {
@Override
public void run() {
// TODO due to dubbo's current architecture,
// it will be called from registry protocol in the shutdown process and won't appear in logs
s.undeploy(resourceDef);
}
return () -> {
// TODO due to dubbo's current architecture,
// it will be called from registry protocol in the shutdown process and won't appear in logs
s.undeploy(resourceDef);
};
}

Expand Down Expand Up @@ -159,20 +152,17 @@ protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
.build();

CloseableHttpClient httpClient = HttpClientBuilder.create()
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase(Constants.TIMEOUT_KEY)) {
return Long.parseLong(value) * 1000;
}
.setKeepAliveStrategy((response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase(Constants.TIMEOUT_KEY)) {
return Long.parseLong(value) * 1000;
}
return HTTPCLIENT_KEEPALIVEDURATION;
}
return HTTPCLIENT_KEEPALIVEDURATION;
})
.setDefaultRequestConfig(requestConfig)
.setDefaultSocketConfig(socketConfig)
Expand Down Expand Up @@ -245,7 +235,7 @@ protected String getContextPath(URL url) {

protected class ConnectionMonitor extends Thread {
private volatile boolean shutdown;
private final List<PoolingHttpClientConnectionManager> connectionManagers = Collections.synchronizedList(new LinkedList<PoolingHttpClientConnectionManager>());
private final List<PoolingHttpClientConnectionManager> connectionManagers = Collections.synchronizedList(new LinkedList<>());

public void addConnectionManager(PoolingHttpClientConnectionManager connectionManager) {
connectionManagers.add(connectionManager);
Expand Down

0 comments on commit 813fed5

Please sign in to comment.