Skip to content

Commit

Permalink
Coordinator balancer move then drop fix (apache#5528)
Browse files (browse the repository at this point in the history)
* apache#5521 part 1

* formatting

* oops

* less magic tests
  • Loading branch information
clintropolis authored and gianm committed May 16, 2018
1 parent 9a51427 commit 8b930ac
Show file tree
Hide file tree
Showing 6 changed files with 659 additions and 159 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -25,16 +25,15 @@
import com.google.common.base.Throwables;
import com.google.common.collect.MapMaker;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.curator.inventory.CuratorInventoryManager;
import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -157,14 +156,7 @@ public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
input -> input.segmentViewInitialized()
);
}
}
Expand Down Expand Up @@ -233,15 +225,10 @@ protected void runSegmentCallbacks(
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
() -> {
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
);
Expand All @@ -252,14 +239,9 @@ private void runServerRemovedCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
}
() -> {
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
}
}
);
Expand All @@ -286,14 +268,7 @@ protected void addSingleInventory(
container.addDataSegment(inventory);

runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(container.getMetadata(), inventory);
}
}
input -> input.segmentAdded(container.getMetadata(), inventory)
);
}

Expand All @@ -315,26 +290,16 @@ protected void removeSingleInventory(final DruidServer container, String invento
container.removeDataSegment(inventoryKey);

runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(container.getMetadata(), segment);
}
}
input -> input.segmentRemoved(container.getMetadata(), segment)
);
}

@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
try {
String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(getInventoryManagerConfig().getInventoryPath(), serverKey),
segment.getIdentifier()
);
return curator.checkExists().forPath(toServedSegPath) != null;
DruidServer server = getInventoryValue(serverKey);
return server != null && server.getSegment(segment.getIdentifier()) != null;
}
catch (Exception ex) {
throw Throwables.propagate(ex);
Expand Down
126 changes: 48 additions & 78 deletions server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.druid.server.coordinator;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -96,14 +95,8 @@ public class DruidCoordinator
{
public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
.onResultOf(
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
})
(Function<DataSegment, Interval>) segment -> segment
.getInterval())
.compound(Ordering.<DataSegment>natural())
.reverse();

Expand Down Expand Up @@ -552,7 +545,8 @@ public ScheduledExecutors.Signal call()
if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
theRunnable.run();
}
if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
if (coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
Expand Down Expand Up @@ -672,83 +666,59 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
super(
ImmutableList.of(
new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
new DruidCoordinatorHelper()
{
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Display info about all historical servers
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(
new Predicate<DruidServer>()
{
@Override
public boolean apply(
DruidServer input
)
{
return input.segmentReplicatable();
}
}
).transform(
new Function<DruidServer, ImmutableDruidServer>()
{
@Override
public ImmutableDruidServer apply(DruidServer input)
{
return input.toImmutableDruidServer();
}
}
);

if (log.isDebugEnabled()) {
log.debug("Servers");
for (ImmutableDruidServer druidServer : servers) {
log.debug(" %s", druidServer);
log.debug(" -- DataSources");
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
log.debug(" %s", druidDataSource);
}
params -> {
// Display info about all historical servers
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(DruidServer::segmentReplicatable)
.transform(DruidServer::toImmutableDruidServer);

if (log.isDebugEnabled()) {
log.debug("Servers");
for (ImmutableDruidServer druidServer : servers) {
log.debug(" %s", druidServer);
log.debug(" -- DataSources");
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
log.debug(" %s", druidDataSource);
}
}
}

// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
loadQueuePeon.start();
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);
// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
loadQueuePeon.start();
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);

loadManagementPeons.put(server.getName(), loadQueuePeon);
}

cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
loadManagementPeons.put(server.getName(), loadQueuePeon);
}

segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
}

// Stop peons for servers that aren't there anymore.
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (ImmutableDruidServer server : servers) {
disappeared.remove(server.getName());
}
for (String name : disappeared) {
log.info("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
peon.stop();
}
segmentReplicantLookup = SegmentReplicantLookup.make(cluster);

return params.buildFromExisting()
.withDruidCluster(cluster)
.withDatabaseRuleManager(metadataRuleManager)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
.build();
// Stop peons for servers that aren't there anymore.
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (ImmutableDruidServer server : servers) {
disappeared.remove(server.getName());
}
for (String name : disappeared) {
log.info("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
peon.stop();
}

return params.buildFromExisting()
.withDruidCluster(cluster)
.withDatabaseRuleManager(metadataRuleManager)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
.build();
},
new DruidCoordinatorRuleRunner(DruidCoordinator.this),
new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
Expand Down
42 changes: 11 additions & 31 deletions server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
Original file line number | Diff line number | Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;

import java.util.Collection;
Expand All @@ -50,8 +49,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
*/
public class LoadQueuePeon
{
private static final EmittingLogger log = new EmittingLogger(LoadQueuePeon.class);
Expand Down Expand Up @@ -237,7 +234,7 @@ private void processSegmentChangeRequest()
if (currentlyProcessing == null) {
if (!stopped) {
log.makeAlert("Crazy race condition! server[%s]", basePath)
.emit();
.emit();
}
actionCompleted();
return;
Expand All @@ -249,38 +246,28 @@ private void processSegmentChangeRequest()
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);

processingExecutor.schedule(
new Runnable()
{
@Override
public void run()
{
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
}
}
catch (Exception e) {
failAssign(e);
() -> {
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
}
}
catch (Exception e) {
failAssign(e);
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);

final Stat stat = curator.checkExists().usingWatcher(
new CuratorWatcher()
{
@Override
public void process(WatchedEvent watchedEvent) throws Exception
{
switch (watchedEvent.getType()) {
(CuratorWatcher) watchedEvent -> {
switch (watchedEvent.getType()) {
case NodeDeleted:
entryRemoved(watchedEvent.getPath());
break;
default:
// do nothing
}
}
}
).forPath(path);
Expand Down Expand Up @@ -329,14 +316,7 @@ private void actionCompleted()
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
executeCallbacks(callbacks);
}
}
() -> executeCallbacks(callbacks)
);
}
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,7 +20,7 @@
package io.druid.server.coordinator.helper;

import com.google.common.collect.Lists;
import com.metamx.common.guava.Comparators;
import com.google.common.collect.Ordering;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.DateTimes;
import io.druid.metadata.MetadataRuleManager;
Expand Down Expand Up @@ -92,7 +92,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
for (DataSegment segment : params.getAvailableSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<>(Comparators.comparable());
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
timelines.put(segment.getDataSource(), timeline);
}

Expand Down
Loading

0 comments on commit 8b930ac

Please sign in to comment.