From 8ffd650b58d233d67fcff5948d5d3aca5b77039c Mon Sep 17 00:00:00 2001 From: James Sun Date: Tue, 7 Jan 2020 23:39:30 -0800 Subject: [PATCH] Abstract RemoteNodeState HTTP implementation to HttpRemoteNodeState --- .../presto/metadata/DiscoveryNodeManager.java | 2 +- .../presto/metadata/HttpRemoteNodeState.java | 117 ++++++++++++++++++ .../presto/metadata/RemoteNodeState.java | 95 +------------- 3 files changed, 121 insertions(+), 93 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/metadata/HttpRemoteNodeState.java diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java index 4c0c41215bb9b..3f1c164ef4b31 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java @@ -177,7 +177,7 @@ private void pollWorkers() // Add new nodes for (InternalNode node : aliveNodes) { nodeStates.putIfAbsent(node.getNodeIdentifier(), - new RemoteNodeState(httpClient, uriBuilderFrom(node.getInternalUri()).appendPath("/v1/info/state").build())); + new HttpRemoteNodeState(httpClient, uriBuilderFrom(node.getInternalUri()).appendPath("/v1/info/state").build())); } // Schedule refresh diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/HttpRemoteNodeState.java b/presto-main/src/main/java/com/facebook/presto/metadata/HttpRemoteNodeState.java new file mode 100644 index 0000000000000..cdbad264471d4 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/metadata/HttpRemoteNodeState.java @@ -0,0 +1,117 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.metadata; + +import com.facebook.airlift.http.client.FullJsonResponseHandler.JsonResponse; +import com.facebook.airlift.http.client.HttpClient; +import com.facebook.airlift.http.client.HttpClient.HttpResponseFuture; +import com.facebook.airlift.http.client.Request; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.NodeState; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import io.airlift.units.Duration; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static com.facebook.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler; +import static com.facebook.airlift.http.client.HttpStatus.OK; +import static com.facebook.airlift.http.client.Request.Builder.prepareGet; +import static com.facebook.airlift.json.JsonCodec.jsonCodec; +import static com.google.common.net.MediaType.JSON_UTF_8; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.airlift.units.Duration.nanosSince; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; +import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE; + +@ThreadSafe +public class HttpRemoteNodeState + implements RemoteNodeState +{ + private static final Logger log = Logger.get(HttpRemoteNodeState.class); + + private final HttpClient httpClient; + private final URI stateInfoUri; + private final AtomicReference> nodeState = new AtomicReference<>(Optional.empty()); + private final AtomicReference> future = new AtomicReference<>(); + private final AtomicLong lastUpdateNanos = new AtomicLong(); + private final AtomicLong lastWarningLogged = new AtomicLong(); + + public HttpRemoteNodeState(HttpClient httpClient, URI stateInfoUri) + { + this.httpClient = requireNonNull(httpClient, "httpClient is null"); + this.stateInfoUri = requireNonNull(stateInfoUri, "stateInfoUri is null"); + } + + @Override + public Optional getNodeState() + { + return nodeState.get(); + } + + @Override + public synchronized void asyncRefresh() + { + Duration sinceUpdate = nanosSince(lastUpdateNanos.get()); + if (nanosSince(lastWarningLogged.get()).toMillis() > 1_000 && + sinceUpdate.toMillis() > 10_000 && + future.get() != null) { + log.warn("Node state update request to %s has not returned in %s", stateInfoUri, sinceUpdate.toString(SECONDS)); + lastWarningLogged.set(System.nanoTime()); + } + if (sinceUpdate.toMillis() > 1_000 && future.get() == null) { + Request request = prepareGet() + .setUri(stateInfoUri) + .setHeader(CONTENT_TYPE, JSON_UTF_8.toString()) + .build(); + HttpResponseFuture> responseFuture = httpClient.executeAsync(request, createFullJsonResponseHandler(jsonCodec(NodeState.class))); + future.compareAndSet(null, responseFuture); + + Futures.addCallback(responseFuture, new FutureCallback>() + { + @Override + public void onSuccess(@Nullable JsonResponse result) + { + lastUpdateNanos.set(System.nanoTime()); + future.compareAndSet(responseFuture, null); + if (result != null) { + if (result.hasValue()) { + nodeState.set(Optional.ofNullable(result.getValue())); + } + if (result.getStatusCode() != OK.code()) { + log.warn("Error fetching node state from %s returned status %d: %s", stateInfoUri, result.getStatusCode(), result.getStatusMessage()); + return; + } + } + } + + @Override + public void onFailure(Throwable t) + { + log.warn("Error fetching node state from %s: %s", stateInfoUri, t.getMessage()); + lastUpdateNanos.set(System.nanoTime()); + future.compareAndSet(responseFuture, null); + } + }, directExecutor()); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/RemoteNodeState.java b/presto-main/src/main/java/com/facebook/presto/metadata/RemoteNodeState.java index 6bb16384c8cd4..56e824ba92ce1 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/RemoteNodeState.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/RemoteNodeState.java @@ -13,102 +13,13 @@ */ package com.facebook.presto.metadata; -import com.facebook.airlift.http.client.FullJsonResponseHandler.JsonResponse; -import com.facebook.airlift.http.client.HttpClient; -import com.facebook.airlift.http.client.HttpClient.HttpResponseFuture; -import com.facebook.airlift.http.client.Request; -import com.facebook.airlift.log.Logger; import com.facebook.presto.spi.NodeState; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import io.airlift.units.Duration; -import javax.annotation.Nullable; -import javax.annotation.concurrent.ThreadSafe; - -import java.net.URI; import java.util.Optional; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import static com.facebook.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler; -import static com.facebook.airlift.http.client.HttpStatus.OK; -import static com.facebook.airlift.http.client.Request.Builder.prepareGet; -import static com.facebook.airlift.json.JsonCodec.jsonCodec; -import static com.google.common.net.MediaType.JSON_UTF_8; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import static io.airlift.units.Duration.nanosSince; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.SECONDS; -import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE; -@ThreadSafe -public class RemoteNodeState +public interface RemoteNodeState { - private static final Logger log = Logger.get(RemoteNodeState.class); - - private final HttpClient httpClient; - private final URI stateInfoUri; - private final AtomicReference> nodeState = new AtomicReference<>(Optional.empty()); - private final AtomicReference> future = new AtomicReference<>(); - private final AtomicLong lastUpdateNanos = new AtomicLong(); - private final AtomicLong lastWarningLogged = new AtomicLong(); - - public RemoteNodeState(HttpClient httpClient, URI stateInfoUri) - { - this.httpClient = requireNonNull(httpClient, "httpClient is null"); - this.stateInfoUri = requireNonNull(stateInfoUri, "stateInfoUri is null"); - } - - public Optional getNodeState() - { - return nodeState.get(); - } - - public synchronized void asyncRefresh() - { - Duration sinceUpdate = nanosSince(lastUpdateNanos.get()); - if (nanosSince(lastWarningLogged.get()).toMillis() > 1_000 && - sinceUpdate.toMillis() > 10_000 && - future.get() != null) { - log.warn("Node state update request to %s has not returned in %s", stateInfoUri, sinceUpdate.toString(SECONDS)); - lastWarningLogged.set(System.nanoTime()); - } - if (sinceUpdate.toMillis() > 1_000 && future.get() == null) { - Request request = prepareGet() - .setUri(stateInfoUri) - .setHeader(CONTENT_TYPE, JSON_UTF_8.toString()) - .build(); - HttpResponseFuture> responseFuture = httpClient.executeAsync(request, createFullJsonResponseHandler(jsonCodec(NodeState.class))); - future.compareAndSet(null, responseFuture); - - Futures.addCallback(responseFuture, new FutureCallback>() - { - @Override - public void onSuccess(@Nullable JsonResponse result) - { - lastUpdateNanos.set(System.nanoTime()); - future.compareAndSet(responseFuture, null); - if (result != null) { - if (result.hasValue()) { - nodeState.set(Optional.ofNullable(result.getValue())); - } - if (result.getStatusCode() != OK.code()) { - log.warn("Error fetching node state from %s returned status %d: %s", stateInfoUri, result.getStatusCode(), result.getStatusMessage()); - return; - } - } - } + Optional getNodeState(); - @Override - public void onFailure(Throwable t) - { - log.warn("Error fetching node state from %s: %s", stateInfoUri, t.getMessage()); - lastUpdateNanos.set(System.nanoTime()); - future.compareAndSet(responseFuture, null); - } - }, directExecutor()); - } - } + void asyncRefresh(); }