Skip to content

Commit

Permalink
Fix bug during initialization of HttpServerInventoryView (apache#14517)
Browse files Browse the repository at this point in the history
If a server is removed during `HttpServerInventoryView.serverInventoryInitialized`,
the initialization gets stuck as this server is never synced. The method eventually times
out (default 250s).

Fix: Mark a server as stopped if it is removed. `serverInventoryInitialized` only waits for
non-stopped servers to sync.

Other changes:
- Add new metrics for better debugging of slow broker/coordinator startup
  - `segment/serverview/sync/healthy`: whether the server view is syncing properly with a server
  - `segment/serverview/sync/unstableTime`: time for which sync with a server has been unstable  
- Clean up logging in `HttpServerInventoryView` and `ChangeRequestHttpSyncer`
- Minor refactor for readability
- Add utility class `Stopwatch`
- Add tests and stubs
  • Loading branch information
kfaraz authored and sergioferragut committed Jul 21, 2023
1 parent b07fb2f commit 29cc732
Show file tree
Hide file tree
Showing 16 changed files with 1,116 additions and 476 deletions.
5 changes: 4 additions & 1 deletion docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ Metrics may have additional dimensions beyond those listed above.
|`init/metadatacache/time`|Time taken to initialize the broker segment metadata cache. Useful to detect if brokers are taking too long to start||Depends on the number of segments.|
|`segment/metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`|
|`segment/metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`|

|`segment/serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `segment/serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`segment/serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|

### Historical

Expand Down Expand Up @@ -324,6 +325,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`metadata/kill/rule/count`|Total number of rules that were automatically deleted from metadata store per each Coordinator kill rule duty run. This metric can help adjust `druid.coordinator.kill.rule.durationToRetain` configuration based on whether more or fewer rules need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.rule.on` is set to true.| |Varies|
|`metadata/kill/datasource/count`|Total number of datasource metadata that were automatically deleted from metadata store per each Coordinator kill datasource duty run (Note: datasource metadata only exists for datasource created from supervisor). This metric can help adjust `druid.coordinator.kill.datasource.durationToRetain` configuration based on whether more or less datasource metadata need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.datasource.on` is set to true.| |Varies|
|`init/serverview/time`|Time taken to initialize the coordinator server view.||Depends on the number of segments|
|`segment/serverview/sync/healthy`|Sync status of the Coordinator with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `segment/serverview/sync/unstableTime` to debug slow startup of the Coordinator.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`segment/serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|

## General Health

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ void syncMonitoring()
final Set<Map.Entry<String, WorkerHolder>> workerEntrySet = ImmutableSet.copyOf(workers.entrySet());
for (Map.Entry<String, WorkerHolder> e : workerEntrySet) {
WorkerHolder workerHolder = e.getValue();
if (!workerHolder.getUnderlyingSyncer().isOK()) {
if (workerHolder.getUnderlyingSyncer().needsReset()) {
synchronized (workers) {
// check again that server is still there and only then reset.
if (workers.containsKey(e.getKey())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -340,15 +339,15 @@ public void stop()

public void waitForInitialization() throws InterruptedException
{
if (!syncer.awaitInitialization(3 * syncer.getServerHttpTimeout(), TimeUnit.MILLISECONDS)) {
if (!syncer.awaitInitialization()) {
throw new RE("Failed to sync with worker[%s].", worker.getHost());
}
}

public boolean isInitialized()
{
try {
return syncer.awaitInitialization(1, TimeUnit.MILLISECONDS);
return syncer.isInitialized();
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1789,7 +1789,7 @@ private Worker createWorker(String host)
private WorkerHolder createNonSyncingWorkerHolder(Worker worker)
{
ChangeRequestHttpSyncer syncer = EasyMock.createMock(ChangeRequestHttpSyncer.class);
EasyMock.expect(syncer.isOK()).andReturn(false).anyTimes();
EasyMock.expect(syncer.needsReset()).andReturn(true).anyTimes();
EasyMock.expect(syncer.getDebugInfo()).andReturn(Collections.emptyMap()).anyTimes();
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void testSyncListener()
ChangeRequestHttpSyncer.Listener<WorkerHistoryItem> syncListener = workerHolder.createSyncListener();

Assert.assertTrue(workerHolder.disabled.get());
Assert.assertFalse(workerHolder.isInitialized());

syncListener.fullSync(
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.common;

import com.google.common.base.Ticker;
import org.joda.time.Duration;

import java.util.concurrent.TimeUnit;

/**
 * Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
 * <p>
 * The Guava stopwatch is documented as not thread-safe, so every method of this
 * class that touches the delegate is synchronized on the wrapper instance. This
 * covers the state-changing methods, which can throw an
 * {@link IllegalStateException} if they race with each other, as well as the
 * read methods, which could otherwise observe stale or half-updated state while
 * another thread starts, stops or resets the stopwatch.
 */
public class Stopwatch
{
  private final com.google.common.base.Stopwatch delegate;

  /**
   * Creates a stopwatch that is already running and uses the system ticker.
   */
  public static Stopwatch createStarted()
  {
    return new Stopwatch(com.google.common.base.Stopwatch.createStarted());
  }

  /**
   * Creates a stopwatch that is not running and uses the system ticker.
   */
  public static Stopwatch createUnstarted()
  {
    return new Stopwatch(com.google.common.base.Stopwatch.createUnstarted());
  }

  /**
   * Creates a stopwatch that is already running and reads time from the given
   * ticker. Mainly useful in tests that need to control the passage of time.
   */
  public static Stopwatch createStarted(Ticker ticker)
  {
    return new Stopwatch(com.google.common.base.Stopwatch.createStarted(ticker));
  }

  private Stopwatch(com.google.common.base.Stopwatch delegate)
  {
    this.delegate = delegate;
  }

  /**
   * Starts the stopwatch.
   *
   * @throws IllegalStateException if the stopwatch is already running
   */
  public synchronized void start()
  {
    delegate.start();
  }

  /**
   * Stops the stopwatch. The elapsed time is retained until the next reset.
   *
   * @throws IllegalStateException if the stopwatch is not running
   */
  public synchronized void stop()
  {
    delegate.stop();
  }

  /**
   * Sets the elapsed time to zero and leaves the stopwatch stopped.
   */
  public synchronized void reset()
  {
    delegate.reset();
  }

  /**
   * Invokes {@code reset().start()} on the underlying {@link com.google.common.base.Stopwatch}.
   * Both steps happen under a single lock, so no other thread can observe the
   * intermediate stopped state.
   */
  public synchronized void restart()
  {
    delegate.reset().start();
  }

  /**
   * Returns true if the stopwatch has been started and not yet stopped.
   */
  public synchronized boolean isRunning()
  {
    return delegate.isRunning();
  }

  /**
   * Returns the milliseconds elapsed on the stopwatch.
   * <p>
   * Synchronized so that the value is computed from a consistent view of the
   * delegate even if another thread is concurrently changing its state.
   */
  public synchronized long millisElapsed()
  {
    return delegate.elapsed(TimeUnit.MILLISECONDS);
  }

  /**
   * Checks if the given duration has already elapsed on the stopwatch.
   *
   * @return true if the elapsed milliseconds are greater than or equal to the
   * milliseconds in the given duration
   */
  public boolean hasElapsed(Duration duration)
  {
    // Thread safety is inherited from millisElapsed()
    return millisElapsed() >= duration.getMillis();
  }

  /**
   * Checks that the given duration has not elapsed on the stopwatch. Calling this
   * method is the same as {@code !stopwatch.hasElapsed(duration)}.
   */
  public boolean hasNotElapsed(Duration duration)
  {
    return !hasElapsed(duration);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.common;

import com.google.common.testing.FakeTicker;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * Unit tests for {@link Stopwatch}. Tests that verify elapsed time use a
 * {@link FakeTicker} so that they do not depend on the wall clock.
 */
public class StopwatchTest
{

  @Test
  public void testDuplicateStartThrowsException()
  {
    // The stopwatch is already running, so a second start is illegal
    Stopwatch stopwatch = Stopwatch.createStarted();
    Assert.assertThrows(IllegalStateException.class, stopwatch::start);
  }

  @Test
  public void testDuplicateStopThrowsException()
  {
    // Stopping a stopwatch that was never started is illegal
    Stopwatch unstartedStopwatch = Stopwatch.createUnstarted();
    Assert.assertThrows(IllegalStateException.class, unstartedStopwatch::stop);

    // Stopping a stopwatch that has already been stopped is illegal too
    Stopwatch stoppedStopwatch = Stopwatch.createStarted();
    stoppedStopwatch.stop();
    Assert.assertThrows(IllegalStateException.class, stoppedStopwatch::stop);
  }

  @Test
  public void testMillisElapsed()
  {
    FakeTicker fakeTicker = new FakeTicker();
    Stopwatch stopwatch = Stopwatch.createStarted(fakeTicker);
    fakeTicker.advance(100, TimeUnit.MILLISECONDS);
    stopwatch.stop();

    Assert.assertEquals(100, stopwatch.millisElapsed());
  }

  @Test
  public void testHasElapsed()
  {
    FakeTicker fakeTicker = new FakeTicker();
    Stopwatch stopwatch = Stopwatch.createStarted(fakeTicker);
    fakeTicker.advance(100, TimeUnit.MILLISECONDS);
    stopwatch.stop();

    // The boundary is inclusive: exactly 100 ms counts as elapsed
    Assert.assertTrue(stopwatch.hasElapsed(Duration.millis(50)));
    Assert.assertTrue(stopwatch.hasElapsed(Duration.millis(100)));
    Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(101)));
    Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(500)));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.java.util.metrics;

import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

Expand All @@ -32,6 +33,7 @@
public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
{
private final List<Event> events = new ArrayList<>();
private final List<AlertEvent> alertEvents = new ArrayList<>();
private final Map<String, List<ServiceMetricEvent>> metricEvents = new HashMap<>();

public StubServiceEmitter(String service, String host)
Expand All @@ -46,6 +48,8 @@ public void emit(Event event)
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>())
.add(metricEvent);
} else if (event instanceof AlertEvent) {
alertEvents.add((AlertEvent) event);
}
events.add(event);
}
Expand All @@ -58,6 +62,11 @@ public List<Event> getEvents()
return events;
}

/**
 * Returns the alert events that have been passed to {@code emit} and not yet
 * cleared by {@code flush}. The returned list is the internal list itself,
 * not a copy.
 */
public List<AlertEvent> getAlerts()
{
return alertEvents;
}

@Override
public List<Number> getMetricValues(
String metricName,
Expand Down Expand Up @@ -92,6 +101,7 @@ public void start()
public void flush()
{
// Discard everything recorded so far: the list of all events, the list of
// alert events and the per-metric map of metric events.
events.clear();
alertEvents.clear();
metricEvents.clear();
}

Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/apache/druid/client/DruidServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,17 @@ public ImmutableDruidServer toImmutableDruidServer()
immutableDataSources.values().stream().mapToInt(dataSource -> dataSource.getSegments().size()).sum();
return new ImmutableDruidServer(metadata, size, immutableDataSources, totalSegments);
}

/**
 * Creates a new {@code DruidServer} from this server's name, host and port,
 * host and TLS port, max size, type, tier and priority.
 * <p>
 * NOTE(review): presumably the new server starts out with no segments, as the
 * method name suggests. Confirm against the constructor, which is not visible here.
 */
public DruidServer copyWithoutSegments()
{
return new DruidServer(
getName(),
getHostAndPort(),
getHostAndTlsPort(),
getMaxSize(),
getType(),
getTier(),
getPriority()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;

import javax.validation.constraints.NotNull;
Expand All @@ -51,6 +53,14 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
@NotNull
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;

@JacksonInject
@NotNull
private ScheduledExecutorFactory executorFactory;

@JacksonInject
@NotNull
private ServiceEmitter serviceEmitter;

@Override
public HttpServerInventoryView get()
{
Expand All @@ -60,6 +70,8 @@ public HttpServerInventoryView get()
druidNodeDiscoveryProvider,
Predicates.alwaysFalse(),
config,
serviceEmitter,
executorFactory,
"FilteredHttpServerInventoryView"
);
}
Expand Down
Loading

0 comments on commit 29cc732

Please sign in to comment.