Skip to content

Commit

Permalink
Register endpoints by plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
dbw9580 committed Aug 31, 2020
1 parent 29fe525 commit 85e7d2a
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private void init() {
for (Multihash leaf : leafAddrMap.keySet()) {
String peerHostname = leafAddrMap.get(leaf);

Optional<DrillbitEndpoint> oep = coordinator.getAvailableEndpoints()
Optional<DrillbitEndpoint> oep = coordinator.getEndpointsByPlugin(IPFSStoragePlugin.class)
.stream()
.filter(a -> a.getAddress().equals(peerHostname))
.findAny();
Expand All @@ -143,7 +143,7 @@ private void init() {
.setState(DrillbitEndpoint.State.ONLINE)
.build();
//DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed?
ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep);
ClusterCoordinator.RegistrationHandle handle = coordinator.registerByPlugin(ep, IPFSStoragePlugin.class);
}

IPFSWork work = new IPFSWork(leaf);
Expand Down Expand Up @@ -225,7 +225,8 @@ public int getMaxParallelizationWidth() {
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
  logger.debug("Applying assignments: endpointWorksMap = {}", endpointWorksMap);
  // Assign work only to the endpoints registered by the IPFS plugin itself,
  // not the incoming (possibly cluster-wide) endpoint list, so that work is
  // mapped onto the drillbits co-located with IPFS peers.
  List<DrillbitEndpoint> endpoints = getStoragePlugin().getContext().getClusterCoordinator()
      .getEndpointsByPlugin(IPFSStoragePlugin.class)
      .stream()
      .collect(Collectors.toList());
  assignments = AssignmentCreator.getMappings(endpoints, ipfsWorkList);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
*/
package org.apache.drill.exec.coord;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
* as well as understand other node's existence and capabilities.
Expand All @@ -49,8 +50,12 @@ public abstract class ClusterCoordinator implements AutoCloseable {

public abstract RegistrationHandle register(DrillbitEndpoint data);

public abstract RegistrationHandle registerByPlugin(DrillbitEndpoint data, Class<? extends StoragePlugin> ownerClass);

public abstract void unregister(RegistrationHandle handle);

public abstract void unregisterByPlugin(RegistrationHandle handle, Class<? extends StoragePlugin> ownerClass);

/**
* Get a collection of available Drillbit endpoints, Thread-safe.
* Could be slightly out of date depending on refresh policy.
Expand All @@ -69,6 +74,8 @@ public abstract class ClusterCoordinator implements AutoCloseable {

public abstract Collection<DrillbitEndpoint> getOnlineEndPoints();

public abstract Collection<DrillbitEndpoint> getEndpointsByPlugin(Class<? extends StoragePlugin> ownerClass);

public abstract RegistrationHandle update(RegistrationHandle handle, State state);

public interface RegistrationHandle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@
*/
package org.apache.drill.exec.coord.local;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
Expand All @@ -34,9 +25,19 @@
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;

import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class LocalClusterCoordinator extends ClusterCoordinator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);

Expand All @@ -46,6 +47,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
* ConcurrentModificationException.
*/
private final Map<RegistrationHandle, DrillbitEndpoint> endpoints = new ConcurrentHashMap<>();
private final Map<Class<? extends StoragePlugin>, Map<RegistrationHandle, DrillbitEndpoint>> endpointsByPlugin = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap();

private final TransientStoreFactory factory = CachingTransientStoreFactory.of(new TransientStoreFactory() {
Expand Down Expand Up @@ -78,6 +80,18 @@ public RegistrationHandle register( DrillbitEndpoint data) {
return h;
}

@Override
public RegistrationHandle registerByPlugin(DrillbitEndpoint data, Class<? extends StoragePlugin> ownerClass) {
  logger.debug("Endpoint registered for storage plugin {}: {}.", ownerClass.getName(), data);
  final Handle h = new Handle(data);
  // Advertise the endpoint as ONLINE, mirroring register(DrillbitEndpoint).
  data = data.toBuilder().setState(State.ONLINE).build();
  // computeIfAbsent is atomic on ConcurrentHashMap; the previous
  // containsKey/put sequence was a check-then-act race under concurrent
  // registration for the same plugin class.
  endpointsByPlugin.computeIfAbsent(ownerClass, k -> new ConcurrentHashMap<>()).put(h, data);
  return h;
}

@Override
public void unregister(final RegistrationHandle handle) {
if (handle == null) {
Expand All @@ -87,6 +101,19 @@ public void unregister(final RegistrationHandle handle) {
endpoints.remove(handle);
}

@Override
public void unregisterByPlugin(RegistrationHandle handle, Class<? extends StoragePlugin> ownerClass) {
  // Tolerate null handles, matching unregister(RegistrationHandle).
  if (handle == null) {
    return;
  }

  // Single lookup instead of containsKey + get: avoids both the double
  // hash lookup and the check-then-act race on the concurrent map.
  Map<RegistrationHandle, DrillbitEndpoint> pluginEndpoints = endpointsByPlugin.get(ownerClass);
  if (pluginEndpoints != null) {
    pluginEndpoints.remove(handle);
  }
}

/**
* Update drillbit endpoint state. Drillbit advertises its
* state. State information is used during planning and initial
Expand Down Expand Up @@ -124,6 +151,14 @@ public Collection<DrillbitEndpoint> getOnlineEndPoints() {
return runningEndPoints;
}

@Override
public Collection<DrillbitEndpoint> getEndpointsByPlugin(Class<? extends StoragePlugin> ownerClass) {
  // getOrDefault performs one atomic lookup, replacing the racy
  // containsKey/get pair; an absent plugin class yields an empty collection.
  return endpointsByPlugin.getOrDefault(ownerClass, Collections.emptyMap()).values();
}

private class Handle implements RegistrationHandle {
private final UUID id = UUID.randomUUID();
private DrillbitEndpoint drillbitEndpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,12 @@
*/
package org.apache.drill.exec.coord.zk;

import static org.apache.drill.shaded.guava.com.google.common.collect.Collections2.transform;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Throwables;
import org.apache.commons.collections.keyvalue.MultiKey;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
Expand All @@ -57,7 +43,25 @@
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Throwables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.drill.shaded.guava.com.google.common.collect.Collections2.transform;

/**
* Manages cluster coordination utilizing ZooKeeper.
Expand All @@ -76,6 +80,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {

// endpointsMap maps a MultiKey (composed of the endpoint address and port) to Drillbit endpoints
private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<MultiKey,DrillbitEndpoint>();
private final ConcurrentHashMap<Class<? extends StoragePlugin>, Map<RegistrationHandle, DrillbitEndpoint>> endpointsByPlugin = new ConcurrentHashMap<>();
private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");

public ZKClusterCoordinator(DrillConfig config, String connect) {
Expand Down Expand Up @@ -189,6 +194,17 @@ public RegistrationHandle register(DrillbitEndpoint data) {
}
}

@Override
public RegistrationHandle registerByPlugin(DrillbitEndpoint data, Class<? extends StoragePlugin> ownerClass) {
  final ZKRegistrationHandle h = new ZKRegistrationHandle(UUID.randomUUID().toString(), data);
  // Advertise the endpoint as ONLINE, mirroring register(DrillbitEndpoint).
  data = data.toBuilder().setState(State.ONLINE).build();
  // computeIfAbsent is atomic on ConcurrentHashMap; the previous
  // containsKey/put sequence was a check-then-act race under concurrent
  // registration for the same plugin class.
  // NOTE(review): unlike register(), this does not write to ZooKeeper, so
  // plugin-registered endpoints are local to this coordinator instance —
  // TODO confirm that is intended.
  endpointsByPlugin.computeIfAbsent(ownerClass, k -> new ConcurrentHashMap<>()).put(h, data);
  return h;
}

@Override
public void unregister(RegistrationHandle handle) {
if (!(handle instanceof ZKRegistrationHandle)) {
Expand All @@ -213,6 +229,19 @@ public void unregister(RegistrationHandle handle) {
}
}

@Override
public void unregisterByPlugin(RegistrationHandle handle, Class<? extends StoragePlugin> ownerClass) {
  // Reject foreign handle types, matching unregister(RegistrationHandle).
  if (!(handle instanceof ZKRegistrationHandle)) {
    throw new UnsupportedOperationException("Unknown handle type: " + handle.getClass().getName());
  }

  // Single lookup instead of containsKey + get: avoids both the double
  // hash lookup and the check-then-act race on the concurrent map.
  Map<RegistrationHandle, DrillbitEndpoint> pluginEndpoints = endpointsByPlugin.get(ownerClass);
  if (pluginEndpoints != null) {
    pluginEndpoints.remove(handle);
  }
}

/**
* Update drillbit endpoint state. Drillbit advertises its
* state in Zookeeper when a shutdown request of drillbit is
Expand Down Expand Up @@ -259,6 +288,14 @@ public Collection<DrillbitEndpoint> getOnlineEndPoints() {
return runningEndPoints;
}

@Override
public Collection<DrillbitEndpoint> getEndpointsByPlugin(Class<? extends StoragePlugin> ownerClass) {
  // getOrDefault performs one atomic lookup, replacing the racy
  // containsKey/get pair; an absent plugin class yields an empty collection.
  return endpointsByPlugin.getOrDefault(ownerClass, Collections.emptyMap()).values();
}

@Override
public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
return new ZkDistributedSemaphore(curator, "/semaphore/" + name, maximumLeases);
Expand Down

0 comments on commit 85e7d2a

Please sign in to comment.