diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 47f2f776289..cc4e557e421 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -363,6 +363,8 @@ public enum Property { "Properties in this category affect the behavior of the manager server.", "2.1.0"), MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT, "The port used for handling client connections on the manager.", "1.3.5"), + MANAGER_ASYNC_CLIENTPORT("manager.port.async.client", "8999", PropertyType.PORT, + "The port used for handling client connections on the manager for async services.", "4.0.0"), MANAGER_TABLET_BALANCER("manager.tablet.balancer", "org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME, "The balancer class that accumulo will use to make tablet assignment and " @@ -1165,7 +1167,11 @@ public enum Property { @Experimental COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL( "compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION, - "The interval at which to check the tservers for external compactions.", "2.1.0"); + "The interval at which to check the tservers for external compactions.", "2.1.0"), + COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME( + "compaction.coordinator.wait.time.job.request.max", "2m", PropertyType.TIMEDURATION, + "The maximum amount of time the coordinator will wait for a requested job from the job queue.", + "4.0.0"); private final String key; private final String defaultValue; diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index 44c58923c1e..ff7edfd080e 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.rpc; +import static org.apache.accumulo.core.rpc.clients.ThriftClientTypes.COORDINATOR; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.io.IOException; @@ -26,14 +27,18 @@ import java.nio.channels.ClosedByInterruptException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TSaslClientTransport; @@ -44,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Throwables; import com.google.common.net.HostAndPort; /** @@ -113,6 +119,21 @@ public static T getClient(ThriftClientTypes type, HostAndPort address, ClientContext context) throws TTransportException { TTransport transport = context.getTransportPool().getTransport(type, address, context.getClientTimeoutInMillis(), context, true); + + // TODO - This is temporary until we support Async multiplexing + // THRIFT-2427 is tracking this issue and the plan is to reopen a new PR + // to add support in the next version. Once we support multiplexing we can + // remove this special case. + // Note: you could have a sync server that doesn't use SSL or SASL by using + // ThriftServerType.THREADPOOL on the server but the client wouldn't know that + // For now this should be good enough as we will be able to remove this anyways + // when we can multiplex async. So this currently will not work if using that mode. + boolean isSync = context.getConfiguration().getBoolean(Property.INSTANCE_RPC_SASL_ENABLED) + || context.getConfiguration().getBoolean(Property.INSTANCE_RPC_SSL_ENABLED); + if (type == COORDINATOR && !isSync) { + return type.getClientFactory().getClient(protocolFactory.getProtocol(transport)); + } + return createClient(type, transport); } @@ -382,4 +403,32 @@ public static void checkIOExceptionCause(IOException e) { throw new UncheckedIOException(e); } } + + public static AsyncMethodCallback asyncMethodFuture(CompletableFuture future) { + return new AsyncMethodCallback() { + @Override + public void onComplete(T response) { + future.complete(response); + } + + @Override + public void onError(Exception exception) { + future.completeExceptionally(exception); + } + }; + } + + public static T getFutureThriftResult(CompletableFuture future) throws TException { + try { + return future.get(); + } catch (Exception e) { + if (e instanceof ExecutionException) { + Throwable rootCause = Throwables.getRootCause(e); + if (rootCause instanceof TException) { + throw (TException) rootCause; + } + } + throw new RuntimeException(e); + } + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java index 76728722d25..32a059dc757 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java @@ -23,6 +23,9 @@ import java.net.Socket; import java.nio.channels.SelectionKey; +import org.apache.thrift.TAsyncProcessor; +import org.apache.thrift.TProcessor; +import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -99,11 +102,11 @@ public CustomSelectAcceptThread(TNonblockingServerTransport serverTransport) protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) throws TTransportException { - if (processorFactory_.isAsyncProcessor()) { - throw new IllegalStateException("This implementation does not support AsyncProcessors"); - } - return new CustomFrameBuffer(trans, selectionKey, selectThread); + TProcessor processor = processorFactory_.getProcessor(null); + return processor instanceof TAsyncProcessor + ? new CustomAsyncFrameBuffer(trans, selectionKey, selectThread) + : new CustomFrameBuffer(trans, selectionKey, selectThread); } } @@ -118,7 +121,7 @@ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) throws TTransportException { super(trans, selectionKey, selectThread); // Store the clientAddress in the buffer so it can be referenced for logging during read/write - this.clientAddress = getClientAddress(); + this.clientAddress = getClientAddress(trans_, "CustomFrameBuffer"); } @Override @@ -148,33 +151,78 @@ public boolean write() { } return result; } + } + + /** + * Custom wrapper around + * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} to extract the + * client's network location before accepting the request. + */ + private class CustomAsyncFrameBuffer extends AsyncFrameBuffer { + private final String clientAddress; + + public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, + AbstractSelectThread selectThread) throws TTransportException { + super(trans, selectionKey, selectThread); + // Store the clientAddress in the buffer so it can be referenced for logging during read/write + this.clientAddress = getClientAddress(trans_, "CustomAsyncFrameBuffer"); + } + + @Override + public void invoke() { + // On invoke() set the clientAddress on the ThreadLocal so that it can be accessed elsewhere + // in the same thread that called invoke() on the buffer + TServerUtils.clientAddress.set(clientAddress); + super.invoke(); + } + + @Override + public boolean read() { + boolean result = super.read(); + if (!result) { + log.trace("CustomAsyncFrameBuffer.read returned false when reading data from client: {}", + clientAddress); + } + return result; + } - /* - * Helper method used to capture the client address inside the CustomFrameBuffer constructor so - * that it can be referenced inside the read/write methods for logging purposes. It previously - * was only set on the ThreadLocal in the invoke() method but that does not work because A) the - * method isn't called until after reading is finished so the value will be null inside of - * read() and B) The other problem is that invoke() is called on a different thread than - * read()/write() so even if the order was correct it would not be available. - * - * Since a new FrameBuffer is created for each request we can use it to capture the client - * address earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read - * data and write a response back to the client and as part of creation of the buffer the - * TNonblockingSocket is stored as a final variable and won't change so we can safely capture - * the clientAddress in the constructor and use it for logging during read/write and then use - * the value inside of invoke() to set the ThreadLocal so the client address will still be - * available on the thread that called invoke(). - */ - private String getClientAddress() { - String clientAddress = null; - if (trans_ instanceof TNonblockingSocket) { - TNonblockingSocket tsock = (TNonblockingSocket) trans_; - Socket sock = tsock.getSocketChannel().socket(); - clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort(); - log.trace("CustomFrameBuffer captured client address: {}", clientAddress); + @Override + public boolean write() { + boolean result = super.write(); + if (!result) { + log.trace("CustomAsyncFrameBuffer.write returned false when writing data to client: {}", + clientAddress); } - return clientAddress; + return result; + } + } + + /* + * Helper method used to capture the client address inside the CustomFrameBuffer and + * CustomAsyncFrameBuffer constructors so that it can be referenced inside the read/write methods + * for logging purposes. It previously was only set on the ThreadLocal in the invoke() method but + * that does not work because A) the method isn't called until after reading is finished so the + * value will be null inside of read() and B) The other problem is that invoke() is called on a + * different thread than read()/write() so even if the order was correct it would not be + * available. + * + * Since a new FrameBuffer is created for each request we can use it to capture the client address + * earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read data and + * write a response back to the client and as part of creation of the buffer the + * TNonblockingSocket is stored as a final variable and won't change so we can safely capture the + * clientAddress in the constructor and use it for logging during read/write and then use the + * value inside of invoke() to set the ThreadLocal so the client address will still be available + * on the thread that called invoke(). + */ + private static String getClientAddress(TNonblockingTransport transport, String name) { + String clientAddress = null; + if (transport instanceof TNonblockingSocket) { + TNonblockingSocket tsock = (TNonblockingSocket) transport; + Socket sock = tsock.getSocketChannel().socket(); + clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort(); + log.trace("{} captured client address: {}", name, clientAddress); } + return clientAddress; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 9b64aab794a..39668a71f3f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -53,8 +53,11 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.rpc.TimedProcessor.AsyncTimedProcessor; +import org.apache.accumulo.server.rpc.TimedProcessor.SyncTimedProcessor; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocolFactory; @@ -170,7 +173,7 @@ public static ServerAddress startServer(ServerContext context, String hostname, // create the TimedProcessor outside the port search loop so we don't try to // register the same // metrics mbean more than once - TimedProcessor timedProcessor = new TimedProcessor(processor, context.getMetricsInfo()); + TimedProcessor timedProcessor = newTimedProcessor(processor, context.getMetricsInfo()); HostAndPort[] addresses = getHostAndPorts(hostname, portHint); try { @@ -574,7 +577,7 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf, } try { - return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName, + return startTServer(serverType, newTimedProcessor(processor, metricsInfo), serverName, threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses); } catch (TTransportException e) { @@ -702,4 +705,10 @@ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProc return new UGIAssumingProcessor(processor); } + + private static TimedProcessor newTimedProcessor(TProcessor processor, MetricsInfo metricsInfo) { + return processor instanceof TAsyncProcessor + ? new AsyncTimedProcessor((TAsyncProcessor) processor, metricsInfo) + : new SyncTimedProcessor(processor, metricsInfo); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java index 6f0a6086eba..9c4df0bc794 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java @@ -18,8 +18,12 @@ */ package org.apache.accumulo.server.rpc; +import java.util.Optional; + import org.apache.accumulo.core.clientImpl.thrift.ClientService; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncProcessor; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Iface; import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.gc.thrift.GCMonitorService; import org.apache.accumulo.core.manager.thrift.FateService; @@ -32,7 +36,6 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; -import org.apache.thrift.TBaseProcessor; import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.TServiceClient; @@ -47,8 +50,8 @@ public ThriftProcessorTypes(ThriftClientTypes type) { } @VisibleForTesting - public > TProcessor getTProcessor( - Class

processorClass, Class interfaceClass, H serviceHandler, ServerContext context) { + public P getTProcessor(Class

processorClass, + Class interfaceClass, H serviceHandler, ServerContext context) { I rpcProxy = TraceUtil.wrapService(serviceHandler); if (context.getThriftServerType() == ThriftServerType.SASL) { @SuppressWarnings("unchecked") @@ -114,20 +117,32 @@ public static TMultiplexedProcessor getGcTProcessor(GCMonitorService.Iface servi } public static TMultiplexedProcessor getManagerTProcessor(FateService.Iface fateServiceHandler, - CompactionCoordinatorService.Iface coordinatorServiceHandler, - ManagerClientService.Iface managerServiceHandler, ServerContext context) { + ManagerClientService.Iface managerServiceHandler, + Optional coordinatorServiceHandler, + ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); muxProcessor.registerProcessor(FATE.getServiceName(), FATE.getTProcessor( FateService.Processor.class, FateService.Iface.class, fateServiceHandler, context)); - muxProcessor.registerProcessor(COORDINATOR.getServiceName(), - COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class, - CompactionCoordinatorService.Iface.class, coordinatorServiceHandler, context)); + coordinatorServiceHandler + .ifPresent(csh -> muxProcessor.registerProcessor(COORDINATOR.getServiceName(), + COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class, Iface.class, + csh, context))); muxProcessor.registerProcessor(MANAGER.getServiceName(), MANAGER.getTProcessor(ManagerClientService.Processor.class, ManagerClientService.Iface.class, managerServiceHandler, context)); return muxProcessor; } + public static AsyncProcessor getManagerTAsyncProcessor( + CompactionCoordinatorService.AsyncIface coordinatorServiceHandler, ServerContext context) { + // TODO - Right now this is temporarily returning the single AsyncProcessor + // Once Thrift supports Async multiplexing then we can switch to using + // TMultiplexedAsyncProcessor and use multiplexing like we do for sync processors + // THRIFT-2427 is tracking this issue + return COORDINATOR.getTProcessor(CompactionCoordinatorService.AsyncProcessor.class, + CompactionCoordinatorService.AsyncIface.class, coordinatorServiceHandler, context); + } + public static TMultiplexedProcessor getScanServerTProcessor(ClientServiceHandler clientHandler, TabletScanClientService.Iface tserverHandler, ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java index a12db16f1b7..89531fd5e77 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java @@ -57,4 +57,14 @@ public String toString() { public static ThriftServerType getDefault() { return CUSTOM_HS_HA; } + + public static boolean supportsAsync(ThriftServerType thriftServerType) { + switch (thriftServerType) { + case CUSTOM_HS_HA: + case THREADED_SELECTOR: + return true; + default: + return false; + } + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java index 4148cfb0ed2..ffdb05b4abe 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java @@ -22,20 +22,24 @@ import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.server.metrics.ThriftMetrics; +import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer; + +import com.google.common.base.Preconditions; /** * A {@link TProcessor} which tracks the duration of an RPC and adds it to the metrics subsystem. */ -public class TimedProcessor implements TProcessor { +public abstract class TimedProcessor implements TProcessor { - private final TProcessor other; - private final ThriftMetrics thriftMetrics; - private long idleStart; + protected final TProcessor other; + protected final ThriftMetrics thriftMetrics; + protected long idleStart; - public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { + protected TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { this.other = next; thriftMetrics = new ThriftMetrics(); metricsInfo.addMetricsProducers(thriftMetrics); @@ -44,14 +48,57 @@ public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { @Override public void process(TProtocol in, TProtocol out) throws TException { + time(() -> other.process(in, out)); + } + + public boolean isAsync() { + return false; + } + + protected void time(ThriftRunnable runnable) throws TException { long processStart = System.nanoTime(); thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart)); try { - other.process(in, out); + runnable.run(); } finally { // set idle to now, calc time in process idleStart = System.nanoTime(); thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart)); } } + + public static class SyncTimedProcessor extends TimedProcessor { + protected SyncTimedProcessor(TProcessor next, MetricsInfo metricsInfo) { + super(next, metricsInfo); + } + } + + public static class AsyncTimedProcessor extends TimedProcessor implements TAsyncProcessor { + private final TAsyncProcessor other; + + public AsyncTimedProcessor(TAsyncProcessor next, MetricsInfo metricsInfo) { + super(validate(next), metricsInfo); + this.other = next; + } + + @Override + public boolean isAsync() { + return true; + } + + @Override + public void process(AsyncFrameBuffer fb) throws TException { + this.other.process(fb); + } + + private static TProcessor validate(TAsyncProcessor other) { + Preconditions.checkArgument(other instanceof TProcessor, + "Async processor must also implement TProcessor"); + return (TProcessor) other; + } + } + + protected interface ThriftRunnable { + void run() throws TException; + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index d8874b0c6d5..6f78ce46d77 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -26,6 +26,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.server.rpc.ThriftServerType.supportsAsync; import java.io.IOException; import java.net.UnknownHostException; @@ -65,6 +66,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; @@ -216,6 +218,7 @@ public class Manager extends AbstractServer ServiceLock managerLock = null; private TServer clientService = null; + private Optional asyncClientService = null; protected volatile TabletBalancer tabletBalancer; private final BalancerEnvironment balancerEnvironment; private final BalancerMetrics balancerMetrics = new BalancerMetrics(); @@ -331,6 +334,16 @@ synchronized void setManagerState(final ManagerState newState) { Manager.this.nextEvent.event("stopped event loop"); }, 100L, 1000L, MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); + + asyncClientService.ifPresent(acs -> { + final var asyncClientFuture = + getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { + // This frees the main thread and will cause the manager to exit + acs.stop(); + Manager.this.nextEvent.event("stopped event loop"); + }, 100L, 1000L, MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(asyncClientFuture); + }); break; case HAVE_LOCK: if (isUpgrading()) { @@ -1115,10 +1128,13 @@ public void run() { ManagerClientService.Iface haProxy = HighlyAvailableServiceWrapper.service(managerClientHandler, this); - ServerAddress sa; - var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, - compactionCoordinator.getThriftService(), haProxy, getContext()); + final boolean isAsync = supportsAsync(context.getThriftServerType()); + Optional syncCompactionCoordinator = + isAsync ? Optional.empty() : Optional.of(compactionCoordinator.getThriftService()); + final ServerAddress sa; + var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy, + syncCompactionCoordinator, getContext()); try { sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, @@ -1129,6 +1145,25 @@ public void run() { clientService = sa.server; log.info("Started Manager client service at {}", sa.address); + final Optional asyncSa; + if (isAsync) { + var asyncProcessor = ThriftProcessorTypes + .getManagerTAsyncProcessor(compactionCoordinator.getAsyncThriftService(), getContext()); + try { + asyncSa = Optional.of(TServerUtils.startServer(context, getHostname(), + Property.MANAGER_ASYNC_CLIENTPORT, asyncProcessor, "AsyncManager", + "Async Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, + Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK)); + } catch (UnknownHostException e) { + throw new IllegalStateException("Unable to start server on host " + getHostname(), e); + } + asyncClientService = asyncSa.map(serverAddress -> serverAddress.server); + log.info("Started Manager async client service at {}", asyncSa.orElseThrow().address); + } else { + asyncSa = Optional.empty(); + asyncClientService = Optional.empty(); + } + // block until we can obtain the ZK lock for the manager ServiceLockData sld; try { @@ -1139,6 +1174,8 @@ public void run() { MetricsInfo metricsInfo = getContext().getMetricsInfo(); metricsInfo.addServiceTags(getApplicationName(), sa.getAddress(), getResourceGroup()); + // TODO: How do we want to handle the second async port for metrics? Should we just + // add a second port tag or should we rename it something else? ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), this); var producers = managerMetrics.getProducers(getConfiguration(), this); producers.add(balancerMetrics); @@ -1295,11 +1332,19 @@ boolean canSuspendTablets() { String address = sa.address.toString(); UUID uuid = sld.getServerUUID(ThriftService.MANAGER); ServiceDescriptors descriptors = new ServiceDescriptors(); - for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR, - ThriftService.FATE}) { + + for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.FATE}) { descriptors.addService(new ServiceDescriptor(uuid, svc, address, this.getResourceGroup())); } + asyncSa.ifPresentOrElse(asyncService -> { + // Advertise the COORDINATOR service on the async port so clients correct to the + // correct one + descriptors.addService(new ServiceDescriptor(uuid, ThriftService.COORDINATOR, + asyncSa.orElseThrow().address.toString(), this.getResourceGroup())); + }, () -> descriptors.addService( + new ServiceDescriptor(uuid, ThriftService.COORDINATOR, address, this.getResourceGroup()))); + sld = new ServiceLockData(descriptors); log.info("Setting manager lock data to {}", sld); try { @@ -1308,14 +1353,14 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } - while (!clientService.isServing()) { + while (!isServing()) { sleepUninterruptibly(100, MILLISECONDS); } // The manager is fully initialized. Clients are allowed to connect now. managerInitialized.set(true); - while (clientService.isServing()) { + while (isServing()) { sleepUninterruptibly(500, MILLISECONDS); } log.info("Shutting down fate."); @@ -1861,4 +1906,12 @@ private Map> getFateRefs() { Preconditions.checkState(fateRefs != null, "Unexpected null fate references map"); return fateRefs; } + + private boolean isServing() { + if (asyncClientService.isPresent()) { + return clientService.isServing() && asyncClientService.orElseThrow().isServing(); + } else { + return clientService.isServing(); + } + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 1941a762df0..4559663d480 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; @@ -29,6 +30,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import static org.apache.accumulo.core.metrics.Metric.MAJC_QUEUED; import static org.apache.accumulo.core.metrics.Metric.MAJC_RUNNING; +import static org.apache.accumulo.core.rpc.ThriftUtil.asyncMethodFuture; +import static org.apache.accumulo.core.rpc.ThriftUtil.getFutureThriftResult; import java.io.FileNotFoundException; import java.io.IOException; @@ -45,11 +48,13 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -67,6 +72,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncIface; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; @@ -136,6 +142,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,6 +201,8 @@ public class CompactionCoordinator private final int jobQueueInitialSize; private volatile long coordinatorStartTime; + private final long maxJobRequestWaitTime; + private final CompactionCoordinatorService.AsyncIface asyncThriftService; public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference>> fateInstances, Manager manager) { @@ -205,6 +214,9 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, this.jobQueueInitialSize = ctx.getConfiguration() .getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE); + this.maxJobRequestWaitTime = ctx.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME); + this.jobQueues = new CompactionJobQueues(jobQueueInitialSize); this.queueMetrics = new QueueMetrics(jobQueues); @@ -237,6 +249,8 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false) .expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors); // At this point the manager does not have its lock so no actions should be taken yet + + this.asyncThriftService = newAsyncThriftService(); } protected int countCompactors(String groupName) { @@ -362,68 +376,11 @@ public long getNumRunningCompactions() { */ @Override public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, - String groupName, String compactorAddress, String externalCompactionId) - throws ThriftSecurityException { - - // do not expect users to call this directly, expect compactors to call this method - if (!security.canPerformSystemActions(credentials)) { - throw new AccumuloSecurityException(credentials.getPrincipal(), - SecurityErrorCode.PERMISSION_DENIED).asThriftException(); - } - CompactorGroupId groupId = CompactorGroupId.of(groupName); - LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); - TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); - - TExternalCompactionJob result = null; - - CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId); - - while (metaJob != null) { - - Optional compactionConfig = getCompactionConfig(metaJob); - - // this method may reread the metadata, do not use the metadata in metaJob for anything after - // this method - CompactionMetadata ecm = null; - - var kind = metaJob.getJob().getKind(); - - // Only reserve user compactions when the config is present. When compactions are canceled the - // config is deleted. - var cid = ExternalCompactionId.from(externalCompactionId); - if (kind == CompactionKind.SYSTEM - || (kind == CompactionKind.USER && compactionConfig.isPresent())) { - ecm = reserveCompaction(metaJob, compactorAddress, cid); - } - - if (ecm != null) { - result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); - // It is possible that by the time this added that the the compactor that made this request - // is dead. In this cases the compaction is not actually running. - RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), - new RunningCompaction(result, compactorAddress, groupName)); - TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, - metaJob.getJob()); - break; - } else { - LOG.debug( - "Unable to reserve compaction job for {}, pulling another off the queue for group {}", - metaJob.getTabletMetadata().getExtent(), groupName); - metaJob = jobQueues.poll(CompactorGroupId.of(groupName)); - } - } - - if (metaJob == null) { - LOG.debug("No jobs found in group {} ", groupName); - } - - if (result == null) { - LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName, - compactorAddress); - result = new TExternalCompactionJob(); - } - - return new TNextCompactionJob(result, compactorCounts.get(groupName)); + String groupName, String compactorAddress, String externalCompactionId) throws TException { + final CompletableFuture future = new CompletableFuture<>(); + asyncThriftService.getCompactionJob(tinfo, credentials, groupName, compactorAddress, + externalCompactionId, asyncMethodFuture(future)); + return getFutureThriftResult(future); } @VisibleForTesting @@ -668,6 +625,139 @@ public CompactionCoordinatorService.Iface getThriftService() { return this; } + public CompactionCoordinatorService.AsyncIface getAsyncThriftService() { + return asyncThriftService; + } + + private CompactionCoordinatorService.AsyncIface newAsyncThriftService() { + return new AsyncIface() { + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent textent, TCompactionStats stats, + AsyncMethodCallback resultHandler) throws TException { + CompactionCoordinator.this.compactionCompleted(tinfo, credentials, externalCompactionId, + textent, stats); + resultHandler.onComplete(null); + } + + @Override + public void getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, + String compactorAddress, String externalCompactionId, + AsyncMethodCallback resultHandler) throws TException { + + // do not expect users to call this directly, expect compactors to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + // Get the next job as a future as we need to wait until something is available + CompactorGroupId groupId = CompactorGroupId.of(groupName); + LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, + compactorAddress); + TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); + + var future = jobQueues.getAsync(groupId).thenApply(metaJob -> { + LOG.trace("Next metaJob is ready {}", metaJob.getJob()); + Optional compactionConfig = getCompactionConfig(metaJob); + + // this method may reread the metadata, do not use the metadata in metaJob for anything + // after + // this method + CompactionMetadata ecm = null; + + var kind = metaJob.getJob().getKind(); + + // Only reserve user compactions when the config is present. When compactions are canceled + // the + // config is deleted. + var cid = ExternalCompactionId.from(externalCompactionId); + if (kind == CompactionKind.SYSTEM + || (kind == CompactionKind.USER && compactionConfig.isPresent())) { + ecm = reserveCompaction(metaJob, compactorAddress, cid); + } + + final TExternalCompactionJob result; + if (ecm != null) { + result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); + // It is possible that by the time this added that the the compactor that made this + // request + // is dead. In this cases the compaction is not actually running. + RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), + new RunningCompaction(result, compactorAddress, groupName)); + TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, + metaJob.getJob()); + } else { + LOG.debug( + "Unable to reserve compaction job for {} {}, returning empty job to compactor {}", + groupName, metaJob.getTabletMetadata().getExtent(), compactorAddress); + result = new TExternalCompactionJob(); + } + + return new TNextCompactionJob(result, compactorCounts.get(groupName)); + }); + + // TODO: Use a thread pool and use thenAcceptAsync and exceptionallyAsync() + // Async send back to the compactor when a new job is ready + // Need the unused var for errorprone + var unused = future.thenAccept(ecj -> { + LOG.debug("Received next compaction job {}", ecj); + resultHandler.onComplete(ecj); + }).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> { + if (e instanceof TimeoutException) { + LOG.trace("Compaction job request with ecid {} timed out.", externalCompactionId); + resultHandler.onComplete(new TNextCompactionJob(new TExternalCompactionJob(), + compactorCounts.get(groupName))); + } else { + LOG.warn("Received exception processing compaction job {}", e.getMessage()); + LOG.debug(e.getMessage(), e); + resultHandler.onError(new RuntimeException(e)); + } + return null; + }); + } + + @Override + public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate update, long timestamp, + AsyncMethodCallback resultHandler) throws TException { + CompactionCoordinator.this.updateCompactionStatus(tinfo, credentials, externalCompactionId, + update, timestamp); + resultHandler.onComplete(null); + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent extent, AsyncMethodCallback resultHandler) + throws TException { + CompactionCoordinator.this.compactionFailed(tinfo, credentials, externalCompactionId, + extent); + resultHandler.onComplete(null); + } + + @Override + public void getRunningCompactions(TInfo tinfo, TCredentials credentials, + AsyncMethodCallback resultHandler) throws TException { + resultHandler + .onComplete(CompactionCoordinator.this.getRunningCompactions(tinfo, credentials)); + } + + @Override + public void getCompletedCompactions(TInfo tinfo, TCredentials credentials, + AsyncMethodCallback resultHandler) throws TException { + resultHandler + .onComplete(CompactionCoordinator.this.getCompletedCompactions(tinfo, credentials)); + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId, + AsyncMethodCallback resultHandler) throws TException { + CompactionCoordinator.this.cancel(tinfo, credentials, externalCompactionId); + resultHandler.onComplete(null); + } + }; + } + private Optional getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { if (metaJob.getJob().getKind() == CompactionKind.USER && metaJob.getTabletMetadata().getSelectedFiles() != null) { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 9967610691c..e4b0a3454de 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -53,6 +54,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -96,8 +98,10 @@ import org.apache.accumulo.server.security.SecurityOperation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.thrift.async.AsyncMethodCallback; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import com.google.common.net.HostAndPort; @@ -365,8 +369,20 @@ public void testGetCompactionJob() throws Exception { // Get the next job ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); - TNextCompactionJob nextJob = coordinator.getCompactionJob(new TInfo(), creds, - GROUP_ID.toString(), "localhost:10241", eci.toString()); + CompletableFuture future = new CompletableFuture<>(); + coordinator.getAsyncThriftService().getCompactionJob(new TInfo(), creds, GROUP_ID.toString(), + "localhost:10241", eci.toString(), new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + future.complete(response); + } + + @Override + public void onError(Exception exception) { + future.completeExceptionally(exception); + } + }); + TNextCompactionJob nextJob = future.get(); assertEquals(3, nextJob.getCompactorCount()); TExternalCompactionJob createdJob = nextJob.getJob(); assertEquals(eci.toString(), createdJob.getExternalCompactionId()); @@ -383,12 +399,22 @@ public void testGetCompactionJob() throws Exception { EasyMock.verify(tconf, context, creds, tm, security); } + // Default to a 15-second timeout to make sure that setting the + // COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME is working. If we time + // out then the property is not being applied correctly or the timeout + // on the future is broken. @Test + @Timeout(15) public void testGetCompactionJobNoJobs() throws Exception { ServerContext context = EasyMock.createNiceMock(ServerContext.class); expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance()); + + // Configure a 3-second timeout so we don't have to wait the full default time + // and to speed the test up + config.set(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME, "3s"); + expect(context.getConfiguration()).andReturn(config).anyTimes(); TCredentials creds = EasyMock.createNiceMock(TCredentials.class); @@ -402,8 +428,21 @@ public void testGetCompactionJobNoJobs() throws Exception { EasyMock.replay(context, creds, security, manager); var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); - TNextCompactionJob nextJob = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, - GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString()); + CompletableFuture future = new CompletableFuture<>(); + coordinator.getAsyncThriftService().getCompactionJob(TraceUtil.traceInfo(), creds, + GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString(), + new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + future.complete(response); + } + + @Override + public void onError(Exception exception) { + future.completeExceptionally(exception); + } + }); + TNextCompactionJob nextJob = future.get(); assertEquals(3, nextJob.getCompactorCount()); assertNull(nextJob.getJob().getExternalCompactionId());