diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 9003d0cb24f2c..b742fff795d6f 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -68,7 +68,8 @@ Metrics may have additional dimensions beyond those listed above. |`init/metadatacache/time`|Time taken to initialize the broker segment metadata cache. Useful to detect if brokers are taking too long to start||Depends on the number of segments.| |`segment/metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`| |`segment/metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`| - +|`segment/serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `segment/serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise| +|`segment/serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| ### Historical @@ -324,6 +325,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`metadata/kill/rule/count`|Total number of rules that were automatically deleted from metadata store per each Coordinator kill rule duty run. This metric can help adjust `druid.coordinator.kill.rule.durationToRetain` configuration based on whether more or less rules need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.rule.on` is set to true.| |Varies| |`metadata/kill/datasource/count`|Total number of datasource metadata that were automatically deleted from metadata store per each Coordinator kill datasource duty run (Note: datasource metadata only exists for datasource created from supervisor). This metric can help adjust `druid.coordinator.kill.datasource.durationToRetain` configuration based on whether more or less datasource metadata need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.datasource.on` is set to true.| |Varies| |`init/serverview/time`|Time taken to initialize the coordinator server view.||Depends on the number of segments| +|`segment/serverview/sync/healthy`|Sync status of the Coordinator with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `segment/serverview/sync/unstableTime` to debug slow startup of the Coordinator.|`server`, `tier`|1 for fully synced servers, 0 otherwise| +|`segment/serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| ## General Health diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 8674d5a111315..a6354c906287c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -791,7 +791,7 @@ void syncMonitoring() final Set> workerEntrySet = ImmutableSet.copyOf(workers.entrySet()); for (Map.Entry e : workerEntrySet) { WorkerHolder workerHolder = e.getValue(); - if (!workerHolder.getUnderlyingSyncer().isOK()) { + if (workerHolder.getUnderlyingSyncer().needsReset()) { synchronized (workers) { // check again that server is still there and only then reset. if (workers.containsKey(e.getKey())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index 0bf4de0df864d..a8fc53060483f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -55,7 +55,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -340,7 +339,7 @@ public void stop() public void waitForInitialization() throws InterruptedException { - if (!syncer.awaitInitialization(3 * syncer.getServerHttpTimeout(), TimeUnit.MILLISECONDS)) { + if (!syncer.awaitInitialization()) { throw new RE("Failed to sync with worker[%s].", worker.getHost()); } } @@ -348,7 +347,7 @@ public void waitForInitialization() throws InterruptedException public boolean isInitialized() { try { - return syncer.awaitInitialization(1, TimeUnit.MILLISECONDS); + return syncer.isInitialized(); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index fe1b0ca498f65..6cc6329c58484 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -1789,7 +1789,7 @@ private Worker createWorker(String host) private WorkerHolder createNonSyncingWorkerHolder(Worker worker) { ChangeRequestHttpSyncer syncer = EasyMock.createMock(ChangeRequestHttpSyncer.class); - EasyMock.expect(syncer.isOK()).andReturn(false).anyTimes(); + EasyMock.expect(syncer.needsReset()).andReturn(true).anyTimes(); EasyMock.expect(syncer.getDebugInfo()).andReturn(Collections.emptyMap()).anyTimes(); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java index 5f4a9b843a6cd..f13a0b0cce808 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java @@ -78,6 +78,7 @@ public void testSyncListener() ChangeRequestHttpSyncer.Listener syncListener = workerHolder.createSyncListener(); Assert.assertTrue(workerHolder.disabled.get()); + Assert.assertFalse(workerHolder.isInitialized()); syncListener.fullSync( ImmutableList.of( diff --git a/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java new file mode 100644 index 0000000000000..2d941828a0eed --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.google.common.base.Ticker; +import org.joda.time.Duration; + +import java.util.concurrent.TimeUnit; + +/** + * Thread-safe wrapper over {@link com.google.common.base.Stopwatch}. + *

+ * Thread safety has been limited to the start/stop methods for now as they are + * the only ones that can throw an exception in an illegal state and are thus + * vulnerable to race conditions. + */ +public class Stopwatch +{ + private final com.google.common.base.Stopwatch delegate; + + public static Stopwatch createStarted() + { + return new Stopwatch(com.google.common.base.Stopwatch.createStarted()); + } + + public static Stopwatch createUnstarted() + { + return new Stopwatch(com.google.common.base.Stopwatch.createUnstarted()); + } + + public static Stopwatch createStarted(Ticker ticker) + { + return new Stopwatch(com.google.common.base.Stopwatch.createStarted(ticker)); + } + + private Stopwatch(com.google.common.base.Stopwatch delegate) + { + this.delegate = delegate; + } + + public synchronized void start() + { + delegate.start(); + } + + public synchronized void stop() + { + delegate.stop(); + } + + public synchronized void reset() + { + delegate.reset(); + } + + /** + * Invokes {@code reset().start()} on the underlying {@link com.google.common.base.Stopwatch}. + */ + public synchronized void restart() + { + delegate.reset().start(); + } + + public synchronized boolean isRunning() + { + return delegate.isRunning(); + } + + /** + * Returns the milliseconds elapsed on the stopwatch. + */ + public long millisElapsed() + { + return delegate.elapsed(TimeUnit.MILLISECONDS); + } + + /** + * Checks if the given duration has already elapsed on the stopwatch. + */ + public boolean hasElapsed(Duration duration) + { + return millisElapsed() >= duration.getMillis(); + } + + /** + * Checks that the given duration has not elapsed on the stopwatch. Calling this + * method is the same as {@code !stopwatch.hasElapsed(duration)}. + */ + public boolean hasNotElapsed(Duration duration) + { + return !hasElapsed(duration); + } + +} diff --git a/processing/src/test/java/org/apache/druid/java/util/common/StopwatchTest.java b/processing/src/test/java/org/apache/druid/java/util/common/StopwatchTest.java new file mode 100644 index 0000000000000..06bed222e0095 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/common/StopwatchTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.google.common.testing.FakeTicker; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +public class StopwatchTest +{ + + @Test + public void testDuplicateStartThrowsException() + { + Stopwatch stopwatch = Stopwatch.createStarted(); + Assert.assertThrows(IllegalStateException.class, stopwatch::start); + } + + @Test + public void testDuplicateStopThrowsException() + { + Stopwatch stopwatch = Stopwatch.createUnstarted(); + Assert.assertThrows(IllegalStateException.class, stopwatch::stop); + } + + @Test + public void testMillisElapsed() + { + FakeTicker fakeTicker = new FakeTicker(); + Stopwatch stopwatch = Stopwatch.createStarted(fakeTicker); + fakeTicker.advance(100, TimeUnit.MILLISECONDS); + stopwatch.stop(); + + Assert.assertEquals(100, stopwatch.millisElapsed()); + } + + @Test + public void testHasElapsed() + { + FakeTicker fakeTicker = new FakeTicker(); + Stopwatch stopwatch = Stopwatch.createStarted(fakeTicker); + fakeTicker.advance(100, TimeUnit.MILLISECONDS); + stopwatch.stop(); + + Assert.assertTrue(stopwatch.hasElapsed(Duration.millis(50))); + Assert.assertTrue(stopwatch.hasElapsed(Duration.millis(100))); + Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(101))); + Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(500))); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 395245815792d..52049c6956f67 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -20,6 +20,7 @@ package org.apache.druid.java.util.metrics; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; @@ -32,6 +33,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier { private final List events = new ArrayList<>(); + private final List alertEvents = new ArrayList<>(); private final Map> metricEvents = new HashMap<>(); public StubServiceEmitter(String service, String host) @@ -46,6 +48,8 @@ public void emit(Event event) ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>()) .add(metricEvent); + } else if (event instanceof AlertEvent) { + alertEvents.add((AlertEvent) event); } events.add(event); } @@ -58,6 +62,11 @@ public List getEvents() return events; } + public List getAlerts() + { + return alertEvents; + } + @Override public List getMetricValues( String metricName, @@ -92,6 +101,7 @@ public void start() public void flush() { events.clear(); + alertEvents.clear(); metricEvents.clear(); } diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java index 6c52866d05868..5901044879354 100644 --- a/server/src/main/java/org/apache/druid/client/DruidServer.java +++ b/server/src/main/java/org/apache/druid/client/DruidServer.java @@ -320,4 +320,17 @@ public ImmutableDruidServer toImmutableDruidServer() immutableDataSources.values().stream().mapToInt(dataSource -> dataSource.getSegments().size()).sum(); return new ImmutableDruidServer(metadata, size, immutableDataSources, totalSegments); } + + public DruidServer copyWithoutSegments() + { + return new DruidServer( + getName(), + getHostAndPort(), + getHostAndTlsPort(), + getMaxSize(), + getType(), + getTier(), + getPriority() + ); + } } diff --git a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java index bce9c562f6b00..b6c3ba2a63f1c 100644 --- a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java @@ -25,6 +25,8 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import javax.validation.constraints.NotNull; @@ -51,6 +53,14 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn @NotNull private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + @JacksonInject + @NotNull + private ScheduledExecutorFactory executorFactory; + + @JacksonInject + @NotNull + private ServiceEmitter serviceEmitter; + @Override public HttpServerInventoryView get() { @@ -60,6 +70,8 @@ public HttpServerInventoryView get() druidNodeDiscoveryProvider, Predicates.alwaysFalse(), config, + serviceEmitter, + executorFactory, "FilteredHttpServerInventoryView" ); } diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index 2f86c46064b42..ebc6752794126 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -27,6 +27,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; @@ -39,10 +40,13 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.ChangeRequestHttpSyncer; @@ -54,12 +58,12 @@ import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.joda.time.Duration; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -107,11 +111,14 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); private final String execNamePrefix; - private volatile ScheduledExecutorService executor; + private final ScheduledExecutorFactory executorFactory; + private volatile ScheduledExecutorService inventorySyncExecutor; + private volatile ScheduledExecutorService monitoringExecutor; private final HttpClient httpClient; private final ObjectMapper smileMapper; private final HttpServerInventoryViewConfig config; + private final ServiceEmitter serviceEmitter; public HttpServerInventoryView( final ObjectMapper smileMapper, @@ -119,6 +126,8 @@ public HttpServerInventoryView( final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, final Predicate> defaultFilter, final HttpServerInventoryViewConfig config, + final ServiceEmitter serviceEmitter, + final ScheduledExecutorFactory executorFactory, final String execNamePrefix ) { @@ -128,6 +137,8 @@ public HttpServerInventoryView( this.defaultFilter = defaultFilter; this.finalPredicate = defaultFilter; this.config = config; + this.serviceEmitter = serviceEmitter; + this.executorFactory = executorFactory; this.execNamePrefix = execNamePrefix; } @@ -137,16 +148,17 @@ public void start() { synchronized (lifecycleLock) { if (!lifecycleLock.canStart()) { - throw new ISE("can't start."); + throw new ISE("Could not start lifecycle"); } - log.info("Starting %s.", execNamePrefix); + log.info("Starting executor[%s].", execNamePrefix); try { - executor = ScheduledExecutors.fixed( + inventorySyncExecutor = executorFactory.create( config.getNumThreads(), execNamePrefix + "-%s" ); + monitoringExecutor = executorFactory.create(1, execNamePrefix + "-monitor-%s"); DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY); druidNodeDiscovery.registerListener( @@ -170,7 +182,7 @@ public void nodesRemoved(Collection nodes) public void nodeViewInitialized() { if (!initialized.getAndSet(true)) { - executor.execute(HttpServerInventoryView.this::serverInventoryInitialized); + inventorySyncExecutor.execute(HttpServerInventoryView.this::serverInventoryInitialized); } } @@ -204,7 +216,18 @@ private DruidServer toDruidServer(DiscoveryDruidNode node) } ); - scheduleSyncMonitoring(); + ScheduledExecutors.scheduleWithFixedDelay( + monitoringExecutor, + Duration.standardSeconds(60), + Duration.standardMinutes(5), + this::checkAndResetUnhealthyServers + ); + ScheduledExecutors.scheduleAtFixedRate( + monitoringExecutor, + Duration.standardSeconds(30), + Duration.standardMinutes(1), + this::emitServerStatusMetrics + ); lifecycleLock.started(); } @@ -212,7 +235,7 @@ private DruidServer toDruidServer(DiscoveryDruidNode node) lifecycleLock.exitStart(); } - log.info("Started %s.", execNamePrefix); + log.info("Started executor[%s].", execNamePrefix); } } @@ -224,13 +247,16 @@ public void stop() throw new ISE("can't stop."); } - log.info("Stopping %s.", execNamePrefix); + log.info("Stopping executor[%s].", execNamePrefix); - if (executor != null) { - executor.shutdownNow(); + if (inventorySyncExecutor != null) { + inventorySyncExecutor.shutdownNow(); + } + if (monitoringExecutor != null) { + monitoringExecutor.shutdownNow(); } - log.info("Stopped %s.", execNamePrefix); + log.info("Stopped executor[%s].", execNamePrefix); } } @@ -249,10 +275,7 @@ public void registerSegmentCallback( segmentCallbacks.put(filteringSegmentCallback, exec); segmentPredicates.put(filteringSegmentCallback, filter); - finalPredicate = Predicates.or( - defaultFilter, - Predicates.or(segmentPredicates.values()) - ); + updateFinalPredicate(); } @Override @@ -308,10 +331,7 @@ public void run() if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { segmentCallbacks.remove(entry.getKey()); if (segmentPredicates.remove(entry.getKey()) != null) { - finalPredicate = Predicates.or( - defaultFilter, - Predicates.or(segmentPredicates.values()) - ); + updateFinalPredicate(); } } } @@ -320,7 +340,7 @@ public void run() } } - private void runServerCallbacks(final DruidServer server) + private void runServerRemovedCallbacks(final DruidServer server) { for (final Map.Entry entry : serverCallbacks.entrySet()) { entry.getValue().execute( @@ -338,8 +358,11 @@ public void run() } } - //best effort wait for first segment listing fetch from all servers and then call - //segmentViewInitialized on all registered segment callbacks. + /** + * Waits until the sync wait timeout for all servers to be synced at least once. + * Finally calls {@link SegmentCallback#segmentViewInitialized()} regardless of + * whether all servers synced successfully or not. + */ private void serverInventoryInitialized() { long start = System.currentTimeMillis(); @@ -360,13 +383,11 @@ private void serverInventoryInitialized() throw new RE(ex, "Interrupted while waiting for queryable server initial successful sync."); } - log.info("Checking whether all servers have been synced at least once yet...."); - Iterator iter = uninitializedServers.iterator(); - while (iter.hasNext()) { - if (iter.next().isSyncedSuccessfullyAtleastOnce()) { - iter.remove(); - } - } + log.info("Waiting for [%d] servers to sync successfully.", uninitializedServers.size()); + uninitializedServers.removeIf( + serverHolder -> serverHolder.isSyncedSuccessfullyAtleastOnce() + || serverHolder.isStopped() + ); } if (uninitializedServers.isEmpty()) { @@ -380,18 +401,13 @@ private void serverInventoryInitialized() } } - log.info("Calling SegmentCallback.segmentViewInitialized() for all callbacks."); + log.info("Invoking segment view initialized callbacks."); + runSegmentCallbacks(SegmentCallback::segmentViewInitialized); + } - runSegmentCallbacks( - new Function() - { - @Override - public CallbackAction apply(SegmentCallback input) - { - return input.segmentViewInitialized(); - } - } - ); + private void updateFinalPredicate() + { + finalPredicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); } @VisibleForTesting @@ -417,9 +433,9 @@ private void serverRemoved(DruidServer server) if (holder != null) { log.info("Server[%s] disappeared.", server.getName()); holder.stop(); - runServerCallbacks(holder.druidServer); + runServerRemovedCallbacks(holder.druidServer); } else { - log.info("Server[%s] did not exist. Removal notification ignored.", server.getName()); + log.info("Ignoring remove notification for unknown server[%s].", server.getName()); } } } @@ -443,61 +459,57 @@ public Map getDebugInfo() return result; } - private void scheduleSyncMonitoring() - { - executor.scheduleAtFixedRate( - () -> { - log.debug("Running the Sync Monitoring."); - - try { - syncMonitoring(); - } - catch (Exception ex) { - if (ex instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } else { - log.makeAlert(ex, "Exception in sync monitoring.").emit(); - } - } - }, - 1, - 5, - TimeUnit.MINUTES - ); - } - @VisibleForTesting - void syncMonitoring() + void checkAndResetUnhealthyServers() { // Ensure that the collection is not being modified during iteration. Iterate over a copy final Set> serverEntrySet = ImmutableSet.copyOf(servers.entrySet()); for (Map.Entry e : serverEntrySet) { DruidServerHolder serverHolder = e.getValue(); - if (!serverHolder.syncer.isOK()) { + if (serverHolder.syncer.needsReset()) { synchronized (servers) { - // check again that server is still there and only then reset. + // Reset only if the server is still present in the map if (servers.containsKey(e.getKey())) { - log.makeAlert( - "Server[%s] is not syncing properly. Current state is [%s]. Resetting it.", + log.warn( + "Resetting server[%s] with state[%s] as it is not syncing properly.", serverHolder.druidServer.getName(), serverHolder.syncer.getDebugInfo() - ).emit(); + ); serverRemoved(serverHolder.druidServer); - serverAdded(new DruidServer( - serverHolder.druidServer.getName(), - serverHolder.druidServer.getHostAndPort(), - serverHolder.druidServer.getHostAndTlsPort(), - serverHolder.druidServer.getMaxSize(), - serverHolder.druidServer.getType(), - serverHolder.druidServer.getTier(), - serverHolder.druidServer.getPriority() - )); + serverAdded(serverHolder.druidServer.copyWithoutSegments()); } } } } } + private void emitServerStatusMetrics() + { + final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent.builder(); + try { + final Map serversCopy = ImmutableMap.copyOf(servers); + serversCopy.forEach((serverName, serverHolder) -> { + final DruidServer server = serverHolder.druidServer; + eventBuilder.setDimension("tier", server.getTier()); + eventBuilder.setDimension("server", serverName); + + final boolean isSynced = serverHolder.syncer.isSyncedSuccessfully(); + serviceEmitter.emit( + eventBuilder.build("segment/serverview/sync/healthy", isSynced ? 1 : 0) + ); + final long unstableTimeMillis = serverHolder.syncer.getUnstableTimeMillis(); + if (unstableTimeMillis > 0) { + serviceEmitter.emit( + eventBuilder.build("segment/serverview/sync/unstableTime", unstableTimeMillis) + ); + } + }); + } + catch (Exception e) { + log.error(e, "Error while emitting server status metrics"); + } + } + @Override public boolean isStarted() { @@ -514,6 +526,7 @@ public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) private class DruidServerHolder { private final DruidServer druidServer; + private final AtomicBoolean stopped = new AtomicBoolean(false); private final ChangeRequestHttpSyncer syncer; @@ -526,7 +539,7 @@ private class DruidServerHolder this.syncer = new ChangeRequestHttpSyncer<>( smileMapper, httpClient, - executor, + inventorySyncExecutor, new URL(druidServer.getScheme(), hostAndPort.getHostText(), hostAndPort.getPort(), "/"), "/druid-internal/v1/segments", SEGMENT_LIST_RESP_TYPE_REF, @@ -548,19 +561,21 @@ void start() void stop() { syncer.stop(); + stopped.set(true); + } + + boolean isStopped() + { + return stopped.get(); } boolean isSyncedSuccessfullyAtleastOnce() { try { - return syncer.awaitInitialization(1, TimeUnit.MILLISECONDS); + return syncer.isInitialized(); } catch (InterruptedException ex) { - throw new RE( - ex, - "Interrupted while waiting for queryable server[%s] initial successful sync.", - druidServer.getName() - ); + throw new ISE(ex, "Interrupted while waiting for first sync with server[%s].", druidServer.getName()); } } diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java index 7f008c58798e1..df939c406c714 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java @@ -25,6 +25,8 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import javax.validation.constraints.NotNull; @@ -51,6 +53,14 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi @NotNull private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = null; + @JacksonInject + @NotNull + private ScheduledExecutorFactory executorFactory; + + @JacksonInject + @NotNull + private ServiceEmitter serviceEmitter = null; + @Override public HttpServerInventoryView get() { @@ -60,6 +70,8 @@ public HttpServerInventoryView get() druidNodeDiscoveryProvider, Predicates.alwaysTrue(), config, + serviceEmitter, + executorFactory, "HttpServerInventoryView" ); } diff --git a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java index 461e5fea0dfe3..3a62e5344b115 100644 --- a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java +++ b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; +import org.apache.druid.client.DruidServer; import org.apache.druid.jackson.StringObjectPairList; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -199,6 +200,29 @@ public T getService(String key, Class clazz) return null; } + public DruidServer toDruidServer() + { + final DataNodeService dataNodeService = getService( + DataNodeService.DISCOVERY_SERVICE_KEY, + DataNodeService.class + ); + + final DruidNode druidNode = getDruidNode(); + if (dataNodeService == null || druidNode == null) { + return null; + } + + return new DruidServer( + druidNode.getHostAndPortToUse(), + druidNode.getHostAndPort(), + druidNode.getHostAndTlsPort(), + dataNodeService.getMaxSize(), + dataNodeService.getServerType(), + dataNodeService.getTier(), + dataNodeService.getPriority() + ); + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java index 8076f3eddc1be..8af89425ae0f3 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java @@ -29,8 +29,8 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; @@ -71,9 +71,12 @@ public class ChangeRequestHttpSyncer private final String baseRequestPath; private final TypeReference> responseTypeReferences; private final long serverTimeoutMS; - private final long serverUnstabilityTimeout; private final long serverHttpTimeout; + private final Duration maxUnstableDuration; + private final Duration maxDelayBetweenSyncRequests; + private final Duration maxDurationToWaitForSync; + private final Listener listener; private final CountDownLatch initializationLatch = new CountDownLatch(1); @@ -86,10 +89,12 @@ public class ChangeRequestHttpSyncer private final LifecycleLock startStopLock = new LifecycleLock(); private final String logIdentity; - private long unstableStartTime = -1; private int consecutiveFailedAttemptCount = 0; - private long lastSuccessfulSyncTime = 0; - private long lastSyncTime = 0; + + private final Stopwatch sinceSyncerStart = Stopwatch.createUnstarted(); + private final Stopwatch sinceLastSyncRequest = Stopwatch.createUnstarted(); + private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted(); + private final Stopwatch sinceUnstable = Stopwatch.createUnstarted(); @Nullable private ChangeRequestHistory.Counter counter = null; @@ -113,29 +118,30 @@ public ChangeRequestHttpSyncer( this.baseRequestPath = baseRequestPath; this.responseTypeReferences = responseTypeReferences; this.serverTimeoutMS = serverTimeoutMS; - this.serverUnstabilityTimeout = serverUnstabilityTimeout; this.serverHttpTimeout = serverTimeoutMS + HTTP_TIMEOUT_EXTRA_MS; this.listener = listener; this.logIdentity = StringUtils.format("%s_%d", baseServerURL, System.currentTimeMillis()); + + this.maxDurationToWaitForSync = Duration.millis(3 * serverHttpTimeout); + this.maxDelayBetweenSyncRequests = Duration.millis(3 * serverHttpTimeout + MAX_RETRY_BACKOFF); + this.maxUnstableDuration = Duration.millis(serverUnstabilityTimeout); } public void start() { - synchronized (startStopLock) { - if (!startStopLock.canStart()) { - throw new ISE("Can't start ChangeRequestHttpSyncer[%s].", logIdentity); + throw new ISE("Could not start sync for server[%s].", logIdentity); } try { - - log.info("Starting ChangeRequestHttpSyncer[%s].", logIdentity); + log.info("Starting sync for server[%s].", logIdentity); startStopLock.started(); } finally { startStopLock.exitStart(); } + sinceSyncerStart.restart(); addNextSyncToWorkQueue(); } } @@ -144,69 +150,91 @@ public void stop() { synchronized (startStopLock) { if (!startStopLock.canStop()) { - throw new ISE("Can't stop ChangeRequestHttpSyncer[%s].", logIdentity); + throw new ISE("Could not stop sync for server[%s].", logIdentity); } try { - log.info("Stopping ChangeRequestHttpSyncer[%s].", logIdentity); + log.info("Stopping sync for server[%s].", logIdentity); } finally { startStopLock.exitStop(); } - log.info("Stopped ChangeRequestHttpSyncer[%s].", logIdentity); + log.info("Stopped sync for server[%s].", logIdentity); } } - /** Wait for first fetch of segment listing from server. */ - public boolean awaitInitialization(long timeout, TimeUnit timeUnit) throws InterruptedException + /** + * Waits for the first successful sync with this server up to {@link #maxDurationToWaitForSync}. + */ + public boolean awaitInitialization() throws InterruptedException { - return initializationLatch.await(timeout, timeUnit); + return initializationLatch.await(maxDurationToWaitForSync.getMillis(), TimeUnit.MILLISECONDS); } /** - * This method returns the debugging information for printing, must not be used for any other purpose. + * Waits upto 1 millisecond for the first successful sync with this server. */ - public Map getDebugInfo() + public boolean isInitialized() throws InterruptedException { - long currTime = System.currentTimeMillis(); + return initializationLatch.await(1, TimeUnit.MILLISECONDS); + } - Object notSuccessfullySyncedFor; - if (lastSuccessfulSyncTime == 0) { - notSuccessfullySyncedFor = "Never Successfully Synced"; - } else { - notSuccessfullySyncedFor = (currTime - lastSuccessfulSyncTime) / 1000; - } + /** + * Returns debugging information for printing, must not be used for any other purpose. + */ + public Map getDebugInfo() + { return ImmutableMap.of( - "notSyncedForSecs", lastSyncTime == 0 ? "Never Synced" : (currTime - lastSyncTime) / 1000, - "notSuccessfullySyncedFor", notSuccessfullySyncedFor, + "millisSinceLastRequest", sinceLastSyncRequest.millisElapsed(), + "millisSinceLastSuccess", sinceLastSyncSuccess.millisElapsed(), "consecutiveFailedAttemptCount", consecutiveFailedAttemptCount, "syncScheduled", startStopLock.isStarted() ); } /** - * Exposed for monitoring use to see if sync is working fine and not stopped due to any coding bugs. If this - * ever returns false then caller of this method must create an alert and it should be looked into for any - * bugs. + * Whether this syncer should be reset. This method returning true typically + * indicates a problem with the sync scheduler. + * + * @return true if the delay since the last request to the server (or since + * syncer start in case of no request to the server) has exceeded + * {@link #maxDelayBetweenSyncRequests}. */ - public boolean isOK() + public boolean needsReset() { - return (System.currentTimeMillis() - lastSyncTime) < MAX_RETRY_BACKOFF + 3 * serverHttpTimeout; + if (sinceLastSyncRequest.isRunning()) { + return sinceLastSyncRequest.hasElapsed(maxDelayBetweenSyncRequests); + } else { + return sinceSyncerStart.hasElapsed(maxDelayBetweenSyncRequests); + } } - public long getServerHttpTimeout() + public long getUnstableTimeMillis() { - return serverHttpTimeout; + return consecutiveFailedAttemptCount <= 0 ? 0 : sinceUnstable.millisElapsed(); + } + + /** + * @return true if there have been no sync failures recently and the last + * successful sync was not more than {@link #maxDurationToWaitForSync} ago. + */ + public boolean isSyncedSuccessfully() + { + if (consecutiveFailedAttemptCount > 0) { + return false; + } else { + return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync); + } } private void sync() { if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { - log.info("Skipping sync() call for server[%s].", logIdentity); + log.info("Skipping sync for server[%s] as syncer has not started yet.", logIdentity); return; } - lastSyncTime = System.currentTimeMillis(); + sinceLastSyncRequest.restart(); try { final String req = getRequestString(); @@ -234,28 +262,27 @@ public void onSuccess(InputStream stream) { synchronized (startStopLock) { if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { - log.info("Skipping sync() success for server[%s].", logIdentity); + log.info("Not handling response for server[%s] as syncer has not started yet.", logIdentity); return; } try { - if (responseHandler.getStatus() == HttpServletResponse.SC_NO_CONTENT) { + final int responseCode = responseHandler.getStatus(); + if (responseCode == HttpServletResponse.SC_NO_CONTENT) { log.debug("Received NO CONTENT from server[%s]", logIdentity); - lastSuccessfulSyncTime = System.currentTimeMillis(); + sinceLastSyncSuccess.restart(); return; - } else if (responseHandler.getStatus() != HttpServletResponse.SC_OK) { - handleFailure(new RE("Bad Sync Response.")); + } else if (responseCode != HttpServletResponse.SC_OK) { + handleFailure(new ISE("Received sync response [%d]", responseCode)); return; } - log.debug("Received sync response from [%s]", logIdentity); - + log.debug("Received sync response from server[%s]", logIdentity); ChangeRequestsSnapshot changes = smileMapper.readValue(stream, responseTypeReferences); - - log.debug("Finished reading sync response from [%s]", logIdentity); + log.debug("Finished reading sync response from server[%s]", logIdentity); if (changes.isResetCounter()) { - log.info("[%s] requested resetCounter for reason [%s].", logIdentity, changes.getResetCause()); + log.info("Server[%s] requested resetCounter for reason[%s].", logIdentity, changes.getResetCause()); counter = null; return; } @@ -270,29 +297,19 @@ public void onSuccess(InputStream stream) if (initializationLatch.getCount() > 0) { initializationLatch.countDown(); - log.info("[%s] synced successfully for the first time.", logIdentity); + log.info("Server[%s] synced successfully for the first time.", logIdentity); } if (consecutiveFailedAttemptCount > 0) { consecutiveFailedAttemptCount = 0; - log.info("[%s] synced successfully.", logIdentity); + sinceUnstable.reset(); + log.info("Server[%s] synced successfully.", logIdentity); } - lastSuccessfulSyncTime = System.currentTimeMillis(); + sinceLastSyncSuccess.restart(); } catch (Exception ex) { - String logMsg = StringUtils.nonStrictFormat( - "Error processing sync response from [%s]. Reason [%s]", - logIdentity, - ex.getMessage() - ); - - if (incrementFailedAttemptAndCheckUnstabilityTimeout()) { - log.error(ex, logMsg); - } else { - log.info("Temporary Failure. %s", logMsg); - log.debug(ex, logMsg); - } + markServerUnstableAndAlert(ex, "Processing Response"); } finally { addNextSyncToWorkQueue(); @@ -305,7 +322,7 @@ public void onFailure(Throwable t) { synchronized (startStopLock) { if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { - log.info("Skipping sync() failure for URL[%s].", logIdentity); + log.info("Not handling sync failure for server[%s] as syncer has not started yet.", logIdentity); return; } @@ -320,19 +337,12 @@ public void onFailure(Throwable t) private void handleFailure(Throwable t) { - String logMsg = StringUtils.nonStrictFormat( - "failed to get sync response from [%s]. Return code [%s], Reason: [%s]", - logIdentity, + String logMsg = StringUtils.format( + "Handling response with code[%d], description[%s]", responseHandler.getStatus(), responseHandler.getDescription() ); - - if (incrementFailedAttemptAndCheckUnstabilityTimeout()) { - log.error(t, logMsg); - } else { - log.info("Temporary Failure. %s", logMsg); - log.debug(t, logMsg); - } + markServerUnstableAndAlert(t, logMsg); } }, executor @@ -340,16 +350,7 @@ private void handleFailure(Throwable t) } catch (Throwable th) { try { - String logMsg = StringUtils.nonStrictFormat( - "Fatal error while fetching segment list from [%s].", logIdentity - ); - - if (incrementFailedAttemptAndCheckUnstabilityTimeout()) { - log.makeAlert(th, logMsg).emit(); - } else { - log.info("Temporary Failure. %s", logMsg); - log.debug(th, logMsg); - } + markServerUnstableAndAlert(th, "Sending Request"); } finally { addNextSyncToWorkQueue(); @@ -384,47 +385,55 @@ private void addNextSyncToWorkQueue() try { if (consecutiveFailedAttemptCount > 0) { - long sleepMillis = Math.min( + long delayMillis = Math.min( MAX_RETRY_BACKOFF, RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount) ); - log.info("Scheduling next syncup in [%d] millis for server[%s].", sleepMillis, logIdentity); - executor.schedule(this::sync, sleepMillis, TimeUnit.MILLISECONDS); + log.info("Scheduling next sync for server[%s] in [%d] millis.", logIdentity, delayMillis); + executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS); } else { executor.execute(this::sync); } } catch (Throwable th) { if (executor.isShutdown()) { + log.warn(th, "Could not schedule sync for server[%s] because executor is stopped.", logIdentity); + } else { log.warn( th, - "Couldn't schedule next sync. [%s] is not being synced any more, probably because executor is stopped.", + "Could not schedule sync for server [%s]. This syncer will be reset automatically." + + " If the issue persists, try restarting this Druid service.", logIdentity ); - } else { - log.makeAlert( - th, - "Couldn't schedule next sync. [%s] is not being synced any more, restarting Druid process on that " - + "server might fix the issue.", - logIdentity - ).emit(); } } } } - private boolean incrementFailedAttemptAndCheckUnstabilityTimeout() + private void markServerUnstableAndAlert(Throwable throwable, String action) { - if (consecutiveFailedAttemptCount > 0 - && (System.currentTimeMillis() - unstableStartTime) > serverUnstabilityTimeout) { - return true; - } - if (consecutiveFailedAttemptCount++ == 0) { - unstableStartTime = System.currentTimeMillis(); + sinceUnstable.restart(); } - return false; + final long unstableSeconds = getUnstableTimeMillis() / 1000; + final String message = StringUtils.format( + "Sync failed for server[%s] while [%s]. Failed [%d] times in the last [%d] seconds.", + baseServerURL, action, consecutiveFailedAttemptCount, unstableSeconds + ); + + // Alert if unstable alert timeout has been exceeded + if (sinceUnstable.hasElapsed(maxUnstableDuration)) { + String alertMessage = StringUtils.format( + "%s. Try restarting the Druid process on server[%s].", + message, baseServerURL + ); + log.noStackTrace().makeAlert(throwable, alertMessage).emit(); + } else if (log.isDebugEnabled()) { + log.debug(throwable, message); + } else { + log.noStackTrace().info(throwable, message); + } } @VisibleForTesting diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java index f69f0a84226a7..70ec2b2fa4af0 100644 --- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java @@ -19,197 +19,439 @@ package org.apache.druid.client; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeRole; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.RE; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; +import org.apache.druid.server.coordination.DataSegmentChangeRequest; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.SegmentChangeRequestDrop; import org.apache.druid.server.coordination.SegmentChangeRequestLoad; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.codec.http.HttpVersion; -import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; -import java.io.ByteArrayInputStream; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; -/** - * - */ public class HttpServerInventoryViewTest { + private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); + private static final TypeReference> + TYPE_REF = HttpServerInventoryView.SEGMENT_LIST_RESP_TYPE_REF; - @BeforeClass - public static void setup() - { - EmittingLogger.registerEmitter(new NoopServiceEmitter()); - } + private static final String EXEC_NAME_PREFIX = "InventoryViewTest"; - @Test(timeout = 60_000L) - public void testSimple() throws Exception + private static final String METRIC_SUCCESS = "segment/serverview/sync/healthy"; + private static final String METRIC_UNSTABLE_TIME = "segment/serverview/sync/unstableTime"; + + private StubServiceEmitter serviceEmitter; + + private HttpServerInventoryView httpServerInventoryView; + private TestChangeRequestHttpClient> httpClient; + private TestExecutorFactory execHelper; + + private TestDruidNodeDiscovery druidNodeDiscovery; + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + + private Map> segmentsAddedToView; + private Map> segmentsRemovedFromView; + private Set removedServers; + + private AtomicBoolean inventoryInitialized; + + @Before + public void setup() { - ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + serviceEmitter = new StubServiceEmitter("test", "localhost"); + EmittingLogger.registerEmitter(serviceEmitter); - TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + druidNodeDiscovery = new TestDruidNodeDiscovery(); + druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); - final DataSegment segment1 = new DataSegment( - "test1", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + httpClient = new TestChangeRequestHttpClient<>(TYPE_REF, MAPPER); + execHelper = new TestExecutorFactory(); + inventoryInitialized = new AtomicBoolean(false); - final DataSegment segment2 = new DataSegment( - "test2", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + segmentsAddedToView = new HashMap<>(); + segmentsRemovedFromView = new HashMap<>(); + removedServers = new HashSet<>(); - final DataSegment segment3 = new DataSegment( - "test3", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + createInventoryView( + new HttpServerInventoryViewConfig(null, null, null) ); + } - final DataSegment segment4 = new DataSegment( - "test4", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + @After + public void tearDown() + { + EasyMock.verify(druidNodeDiscoveryProvider); + if (httpServerInventoryView != null && httpServerInventoryView.isStarted()) { + httpServerInventoryView.stop(); + } + } - final DataSegment segment5 = new DataSegment( - "non-loading-datasource", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + @Test + public void testInitHappensAfterNodeViewInit() + { + httpServerInventoryView.start(); + Assert.assertTrue(httpServerInventoryView.isStarted()); + Assert.assertFalse(inventoryInitialized.get()); + + druidNodeDiscovery.markNodeViewInitialized(); + Assert.assertFalse(inventoryInitialized.get()); + + execHelper.finishInventoryInitialization(); + Assert.assertTrue(inventoryInitialized.get()); + + httpServerInventoryView.stop(); + } + + @Test + public void testStopShutsDownExecutors() + { + httpServerInventoryView.start(); + Assert.assertFalse(execHelper.syncExecutor.isShutdown()); + + httpServerInventoryView.stop(); + Assert.assertTrue(execHelper.syncExecutor.isShutdown()); + } + + @Test + public void testAddNodeStartsSync() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + final DiscoveryDruidNode druidNode = druidNodeDiscovery + .addNodeAndNotifyListeners("localhost"); + final DruidServer server = druidNode.toDruidServer(); + + Collection inventory = httpServerInventoryView.getInventory(); + Assert.assertEquals(1, inventory.size()); + Assert.assertTrue(inventory.contains(server)); + + execHelper.emitMetrics(); + serviceEmitter.verifyValue(METRIC_SUCCESS, 1); + serviceEmitter.verifyNotEmitted(METRIC_UNSTABLE_TIME); + + DataSegment segment = CreateDataSegments.ofDatasource("wiki").eachOfSizeInMb(500).get(0); + httpClient.completeNextRequestWith( + snapshotOf(new SegmentChangeRequestLoad(segment)) ); + execHelper.sendSyncRequestAndHandleResponse(); - TestHttpClient httpClient = new TestHttpClient( - ImmutableList.of( - Futures.immediateFuture( - new ByteArrayInputStream( - jsonMapper.writerWithType(HttpServerInventoryView.SEGMENT_LIST_RESP_TYPE_REF).writeValueAsBytes( - new ChangeRequestsSnapshot( - false, - null, - ChangeRequestHistory.Counter.ZERO, - ImmutableList.of( - new SegmentChangeRequestLoad(segment1) - ) - ) - ) - ) - ), - Futures.immediateFuture( - new ByteArrayInputStream( - jsonMapper.writerWithType(HttpServerInventoryView.SEGMENT_LIST_RESP_TYPE_REF).writeValueAsBytes( - new ChangeRequestsSnapshot( - false, - null, - ChangeRequestHistory.Counter.ZERO, - ImmutableList.of( - new SegmentChangeRequestDrop(segment1), - new SegmentChangeRequestLoad(segment2), - new SegmentChangeRequestLoad(segment3) - ) - ) - ) - ) - ), - Futures.immediateFuture( - new ByteArrayInputStream( - jsonMapper.writerWithType(HttpServerInventoryView.SEGMENT_LIST_RESP_TYPE_REF).writeValueAsBytes( - new ChangeRequestsSnapshot( - true, - "force reset counter", - ChangeRequestHistory.Counter.ZERO, - ImmutableList.of() - ) - ) - ) - ), - Futures.immediateFuture( - new ByteArrayInputStream( - jsonMapper.writerWithType(HttpServerInventoryView.SEGMENT_LIST_RESP_TYPE_REF).writeValueAsBytes( - new ChangeRequestsSnapshot( - false, - null, - ChangeRequestHistory.Counter.ZERO, - ImmutableList.of( - new SegmentChangeRequestLoad(segment3), - new SegmentChangeRequestLoad(segment4), - new SegmentChangeRequestLoad(segment5) - ) - ) - ) - ) - ) + DruidServer inventoryValue = httpServerInventoryView.getInventoryValue(server.getName()); + Assert.assertNotNull(inventoryValue); + Assert.assertEquals(1, inventoryValue.getTotalSegments()); + Assert.assertNotNull(inventoryValue.getSegment(segment.getId())); + + httpServerInventoryView.stop(); + } + + @Test + public void testRemoveNodeStopsSync() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + final DiscoveryDruidNode druidNode = druidNodeDiscovery + .addNodeAndNotifyListeners("localhost"); + final DruidServer server = druidNode.toDruidServer(); + + druidNodeDiscovery.removeNodesAndNotifyListeners(druidNode); + + Assert.assertNull(httpServerInventoryView.getInventoryValue(server.getName())); + + execHelper.emitMetrics(); + serviceEmitter.verifyNotEmitted(METRIC_SUCCESS); + serviceEmitter.verifyNotEmitted(METRIC_UNSTABLE_TIME); + + httpServerInventoryView.stop(); + } + + @Test(timeout = 60_000L) + public void testSyncSegmentLoadAndDrop() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + final DiscoveryDruidNode druidNode = druidNodeDiscovery + .addNodeAndNotifyListeners("localhost"); + final DruidServer server = druidNode.toDruidServer(); + + final DataSegment[] segments = + CreateDataSegments.ofDatasource("wiki") + .forIntervals(4, Granularities.DAY) + .eachOfSizeInMb(500) + .toArray(new DataSegment[0]); + + // Request 1: Load S1 + httpClient.completeNextRequestWith( + snapshotOf(new SegmentChangeRequestLoad(segments[0])) + ); + execHelper.sendSyncRequestAndHandleResponse(); + Assert.assertTrue(isAddedToView(server, segments[0])); + + // Request 2: Drop S1, Load S2, S3 + resetForNextSyncRequest(); + httpClient.completeNextRequestWith( + snapshotOf( + new SegmentChangeRequestDrop(segments[0]), + new SegmentChangeRequestLoad(segments[1]), + new SegmentChangeRequestLoad(segments[2]) + ) + ); + execHelper.sendSyncRequestAndHandleResponse(); + Assert.assertTrue(isRemovedFromView(server, segments[0])); + Assert.assertTrue(isAddedToView(server, segments[1])); + Assert.assertTrue(isAddedToView(server, segments[2])); + + // Request 3: reset the counter + resetForNextSyncRequest(); + httpClient.completeNextRequestWith( + new ChangeRequestsSnapshot<>( + true, + "Server requested reset", + ChangeRequestHistory.Counter.ZERO, + Collections.emptyList() + ) + ); + execHelper.sendSyncRequestAndHandleResponse(); + Assert.assertTrue(segmentsAddedToView.isEmpty()); + Assert.assertTrue(segmentsRemovedFromView.isEmpty()); + + // Request 4: Load S3, S4 + resetForNextSyncRequest(); + httpClient.completeNextRequestWith( + snapshotOf( + new SegmentChangeRequestLoad(segments[2]), + new SegmentChangeRequestLoad(segments[3]) ) ); + execHelper.sendSyncRequestAndHandleResponse(); + Assert.assertTrue(isRemovedFromView(server, segments[1])); + Assert.assertTrue(isAddedToView(server, segments[3])); + + DruidServer inventoryValue = httpServerInventoryView.getInventoryValue(server.getName()); + Assert.assertNotNull(inventoryValue); + Assert.assertEquals(2, inventoryValue.getTotalSegments()); + Assert.assertNotNull(inventoryValue.getSegment(segments[2].getId())); + Assert.assertNotNull(inventoryValue.getSegment(segments[3].getId())); + + // Verify node removal + druidNodeDiscovery.removeNodesAndNotifyListeners(druidNode); - DiscoveryDruidNode druidNode = new DiscoveryDruidNode( - new DruidNode("service", "host", false, 8080, null, true, false), - NodeRole.HISTORICAL, - ImmutableMap.of( - DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) + // test removal event with empty services + druidNodeDiscovery.removeNodesAndNotifyListeners( + new DiscoveryDruidNode( + new DruidNode("service", "host", false, 8080, null, true, false), + NodeRole.INDEXER, + Collections.emptyMap() ) ); - HttpServerInventoryView httpServerInventoryView = new HttpServerInventoryView( - jsonMapper, - httpClient, - druidNodeDiscoveryProvider, - (pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"), - new HttpServerInventoryViewConfig(null, null, null), - "test" + // test removal rogue node (announced a service as a DataNodeService but wasn't a DataNodeService at the key) + druidNodeDiscovery.removeNodesAndNotifyListeners( + new DiscoveryDruidNode( + new DruidNode("service", "host", false, 8080, null, true, false), + NodeRole.INDEXER, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, + new LookupNodeService("lookyloo") + ) + ) ); - CountDownLatch initializeCallback1 = new CountDownLatch(1); + Assert.assertTrue(removedServers.contains(server.getMetadata())); + Assert.assertNull(httpServerInventoryView.getInventoryValue(server.getName())); + + httpServerInventoryView.stop(); + } + + @Test + public void testSyncWhenRequestFailedToSend() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + + httpClient.failToSendNextRequestWith(new ISE("Could not send request to server")); + execHelper.sendSyncRequest(); + + serviceEmitter.flush(); + execHelper.emitMetrics(); + serviceEmitter.verifyValue(METRIC_SUCCESS, 0); + + httpServerInventoryView.stop(); + } + + @Test + public void testSyncWhenErrorResponse() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + + httpClient.completeNextRequestWith(InvalidInput.exception("failure on server")); + execHelper.sendSyncRequestAndHandleResponse(); + + serviceEmitter.flush(); + execHelper.emitMetrics(); + serviceEmitter.verifyValue(METRIC_SUCCESS, 0); + + httpServerInventoryView.stop(); + } - Map segmentAddLathces = ImmutableMap.of( - segment1.getId(), new CountDownLatch(1), - segment2.getId(), new CountDownLatch(1), - segment3.getId(), new CountDownLatch(1), - segment4.getId(), new CountDownLatch(1) + @Test + public void testUnstableServerAlertsAfterTimeout() + { + // Create inventory with alert timeout as 0 ms + createInventoryView( + new HttpServerInventoryViewConfig(null, Period.millis(0), null) ); - Map segmentDropLatches = ImmutableMap.of( - segment1.getId(), new CountDownLatch(1), - segment2.getId(), new CountDownLatch(1) + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + + serviceEmitter.flush(); + httpClient.completeNextRequestWith(InvalidInput.exception("failure on server")); + execHelper.sendSyncRequestAndHandleResponse(); + + List alerts = serviceEmitter.getAlerts(); + Assert.assertEquals(1, alerts.size()); + AlertEvent alert = alerts.get(0); + Assert.assertTrue(alert.getDescription().contains("Sync failed for server")); + + serviceEmitter.flush(); + execHelper.emitMetrics(); + serviceEmitter.verifyValue(METRIC_SUCCESS, 0); + + httpServerInventoryView.stop(); + } + + @Test(timeout = 60_000) + public void testInitWaitsForServerToSync() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + + ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + "-init"); + + try { + initExecutor.submit(() -> execHelper.finishInventoryInitialization()); + + // Wait to ensure that init thread is in progress and waiting + Thread.sleep(1000); + Assert.assertFalse(inventoryInitialized.get()); + + // Finish sync of server + httpClient.completeNextRequestWith(snapshotOf()); + execHelper.sendSyncRequestAndHandleResponse(); + + // Wait for 10 seconds to ensure that init thread knows about server sync + Thread.sleep(10_000); + Assert.assertTrue(inventoryInitialized.get()); + } + catch (InterruptedException e) { + throw new ISE(e, "Interrupted"); + } + finally { + initExecutor.shutdownNow(); + } + } + + @Test(timeout = 60_000) + public void testInitDoesNotWaitForRemovedServerToSync() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + DiscoveryDruidNode node = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + + ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + "-init"); + + try { + initExecutor.submit(() -> execHelper.finishInventoryInitialization()); + + // Wait to ensure that init thread is in progress and waiting + Thread.sleep(1000); + Assert.assertFalse(inventoryInitialized.get()); + + // Remove the node from discovery + druidNodeDiscovery.removeNodesAndNotifyListeners(node); + + // Wait for 10 seconds to ensure that init thread knows about server removal + Thread.sleep(10_000); + Assert.assertTrue(inventoryInitialized.get()); + } + catch (InterruptedException e) { + throw new ISE(e, "Interrupted"); + } + finally { + initExecutor.shutdownNow(); + } + } + + private void createInventoryView(HttpServerInventoryViewConfig config) + { + httpServerInventoryView = new HttpServerInventoryView( + MAPPER, + httpClient, + druidNodeDiscoveryProvider, + pair -> !pair.rhs.getDataSource().equals("non-loading-datasource"), + config, + serviceEmitter, + execHelper, + EXEC_NAME_PREFIX ); httpServerInventoryView.registerSegmentCallback( @@ -219,138 +461,60 @@ DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerT @Override public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) { - segmentAddLathces.get(segment.getId()).countDown(); + segmentsAddedToView.computeIfAbsent(server, s -> new HashSet<>()).add(segment); return ServerView.CallbackAction.CONTINUE; } @Override public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) { - segmentDropLatches.get(segment.getId()).countDown(); + segmentsRemovedFromView.computeIfAbsent(server, s -> new HashSet<>()).add(segment); return ServerView.CallbackAction.CONTINUE; } @Override public ServerView.CallbackAction segmentViewInitialized() { - initializeCallback1.countDown(); + inventoryInitialized.set(true); return ServerView.CallbackAction.CONTINUE; } } ); - final CountDownLatch serverRemovedCalled = new CountDownLatch(1); httpServerInventoryView.registerServerRemovedCallback( Execs.directExecutor(), - new ServerView.ServerRemovedCallback() - { - @Override - public ServerView.CallbackAction serverRemoved(DruidServer server) - { - if (server.getName().equals("host:8080")) { - serverRemovedCalled.countDown(); - return ServerView.CallbackAction.CONTINUE; - } else { - throw new RE("Unknown server [%s]", server.getName()); - } - } + server -> { + removedServers.add(server.getMetadata()); + return ServerView.CallbackAction.CONTINUE; } ); - - httpServerInventoryView.start(); - - druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode)); - - initializeCallback1.await(); - segmentAddLathces.get(segment1.getId()).await(); - segmentDropLatches.get(segment1.getId()).await(); - segmentAddLathces.get(segment2.getId()).await(); - segmentAddLathces.get(segment3.getId()).await(); - segmentAddLathces.get(segment4.getId()).await(); - segmentDropLatches.get(segment2.getId()).await(); - - DruidServer druidServer = httpServerInventoryView.getInventoryValue("host:8080"); - Assert.assertEquals( - ImmutableMap.of(segment3.getId(), segment3, segment4.getId(), segment4), - Maps.uniqueIndex(druidServer.iterateAllSegments(), DataSegment::getId) - ); - - druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode)); - - // test removal event with empty services - druidNodeDiscovery.listener.nodesRemoved( - ImmutableList.of( - new DiscoveryDruidNode( - new DruidNode("service", "host", false, 8080, null, true, false), - NodeRole.INDEXER, - Collections.emptyMap() - ) - ) - ); - - // test removal rogue node (announced a service as a DataNodeService but wasn't a DataNodeService at the key) - druidNodeDiscovery.listener.nodesRemoved( - ImmutableList.of( - new DiscoveryDruidNode( - new DruidNode("service", "host", false, 8080, null, true, false), - NodeRole.INDEXER, - ImmutableMap.of( - DataNodeService.DISCOVERY_SERVICE_KEY, - new LookupNodeService("lookyloo") - ) - ) - ) - ); - - serverRemovedCalled.await(); - Assert.assertNull(httpServerInventoryView.getInventoryValue("host:8080")); - - EasyMock.verify(druidNodeDiscoveryProvider); - - httpServerInventoryView.stop(); } - @Test(timeout = 60_000L) - public void testSyncMonitoring() + private boolean isAddedToView(DruidServer server, DataSegment segment) { - ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - - TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); - EasyMock.replay(druidNodeDiscoveryProvider); - - TestHttpClient httpClient = new TestHttpClient(ImmutableList.of()); + return segmentsAddedToView.getOrDefault(server.getMetadata(), Collections.emptySet()) + .contains(segment); + } - HttpServerInventoryView httpServerInventoryView = new HttpServerInventoryView( - jsonMapper, - httpClient, - druidNodeDiscoveryProvider, - (pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"), - new HttpServerInventoryViewConfig(null, null, null), - "test" - ); + private boolean isRemovedFromView(DruidServer server, DataSegment segment) + { + return segmentsRemovedFromView.getOrDefault(server.getMetadata(), Collections.emptySet()) + .contains(segment); + } - httpServerInventoryView.start(); - httpServerInventoryView.serverAdded(makeServer("abc.com:8080")); - httpServerInventoryView.serverAdded(makeServer("xyz.com:8080")); - httpServerInventoryView.serverAdded(makeServer("lol.com:8080")); - Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size()); - httpServerInventoryView.syncMonitoring(); - Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size()); + private void resetForNextSyncRequest() + { + segmentsAddedToView.clear(); + segmentsRemovedFromView.clear(); } - private DruidServer makeServer(String host) + private static ChangeRequestsSnapshot snapshotOf( + DataSegmentChangeRequest... requests + ) { - return new DruidServer( - host, - host, - host, - 100_000_000L, - ServerType.HISTORICAL, - "__default_tier", - 50 + return ChangeRequestsSnapshot.success( + ChangeRequestHistory.Counter.ZERO, + Arrays.asList(requests) ); } @@ -367,64 +531,97 @@ public Collection getAllNodes() @Override public void registerListener(Listener listener) { - listener.nodesAdded(ImmutableList.of()); - listener.nodeViewInitialized(); this.listener = listener; } - } - private static class TestHttpClient implements HttpClient - { - BlockingQueue results; - AtomicInteger requestNum = new AtomicInteger(0); + /** + * Marks the node view as initialized and notifies the listeners. + */ + void markNodeViewInitialized() + { + listener.nodeViewInitialized(); + } - TestHttpClient(List resultsList) + /** + * Creates and adds a new node and notifies the listeners. + */ + DiscoveryDruidNode addNodeAndNotifyListeners(String host) { - results = new LinkedBlockingQueue<>(); - results.addAll(resultsList); + final DruidNode druidNode = new DruidNode("druid/historical", host, false, 8080, null, true, false); + DataNodeService dataNodeService = new DataNodeService("tier", 10L << 30, ServerType.HISTORICAL, 0); + final DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( + druidNode, + NodeRole.HISTORICAL, + ImmutableMap.of(DataNodeService.DISCOVERY_SERVICE_KEY, dataNodeService) + ); + listener.nodesAdded(ImmutableList.of(discoveryDruidNode)); + + return discoveryDruidNode; } - @Override - public ListenableFuture go( - Request request, - HttpResponseHandler httpResponseHandler - ) + void removeNodesAndNotifyListeners(DiscoveryDruidNode... nodesToRemove) { - throw new UnsupportedOperationException("Not Implemented."); + listener.nodesRemoved(Arrays.asList(nodesToRemove)); } + } + + /** + * Creates and retains a handle on the executors used by the inventory view. + *

+ * There are 4 types of tasks submitted to the two executors. Upon succesful + * completion, each of these tasks add another task to the execution queue. + *

+ * Tasks running on sync executor: + *

    + *
  1. send request to server (adds "handle response" to queue)
  2. + *
  3. handle response and execute callbacks (adds "send request" to queue)
  4. + *
+ *

+ * Tasks running on monitoring executor. + *

    + *
  1. check and reset unhealthy servers (adds self to queue)
  2. + *
  3. emit metrics (adds self to queue)
  4. + *
+ */ + private static class TestExecutorFactory implements ScheduledExecutorFactory + { + private BlockingExecutorService syncExecutor; + private BlockingExecutorService monitorExecutor; @Override - public ListenableFuture go( - Request request, - HttpResponseHandler httpResponseHandler, - Duration duration - ) + public ScheduledExecutorService create(int corePoolSize, String nameFormat) { - if (requestNum.getAndIncrement() == 0) { - //fail first request immediately - throw new RuntimeException("simulating couldn't send request to server for some reason."); + BlockingExecutorService executorService = new BlockingExecutorService(nameFormat); + final String syncExecutorPrefix = EXEC_NAME_PREFIX + "-%s"; + final String monitorExecutorPrefix = EXEC_NAME_PREFIX + "-monitor-%s"; + if (syncExecutorPrefix.equals(nameFormat)) { + syncExecutor = executorService; + } else if (monitorExecutorPrefix.equals(nameFormat)) { + monitorExecutor = executorService; } - if (requestNum.get() == 2) { - //fail scenario where request is sent to server but we got an unexpected response. - HttpResponse httpResponse = new DefaultHttpResponse( - HttpVersion.HTTP_1_1, - HttpResponseStatus.INTERNAL_SERVER_ERROR - ); - httpResponse.setContent(ChannelBuffers.buffer(0)); - httpResponseHandler.handleResponse(httpResponse, null); - return Futures.immediateFailedFuture(new RuntimeException("server error")); - } + return new WrappingScheduledExecutorService(nameFormat, executorService, false); + } - HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - httpResponse.setContent(ChannelBuffers.buffer(0)); - httpResponseHandler.handleResponse(httpResponse, null); - try { - return results.take(); - } - catch (InterruptedException ex) { - throw new RE(ex, "Interrupted."); - } + void sendSyncRequestAndHandleResponse() + { + syncExecutor.finishNextPendingTasks(2); + } + + void sendSyncRequest() + { + syncExecutor.finishNextPendingTask(); + } + + void finishInventoryInitialization() + { + syncExecutor.finishNextPendingTask(); + } + + void emitMetrics() + { + // Finish 1 task for check and reset, 1 for metric emission + monitorExecutor.finishNextPendingTasks(2); } } } diff --git a/server/src/test/java/org/apache/druid/client/TestChangeRequestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestChangeRequestHttpClient.java new file mode 100644 index 0000000000000..344497ee06dd3 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/TestChangeRequestHttpClient.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.ErrorResponse; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.joda.time.Duration; + +import java.io.ByteArrayInputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * Test implementation of {@link HttpClient} that can be used to test + * {@link org.apache.druid.server.coordination.ChangeRequestHttpSyncer}. + */ +public class TestChangeRequestHttpClient implements HttpClient +{ + private final ObjectMapper mapper; + private final TypeReference typeReference; + private final BlockingQueue> results = new LinkedBlockingQueue<>(); + + private final AtomicInteger requestCount = new AtomicInteger(0); + + public TestChangeRequestHttpClient(TypeReference typeReference, ObjectMapper mapper) + { + this.mapper = mapper; + this.typeReference = typeReference; + } + + public void failToSendNextRequestWith(RuntimeException error) + { + results.add(new ResultHolder<>(null, error, null)); + } + + public void completeNextRequestWith(DruidException druidException) + { + results.add(new ResultHolder<>(null, null, druidException)); + } + + public void completeNextRequestWith(R result) + { + results.add(new ResultHolder<>(() -> result, null, null)); + } + + public boolean hasPendingResults() + { + return !results.isEmpty(); + } + + @Override + public ListenableFuture go( + Request request, + HttpResponseHandler httpResponseHandler + ) + { + throw new UnsupportedOperationException("Not Implemented."); + } + + @Override + public ListenableFuture go( + Request request, + HttpResponseHandler httpResponseHandler, + Duration duration + ) + { + final int currentRequest = requestCount.getAndIncrement(); + + final ResultHolder nextResult = results.poll(); + if (nextResult == null) { + throw new ISE("No known response for request [%d]", currentRequest); + } else if (nextResult.clientError != null) { + throw nextResult.clientError; + } else if (nextResult.serverError != null) { + HttpResponse errorResponse = buildErrorResponse(nextResult.serverError); + httpResponseHandler.handleResponse(errorResponse, null); + return (ListenableFuture) Futures.immediateFuture(new ByteArrayInputStream(new byte[0])); + } else { + HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + httpResponse.setContent(ChannelBuffers.buffer(0)); + httpResponseHandler.handleResponse(httpResponse, null); + } + + try { + ByteArrayInputStream resultBytes = new ByteArrayInputStream( + mapper.writerFor(typeReference).writeValueAsBytes(nextResult.supplier.get()) + ); + return (ListenableFuture) Futures.immediateFuture(resultBytes); + } + catch (Exception e) { + throw new RE(e, "Error while sending HTTP response: %s", e.getMessage()); + } + } + + private HttpResponse buildErrorResponse(DruidException druidException) + { + HttpResponse httpResponse = new DefaultHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.valueOf(druidException.getStatusCode()) + ); + httpResponse.setContent(ChannelBuffers.buffer(0)); + + ErrorResponse errorResponse = druidException.toErrorResponse(); + try { + httpResponse.setContent(ChannelBuffers.copiedBuffer(mapper.writeValueAsBytes(errorResponse))); + return httpResponse; + } + catch (JsonProcessingException e) { + throw new ISE("Error while serializing given response"); + } + } + + private static class ResultHolder + { + final Supplier supplier; + final RuntimeException clientError; + final DruidException serverError; + + ResultHolder( + Supplier supplier, + RuntimeException clientError, + DruidException serverError + ) + { + this.supplier = supplier; + this.clientError = clientError; + this.serverError = serverError; + } + } +}