From 85e7d2aec4aba370d12bd052eeb9f5760578886a Mon Sep 17 00:00:00 2001 From: Bowen Ding Date: Mon, 31 Aug 2020 17:07:56 +0800 Subject: [PATCH] Register endpoints by plugin. --- .../drill/exec/store/ipfs/IPFSGroupScan.java | 7 +- .../drill/exec/coord/ClusterCoordinator.java | 15 +++-- .../coord/local/LocalClusterCoordinator.java | 55 ++++++++++++--- .../exec/coord/zk/ZKClusterCoordinator.java | 67 ++++++++++++++----- 4 files changed, 112 insertions(+), 32 deletions(-) diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java index f68749d85b9..3506caee972 100644 --- a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java @@ -122,7 +122,7 @@ private void init() { for (Multihash leaf : leafAddrMap.keySet()) { String peerHostname = leafAddrMap.get(leaf); - Optional oep = coordinator.getAvailableEndpoints() + Optional oep = coordinator.getEndpointsByPlugin(IPFSStoragePlugin.class) .stream() .filter(a -> a.getAddress().equals(peerHostname)) .findAny(); @@ -143,7 +143,7 @@ private void init() { .setState(DrillbitEndpoint.State.ONLINE) .build(); //DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed? - ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep); + ClusterCoordinator.RegistrationHandle handle = coordinator.registerByPlugin(ep, IPFSStoragePlugin.class); } IPFSWork work = new IPFSWork(leaf); @@ -225,7 +225,8 @@ public int getMaxParallelizationWidth() { @Override public void applyAssignments(List incomingEndpoints) { logger.debug("Applying assignments: endpointWorksMap = {}", endpointWorksMap); - assignments = AssignmentCreator.getMappings(incomingEndpoints, ipfsWorkList); + List endpoints = getStoragePlugin().getContext().getClusterCoordinator().getEndpointsByPlugin(IPFSStoragePlugin.class).stream().collect(Collectors.toList()); + assignments = AssignmentCreator.getMappings(endpoints, ipfsWorkList); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java index c9bc3b4f4fe..eef4ee63f5c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java @@ -17,16 +17,17 @@ */ package org.apache.drill.exec.coord; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.drill.exec.coord.store.TransientStore; import org.apache.drill.exec.coord.store.TransientStoreConfig; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; +import org.apache.drill.exec.store.StoragePlugin; import org.apache.drill.exec.work.foreman.DrillbitStatusListener; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + /** * Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities * as well as understand other node's existence and capabilities. @@ -49,8 +50,12 @@ public abstract class ClusterCoordinator implements AutoCloseable { public abstract RegistrationHandle register(DrillbitEndpoint data); + public abstract RegistrationHandle registerByPlugin(DrillbitEndpoint data, Class ownerClass); + public abstract void unregister(RegistrationHandle handle); + public abstract void unregisterByPlugin(RegistrationHandle handle, Class ownerClass); + /** * Get a collection of available Drillbit endpoints, Thread-safe. * Could be slightly out of date depending on refresh policy. @@ -69,6 +74,8 @@ public abstract class ClusterCoordinator implements AutoCloseable { public abstract Collection getOnlineEndPoints(); + public abstract Collection getEndpointsByPlugin(Class ownerClass); + public abstract RegistrationHandle update(RegistrationHandle handle, State state); public interface RegistrationHandle { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java index 16a7e9d35cb..56aa4b7f5ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java @@ -17,15 +17,6 @@ */ package org.apache.drill.exec.coord.local; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.DistributedSemaphore; import org.apache.drill.exec.coord.store.CachingTransientStoreFactory; @@ -34,9 +25,19 @@ import org.apache.drill.exec.coord.store.TransientStoreFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; - +import org.apache.drill.exec.store.StoragePlugin; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + public class LocalClusterCoordinator extends ClusterCoordinator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class); @@ -46,6 +47,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator { * ConcurrentModificationException. */ private final Map endpoints = new ConcurrentHashMap<>(); + private final Map, Map> endpointsByPlugin = new ConcurrentHashMap<>(); private final ConcurrentMap semaphores = Maps.newConcurrentMap(); private final TransientStoreFactory factory = CachingTransientStoreFactory.of(new TransientStoreFactory() { @@ -78,6 +80,18 @@ public RegistrationHandle register( DrillbitEndpoint data) { return h; } + @Override + public RegistrationHandle registerByPlugin(DrillbitEndpoint data, Class ownerClass) { + logger.debug("Endpoint registered for storage plugin {}: {}.", ownerClass.getName(), data); + final Handle h = new Handle(data); + data = data.toBuilder().setState(State.ONLINE).build(); + if (!endpointsByPlugin.containsKey(ownerClass)) { + endpointsByPlugin.put(ownerClass, new ConcurrentHashMap<>()); + } + endpointsByPlugin.get(ownerClass).put(h, data); + return h; + } + @Override public void unregister(final RegistrationHandle handle) { if (handle == null) { @@ -87,6 +101,19 @@ public void unregister(final RegistrationHandle handle) { endpoints.remove(handle); } + @Override + public void unregisterByPlugin(RegistrationHandle handle, Class ownerClass) { + if (handle == null) { + return; + } + + if (!endpointsByPlugin.containsKey(ownerClass)) { + return; + } + + endpointsByPlugin.get(ownerClass).remove(handle); + } + /** * Update drillbit endpoint state. Drillbit advertises its * state. State information is used during planning and initial @@ -124,6 +151,14 @@ public Collection getOnlineEndPoints() { return runningEndPoints; } + @Override + public Collection getEndpointsByPlugin(Class ownerClass) { + if (!endpointsByPlugin.containsKey(ownerClass)) { + return Collections.emptyList(); + } + return endpointsByPlugin.get(ownerClass).values(); + } + private class Handle implements RegistrationHandle { private final UUID id = UUID.randomUUID(); private DrillbitEndpoint drillbitEndpoint; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java index aeb0dd0f200..914e790d970 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java @@ -17,26 +17,12 @@ */ package org.apache.drill.exec.coord.zk; -import static org.apache.drill.shaded.guava.com.google.common.collect.Collections2.transform; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.ArrayList; -import java.util.Set; -import java.util.HashSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.curator.framework.imps.DefaultACLProvider; -import org.apache.drill.shaded.guava.com.google.common.base.Throwables; import org.apache.commons.collections.keyvalue.MultiKey; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; @@ -57,7 +43,25 @@ import org.apache.drill.exec.coord.store.TransientStoreFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; +import org.apache.drill.exec.store.StoragePlugin; import org.apache.drill.shaded.guava.com.google.common.base.Function; +import org.apache.drill.shaded.guava.com.google.common.base.Throwables; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.drill.shaded.guava.com.google.common.collect.Collections2.transform; /** * Manages cluster coordination utilizing zookeeper. * @@ -76,6 +80,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator { // endpointsMap maps Multikey( comprises of endoint address and port) to Drillbit endpoints private ConcurrentHashMap endpointsMap = new ConcurrentHashMap(); + private final ConcurrentHashMap, Map> endpointsByPlugin = new ConcurrentHashMap<>(); private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$"); public ZKClusterCoordinator(DrillConfig config, String connect) { @@ -189,6 +194,17 @@ public RegistrationHandle register(DrillbitEndpoint data) { } } + @Override + public RegistrationHandle registerByPlugin(DrillbitEndpoint data, Class ownerClass) { + final ZKRegistrationHandle h = new ZKRegistrationHandle(UUID.randomUUID().toString(), data); + data = data.toBuilder().setState(State.ONLINE).build(); + if (!endpointsByPlugin.containsKey(ownerClass)) { + endpointsByPlugin.put(ownerClass, new ConcurrentHashMap<>()); + } + endpointsByPlugin.get(ownerClass).put(h, data); + return h; + } + @Override public void unregister(RegistrationHandle handle) { if (!(handle instanceof ZKRegistrationHandle)) { @@ -213,6 +229,19 @@ public void unregister(RegistrationHandle handle) { } } + @Override + public void unregisterByPlugin(RegistrationHandle handle, Class ownerClass) { + if (!(handle instanceof ZKRegistrationHandle)) { + throw new UnsupportedOperationException("Unknown handle type: " + handle.getClass().getName()); + } + + ZKRegistrationHandle h = (ZKRegistrationHandle) handle; + if (!endpointsByPlugin.containsKey(ownerClass)) { + return; + } + endpointsByPlugin.get(ownerClass).remove(h); + } + /** * Update drillbit endpoint state. Drillbit advertises its * state in Zookeeper when a shutdown request of drillbit is @@ -259,6 +288,14 @@ public Collection getOnlineEndPoints() { return runningEndPoints; } + @Override + public Collection getEndpointsByPlugin(Class ownerClass) { + if (!endpointsByPlugin.containsKey(ownerClass)) { + return Collections.emptyList(); + } + return endpointsByPlugin.get(ownerClass).values(); + } + @Override public DistributedSemaphore getSemaphore(String name, int maximumLeases) { return new ZkDistributedSemaphore(curator, "/semaphore/" + name, maximumLeases);