From 173502da8bc1e8dd8f5c596accff1e7beb5c7246 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Sat, 27 Jul 2024 14:59:16 -0400 Subject: [PATCH 1/5] Convert CompactionCoordinator RPC service to Async Thrift This changes the CompactionCoordinator RPC service to use an Async thrift processor. This allows long polling when a compactor requests a new job by no longer blocking the Thrift IO thread when there are no jobs available. When a job is available, the response will be sent asynchronously back to the client. The only RPC service converted for now is the CompactionCoordinator RPC service, but future services could be converted to use Async processors as well. This RPC is not using multiplexing as Async multiplexing is an open issue being worked on in THRIFT-2427. Once that issue is resolved, the plan will be to convert to an async multiplexed processor. A second thrift service on another port was created in the Manager for handling async processors because they are handled differently than sync and you can't combine both sync and async processors when multiplexing. The coordinator RPC service now advertises the async port so clients connect to the correct service. 
This closes #4664 --- .../apache/accumulo/core/conf/Property.java | 8 +- .../apache/accumulo/core/rpc/ThriftUtil.java | 10 ++ .../server/rpc/CustomNonBlockingServer.java | 114 +++++++++++++----- .../server/rpc/ThriftProcessorTypes.java | 20 +-- .../accumulo/server/rpc/TimedProcessor.java | 28 ++++- .../org/apache/accumulo/manager/Manager.java | 38 ++++-- .../coordinator/CompactionCoordinator.java | 107 +++++++++------- .../compaction/CompactionCoordinatorTest.java | 54 +++++++-- 8 files changed, 281 insertions(+), 98 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 47f2f776289..cc4e557e421 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -363,6 +363,8 @@ public enum Property { "Properties in this category affect the behavior of the manager server.", "2.1.0"), MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT, "The port used for handling client connections on the manager.", "1.3.5"), + MANAGER_ASYNC_CLIENTPORT("manager.port.async.client", "8999", PropertyType.PORT, + "The port used for handling client connections on the manager for async services.", "4.0.0"), MANAGER_TABLET_BALANCER("manager.tablet.balancer", "org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME, "The balancer class that accumulo will use to make tablet assignment and " @@ -1165,7 +1167,11 @@ public enum Property { @Experimental COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL( "compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION, - "The interval at which to check the tservers for external compactions.", "2.1.0"); + "The interval at which to check the tservers for external compactions.", "2.1.0"), + COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME( + "compaction.coordinator.wait.time.job.request.max", "2m", 
PropertyType.TIMEDURATION, + "The maximum amount of time the coordinator will wait for a requested job from the job queue.", + "4.0.0"); private final String key; private final String defaultValue; diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index 44c58923c1e..f69dd6ba1d6 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.rpc; +import static org.apache.accumulo.core.rpc.clients.ThriftClientTypes.COORDINATOR; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.io.IOException; @@ -113,6 +114,15 @@ public static T getClient(ThriftClientTypes type, HostAndPort address, ClientContext context) throws TTransportException { TTransport transport = context.getTransportPool().getTransport(type, address, context.getClientTimeoutInMillis(), context, true); + + // TODO - This is temporary until we support Async multiplexing + // THRIFT-2427 is tracking this issue and the plan is to reopen a new PR + // to add support in the next version. Once we support multiplexing we can + // remove this special case. 
+ if (type == COORDINATOR) { + return type.getClientFactory().getClient(protocolFactory.getProtocol(transport)); + } + return createClient(type, transport); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java index 76728722d25..ac104b50a06 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java @@ -23,6 +23,9 @@ import java.net.Socket; import java.nio.channels.SelectionKey; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncProcessor; +import org.apache.thrift.TProcessor; +import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -99,11 +102,16 @@ public CustomSelectAcceptThread(TNonblockingServerTransport serverTransport) protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) throws TTransportException { - if (processorFactory_.isAsyncProcessor()) { - throw new IllegalStateException("This implementation does not support AsyncProcessors"); + + TProcessor processor = processorFactory_.getProcessor(null); + // Processors are generally wrapped in TimedProcessor so we need to ask TimedProcessor + // what type of processor is being wrapped + if (processor instanceof TimedProcessor) { + return newFrameBuffer(trans, selectionKey, selectThread, + ((TimedProcessor) processor).isAsync()); } - return new CustomFrameBuffer(trans, selectionKey, selectThread); + return newFrameBuffer(trans, selectionKey, selectThread, processor instanceof AsyncProcessor); } } @@ -118,7 +126,7 @@ public 
CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) throws TTransportException { super(trans, selectionKey, selectThread); // Store the clientAddress in the buffer so it can be referenced for logging during read/write - this.clientAddress = getClientAddress(); + this.clientAddress = getClientAddress(trans_, "CustomFrameBuffer"); } @Override @@ -148,33 +156,83 @@ public boolean write() { } return result; } + } + + /** + * Custom wrapper around + * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} to extract the + * client's network location before accepting the request. + */ + private class CustomAsyncFrameBuffer extends AsyncFrameBuffer { + private final String clientAddress; + + public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, + AbstractSelectThread selectThread) throws TTransportException { + super(trans, selectionKey, selectThread); + // Store the clientAddress in the buffer so it can be referenced for logging during read/write + this.clientAddress = getClientAddress(trans_, "CustomAsyncFrameBuffer"); + } + + @Override + public void invoke() { + // On invoke() set the clientAddress on the ThreadLocal so that it can be accessed elsewhere + // in the same thread that called invoke() on the buffer + TServerUtils.clientAddress.set(clientAddress); + super.invoke(); + } - /* - * Helper method used to capture the client address inside the CustomFrameBuffer constructor so - * that it can be referenced inside the read/write methods for logging purposes. It previously - * was only set on the ThreadLocal in the invoke() method but that does not work because A) the - * method isn't called until after reading is finished so the value will be null inside of - * read() and B) The other problem is that invoke() is called on a different thread than - * read()/write() so even if the order was correct it would not be available. 
- * - * Since a new FrameBuffer is created for each request we can use it to capture the client - * address earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read - * data and write a response back to the client and as part of creation of the buffer the - * TNonblockingSocket is stored as a final variable and won't change so we can safely capture - * the clientAddress in the constructor and use it for logging during read/write and then use - * the value inside of invoke() to set the ThreadLocal so the client address will still be - * available on the thread that called invoke(). - */ - private String getClientAddress() { - String clientAddress = null; - if (trans_ instanceof TNonblockingSocket) { - TNonblockingSocket tsock = (TNonblockingSocket) trans_; - Socket sock = tsock.getSocketChannel().socket(); - clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort(); - log.trace("CustomFrameBuffer captured client address: {}", clientAddress); + @Override + public boolean read() { + boolean result = super.read(); + if (!result) { + log.trace("CustomAsyncFrameBuffer.read returned false when reading data from client: {}", + clientAddress); } - return clientAddress; + return result; + } + + @Override + public boolean write() { + boolean result = super.write(); + if (!result) { + log.trace("CustomAsyncFrameBuffer.write returned false when writing data to client: {}", + clientAddress); + } + return result; } } + /* + * Helper method used to capture the client address inside the CustomFrameBuffer and + * CustomAsyncFrameBuffer constructors so that it can be referenced inside the read/write methods + * for logging purposes. 
It previously was only set on the ThreadLocal in the invoke() method but + * that does not work because A) the method isn't called until after reading is finished so the + * value will be null inside of read() and B) The other problem is that invoke() is called on a + * different thread than read()/write() so even if the order was correct it would not be + * available. + * + * Since a new FrameBuffer is created for each request we can use it to capture the client address + * earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read data and + * write a response back to the client and as part of creation of the buffer the + * TNonblockingSocket is stored as a final variable and won't change so we can safely capture the + * clientAddress in the constructor and use it for logging during read/write and then use the + * value inside of invoke() to set the ThreadLocal so the client address will still be available + * on the thread that called invoke(). + */ + private static String getClientAddress(TNonblockingTransport transport, String name) { + String clientAddress = null; + if (transport instanceof TNonblockingSocket) { + TNonblockingSocket tsock = (TNonblockingSocket) transport; + Socket sock = tsock.getSocketChannel().socket(); + clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort(); + log.trace("{} captured client address: {}", name, clientAddress); + } + return clientAddress; + } + + private FrameBuffer newFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, + AbstractSelectThread selectThread, boolean async) throws TTransportException { + return async ? 
new CustomAsyncFrameBuffer(trans, selectionKey, selectThread) + : new CustomFrameBuffer(trans, selectionKey, selectThread); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java index 6f0a6086eba..7954a43da90 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java @@ -20,6 +20,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ClientService; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncProcessor; import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.gc.thrift.GCMonitorService; import org.apache.accumulo.core.manager.thrift.FateService; @@ -32,7 +33,6 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; -import org.apache.thrift.TBaseProcessor; import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.TServiceClient; @@ -47,8 +47,8 @@ public ThriftProcessorTypes(ThriftClientTypes type) { } @VisibleForTesting - public > TProcessor getTProcessor( - Class

processorClass, Class interfaceClass, H serviceHandler, ServerContext context) { + public P getTProcessor(Class

processorClass, + Class interfaceClass, H serviceHandler, ServerContext context) { I rpcProxy = TraceUtil.wrapService(serviceHandler); if (context.getThriftServerType() == ThriftServerType.SASL) { @SuppressWarnings("unchecked") @@ -114,20 +114,26 @@ public static TMultiplexedProcessor getGcTProcessor(GCMonitorService.Iface servi } public static TMultiplexedProcessor getManagerTProcessor(FateService.Iface fateServiceHandler, - CompactionCoordinatorService.Iface coordinatorServiceHandler, ManagerClientService.Iface managerServiceHandler, ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); muxProcessor.registerProcessor(FATE.getServiceName(), FATE.getTProcessor( FateService.Processor.class, FateService.Iface.class, fateServiceHandler, context)); - muxProcessor.registerProcessor(COORDINATOR.getServiceName(), - COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class, - CompactionCoordinatorService.Iface.class, coordinatorServiceHandler, context)); muxProcessor.registerProcessor(MANAGER.getServiceName(), MANAGER.getTProcessor(ManagerClientService.Processor.class, ManagerClientService.Iface.class, managerServiceHandler, context)); return muxProcessor; } + public static AsyncProcessor getManagerTAsyncProcessor( + CompactionCoordinatorService.AsyncIface coordinatorServiceHandler, ServerContext context) { + // TODO - Right now this is temporarily returning the single AsyncProcessor + // Once Thrift supports Async multiplexing then we can switch to using + // TMultiplexedAsyncProcessor and use multiplexing like we do for sync processors + // THRIFT-2427 is tracking this issue + return COORDINATOR.getTProcessor(CompactionCoordinatorService.AsyncProcessor.class, + CompactionCoordinatorService.AsyncIface.class, coordinatorServiceHandler, context); + } + public static TMultiplexedProcessor getScanServerTProcessor(ClientServiceHandler clientHandler, TabletScanClientService.Iface tserverHandler, ServerContext context) { 
TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java index 4148cfb0ed2..6a189694ae1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java @@ -22,36 +22,60 @@ import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.server.metrics.ThriftMetrics; +import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer; + +import com.google.common.base.Preconditions; /** * A {@link TProcessor} which tracks the duration of an RPC and adds it to the metrics subsystem. */ -public class TimedProcessor implements TProcessor { +public class TimedProcessor implements TProcessor, TAsyncProcessor { private final TProcessor other; private final ThriftMetrics thriftMetrics; private long idleStart; + private final boolean async; public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { this.other = next; thriftMetrics = new ThriftMetrics(); metricsInfo.addMetricsProducers(thriftMetrics); idleStart = System.nanoTime(); + this.async = next instanceof TAsyncProcessor; } @Override public void process(TProtocol in, TProtocol out) throws TException { + time(() -> other.process(in, out)); + } + + @Override + public void process(AsyncFrameBuffer fb) throws TException { + Preconditions.checkState(isAsync(), "Processor is not Async"); + time(() -> ((TAsyncProcessor) this.other).process(fb)); + } + + public boolean isAsync() { + return async; + } + + protected void time(ThriftRunnable runnable) throws TException { long processStart = System.nanoTime(); 
thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart)); try { - other.process(in, out); + runnable.run(); } finally { // set idle to now, calc time in process idleStart = System.nanoTime(); thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart)); } } + + protected interface ThriftRunnable { + void run() throws TException; + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index d8874b0c6d5..f055117b3ba 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -216,6 +216,7 @@ public class Manager extends AbstractServer ServiceLock managerLock = null; private TServer clientService = null; + private TServer asyncClientService = null; protected volatile TabletBalancer tabletBalancer; private final BalancerEnvironment balancerEnvironment; private final BalancerMetrics balancerMetrics = new BalancerMetrics(); @@ -331,6 +332,14 @@ synchronized void setManagerState(final ManagerState newState) { Manager.this.nextEvent.event("stopped event loop"); }, 100L, 1000L, MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); + + final var asyncClientFuture = + getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { + // This frees the main thread and will cause the manager to exit + asyncClientService.stop(); + Manager.this.nextEvent.event("stopped event loop"); + }, 100L, 1000L, MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(asyncClientFuture); break; case HAVE_LOCK: if (isUpgrading()) { @@ -1115,19 +1124,28 @@ public void run() { ManagerClientService.Iface haProxy = HighlyAvailableServiceWrapper.service(managerClientHandler, this); - ServerAddress sa; - var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, - compactionCoordinator.getThriftService(), haProxy, getContext()); + final 
ServerAddress sa; + final ServerAddress asyncSa; + var processor = + ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy, getContext()); + var asyncProcessor = ThriftProcessorTypes + .getManagerTAsyncProcessor(compactionCoordinator.getThriftService(), getContext()); try { sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK); + asyncSa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_ASYNC_CLIENTPORT, + asyncProcessor, "AsyncManager", "Async Manager Client Service Handler", null, + Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, + Property.MANAGER_THREADCHECK); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getHostname(), e); } clientService = sa.server; + asyncClientService = asyncSa.server; log.info("Started Manager client service at {}", sa.address); + log.info("Started Manager async client service at {}", asyncSa.address); // block until we can obtain the ZK lock for the manager ServiceLockData sld; @@ -1139,6 +1157,8 @@ public void run() { MetricsInfo metricsInfo = getContext().getMetricsInfo(); metricsInfo.addServiceTags(getApplicationName(), sa.getAddress(), getResourceGroup()); + // TODO: How do we want to handle the second async port for metrics? Should we just + // add a second port tag or should we rename it something else? 
ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), this); var producers = managerMetrics.getProducers(getConfiguration(), this); producers.add(balancerMetrics); @@ -1295,11 +1315,15 @@ boolean canSuspendTablets() { String address = sa.address.toString(); UUID uuid = sld.getServerUUID(ThriftService.MANAGER); ServiceDescriptors descriptors = new ServiceDescriptors(); - for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR, - ThriftService.FATE}) { + for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.FATE}) { descriptors.addService(new ServiceDescriptor(uuid, svc, address, this.getResourceGroup())); } + // Advertise the COORDINATOR service on the async port so clients connect to the + // correct one + descriptors.addService(new ServiceDescriptor(uuid, ThriftService.COORDINATOR, + asyncSa.address.toString(), this.getResourceGroup())); + sld = new ServiceLockData(descriptors); log.info("Setting manager lock data to {}", sld); try { @@ -1308,14 +1332,14 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } - while (!clientService.isServing()) { + while (!clientService.isServing() && !asyncClientService.isServing()) { sleepUninterruptibly(100, MILLISECONDS); } // The manager is fully initialized. Clients are allowed to connect now. 
managerInitialized.set(true); - while (clientService.isServing()) { + while (clientService.isServing() && asyncClientService.isServing()) { sleepUninterruptibly(500, MILLISECONDS); } log.info("Shutting down fate."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 1941a762df0..f4f8dad35af 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; @@ -50,6 +51,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -136,6 +138,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +156,7 @@ import io.micrometer.core.instrument.MeterRegistry; public class CompactionCoordinator - implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer { + implements CompactionCoordinatorService.AsyncIface, Runnable, MetricsProducer { private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); @@ -194,6 +197,7 @@ 
public class CompactionCoordinator private final int jobQueueInitialSize; private volatile long coordinatorStartTime; + private final long maxJobRequestWaitTime; public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference>> fateInstances, Manager manager) { @@ -205,6 +209,9 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, this.jobQueueInitialSize = ctx.getConfiguration() .getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE); + this.maxJobRequestWaitTime = ctx.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME); + this.jobQueues = new CompactionJobQueues(jobQueueInitialSize); this.queueMetrics = new QueueMetrics(jobQueues); @@ -358,28 +365,25 @@ public long getNumRunningCompactions() { * @param groupName group * @param compactorAddress compactor address * @throws ThriftSecurityException when permission error - * @return compaction job */ @Override - public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, - String groupName, String compactorAddress, String externalCompactionId) - throws ThriftSecurityException { + public void getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, + String compactorAddress, String externalCompactionId, + AsyncMethodCallback resultHandler) throws ThriftSecurityException { // do not expect users to call this directly, expect compactors to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } + + // Get the next job as a future as we need to wait until something is available CompactorGroupId groupId = CompactorGroupId.of(groupName); LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); - TExternalCompactionJob result = 
null; - - CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId); - - while (metaJob != null) { - + var future = jobQueues.getAsync(groupId).thenApply(metaJob -> { + LOG.trace("Next metaJob is ready {}", metaJob.getJob()); Optional compactionConfig = getCompactionConfig(metaJob); // this method may reread the metadata, do not use the metadata in metaJob for anything after @@ -396,6 +400,7 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials ecm = reserveCompaction(metaJob, compactorAddress, cid); } + final TExternalCompactionJob result; if (ecm != null) { result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); // It is possible that by the time this added that the the compactor that made this request @@ -404,26 +409,33 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials new RunningCompaction(result, compactorAddress, groupName)); TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, metaJob.getJob()); - break; } else { - LOG.debug( - "Unable to reserve compaction job for {}, pulling another off the queue for group {}", - metaJob.getTabletMetadata().getExtent(), groupName); - metaJob = jobQueues.poll(CompactorGroupId.of(groupName)); + LOG.debug("Unable to reserve compaction job for {} {}, returning empty job to compactor {}", + groupName, metaJob.getTabletMetadata().getExtent(), compactorAddress); + result = new TExternalCompactionJob(); } - } - if (metaJob == null) { - LOG.debug("No jobs found in group {} ", groupName); - } - - if (result == null) { - LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName, - compactorAddress); - result = new TExternalCompactionJob(); - } + return new TNextCompactionJob(result, compactorCounts.get(groupName)); + }); - return new TNextCompactionJob(result, compactorCounts.get(groupName)); + // TODO: Use a thread pool and use thenAcceptAsync and exceptionallyAsync() + // Async send back to 
the compactor when a new job is ready + // Need the unused var for errorprone + var unused = future.thenAccept(ecj -> { + LOG.debug("Received next compaction job {}", ecj); + resultHandler.onComplete(ecj); + }).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> { + if (e instanceof TimeoutException) { + LOG.trace("Compaction job request with ecid {} timed out.", externalCompactionId); + resultHandler.onComplete( + new TNextCompactionJob(new TExternalCompactionJob(), compactorCounts.get(groupName))); + } else { + LOG.warn("Received exception processing compaction job {}", e.getMessage()); + LOG.debug(e.getMessage(), e); + resultHandler.onError(new RuntimeException(e)); + } + return null; + }); } @VisibleForTesting @@ -664,7 +676,7 @@ public void addJobs(TabletMetadata tabletMetadata, Collection job jobQueues.add(tabletMetadata, jobs); } - public CompactionCoordinatorService.Iface getThriftService() { + public CompactionCoordinatorService.AsyncIface getThriftService() { return this; } @@ -713,8 +725,8 @@ private Optional getCompactionConfig(CompactionJobQueues.MetaJ */ @Override public void compactionCompleted(TInfo tinfo, TCredentials credentials, - String externalCompactionId, TKeyExtent textent, TCompactionStats stats) - throws ThriftSecurityException { + String externalCompactionId, TKeyExtent textent, TCompactionStats stats, + AsyncMethodCallback resultHandler) throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -767,11 +779,12 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, txid.ifPresentOrElse(fateId -> LOG.debug("initiated compaction commit {} {}", ecid, fateId), () -> LOG.debug("compaction commit already initiated for {}", ecid)); + resultHandler.onComplete(null); } @Override public void compactionFailed(TInfo tinfo, TCredentials 
credentials, String externalCompactionId, - TKeyExtent extent) throws ThriftSecurityException { + TKeyExtent extent, AsyncMethodCallback resultHandler) throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -781,6 +794,7 @@ public void compactionFailed(TInfo tinfo, TCredentials credentials, String exter LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent); final var ecid = ExternalCompactionId.of(externalCompactionId); compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); + resultHandler.onComplete(null); } void compactionsFailed(Map compactions) { @@ -896,8 +910,8 @@ public boolean test(TabletMetadata tabletMetadata) { */ @Override public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, - String externalCompactionId, TCompactionStatusUpdate update, long timestamp) - throws ThriftSecurityException { + String externalCompactionId, TCompactionStatusUpdate update, long timestamp, + AsyncMethodCallback resultHandler) throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -909,6 +923,7 @@ public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, if (null != rc) { rc.addUpdate(timestamp, update); } + resultHandler.onComplete(null); } public void recordCompletion(ExternalCompactionId ecid) { @@ -928,16 +943,16 @@ protected Set readExternalCompactionIds() { } /** - * Return information about running compactions + * Return information about running compactions Sends back map of ECID to TExternalCompaction + * objects * * @param tinfo trace info * @param credentials tcredentials object - * @return map of ECID to 
TExternalCompaction objects * @throws ThriftSecurityException permission error */ @Override - public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials) - throws ThriftSecurityException { + public void getRunningCompactions(TInfo tinfo, TCredentials credentials, + AsyncMethodCallback callback) throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -953,20 +968,20 @@ public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials c trc.setJob(rc.getJob()); result.putToCompactions(ecid.canonical(), trc); }); - return result; + callback.onComplete(result); } /** - * Return information about recently completed compactions + * Return information about recently completed compactions send back map of ECID to + * TExternalCompaction objects * * @param tinfo trace info * @param credentials tcredentials object - * @return map of ECID to TExternalCompaction objects * @throws ThriftSecurityException permission error */ @Override - public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials) - throws ThriftSecurityException { + public void getCompletedCompactions(TInfo tinfo, TCredentials credentials, + AsyncMethodCallback callback) throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -981,12 +996,13 @@ public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials trc.setUpdates(rc.getUpdates()); result.putToCompactions(ecid.canonical(), trc); }); - return result; + callback.onComplete(result); } @Override - public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) - throws 
TException { + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId, + AsyncMethodCallback callback) throws TException { + var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent()); try { @@ -1001,6 +1017,7 @@ public void cancel(TInfo tinfo, TCredentials credentials, String externalCompact } cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId); + callback.onComplete(null); } /* Method exists to be called from test */ diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 9967610691c..299a69a2bf9 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -53,6 +54,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -96,8 +98,10 @@ import org.apache.accumulo.server.security.SecurityOperation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.thrift.async.AsyncMethodCallback; import 
org.easymock.EasyMock; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import com.google.common.net.HostAndPort; @@ -160,12 +164,12 @@ protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecut @Override public void compactionCompleted(TInfo tinfo, TCredentials credentials, - String externalCompactionId, TKeyExtent textent, TCompactionStats stats) - throws ThriftSecurityException {} + String externalCompactionId, TKeyExtent textent, TCompactionStats stats, + AsyncMethodCallback callback) throws ThriftSecurityException {} @Override public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, - TKeyExtent extent) throws ThriftSecurityException {} + TKeyExtent extent, AsyncMethodCallback callback) throws ThriftSecurityException {} void setMetadataCompactionIds(Set mci) { metadataCompactionIds = mci; @@ -365,8 +369,20 @@ public void testGetCompactionJob() throws Exception { // Get the next job ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); - TNextCompactionJob nextJob = coordinator.getCompactionJob(new TInfo(), creds, - GROUP_ID.toString(), "localhost:10241", eci.toString()); + CompletableFuture future = new CompletableFuture<>(); + coordinator.getCompactionJob(new TInfo(), creds, GROUP_ID.toString(), "localhost:10241", + eci.toString(), new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + future.complete(response); + } + + @Override + public void onError(Exception exception) { + future.completeExceptionally(exception); + } + }); + TNextCompactionJob nextJob = future.get(); assertEquals(3, nextJob.getCompactorCount()); TExternalCompactionJob createdJob = nextJob.getJob(); assertEquals(eci.toString(), createdJob.getExternalCompactionId()); @@ -383,12 +399,22 @@ public void testGetCompactionJob() throws Exception { EasyMock.verify(tconf, context, creds, tm, security); } + // Default to a 15-second timeout to make 
sure that setting the + // COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME is working. If we time + // out then the property is not being applied correctly or the timeout + // on the future is broken. @Test + @Timeout(15) public void testGetCompactionJobNoJobs() throws Exception { ServerContext context = EasyMock.createNiceMock(ServerContext.class); expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance()); + + // Configure a 3-second timeout so we don't have to wait the full default time + // and to speed the test up + config.set(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME, "3s"); + expect(context.getConfiguration()).andReturn(config).anyTimes(); TCredentials creds = EasyMock.createNiceMock(TCredentials.class); @@ -402,8 +428,20 @@ public void testGetCompactionJobNoJobs() throws Exception { EasyMock.replay(context, creds, security, manager); var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); - TNextCompactionJob nextJob = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, - GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString()); + CompletableFuture future = new CompletableFuture<>(); + coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, GROUP_ID.toString(), + "localhost:10240", UUID.randomUUID().toString(), new AsyncMethodCallback<>() { + @Override + public void onComplete(TNextCompactionJob response) { + future.complete(response); + } + + @Override + public void onError(Exception exception) { + future.completeExceptionally(exception); + } + }); + TNextCompactionJob nextJob = future.get(); assertEquals(3, nextJob.getCompactorCount()); assertNull(nextJob.getJob().getExternalCompactionId()); From 8e0f16b9e84c8fe86d781bfc5660ae34e2e1f3ce Mon Sep 17 00:00:00 2001 From: "Christopher L. 
Shannon" Date: Sat, 28 Sep 2024 13:19:06 -0400 Subject: [PATCH 2/5] Make sure TimedProcessor is correctly detected as sync vs async --- .../accumulo/server/rpc/TServerUtils.java | 13 ++++- .../accumulo/server/rpc/TimedProcessor.java | 51 ++++++++++++++----- 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 9b64aab794a..39668a71f3f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -53,8 +53,11 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.rpc.TimedProcessor.AsyncTimedProcessor; +import org.apache.accumulo.server.rpc.TimedProcessor.SyncTimedProcessor; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocolFactory; @@ -170,7 +173,7 @@ public static ServerAddress startServer(ServerContext context, String hostname, // create the TimedProcessor outside the port search loop so we don't try to // register the same // metrics mbean more than once - TimedProcessor timedProcessor = new TimedProcessor(processor, context.getMetricsInfo()); + TimedProcessor timedProcessor = newTimedProcessor(processor, context.getMetricsInfo()); HostAndPort[] addresses = getHostAndPorts(hostname, portHint); try { @@ -574,7 +577,7 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf, } try { - return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName, + return startTServer(serverType, 
newTimedProcessor(processor, metricsInfo), serverName, threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses); } catch (TTransportException e) { @@ -702,4 +705,10 @@ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProc return new UGIAssumingProcessor(processor); } + + private static TimedProcessor newTimedProcessor(TProcessor processor, MetricsInfo metricsInfo) { + return processor instanceof TAsyncProcessor + ? new AsyncTimedProcessor((TAsyncProcessor) processor, metricsInfo) + : new SyncTimedProcessor(processor, metricsInfo); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java index 6a189694ae1..ffdb05b4abe 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java @@ -33,19 +33,17 @@ /** * A {@link TProcessor} which tracks the duration of an RPC and adds it to the metrics subsystem. 
*/ -public class TimedProcessor implements TProcessor, TAsyncProcessor { +public abstract class TimedProcessor implements TProcessor { - private final TProcessor other; - private final ThriftMetrics thriftMetrics; - private long idleStart; - private final boolean async; + protected final TProcessor other; + protected final ThriftMetrics thriftMetrics; + protected long idleStart; - public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { + protected TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { this.other = next; thriftMetrics = new ThriftMetrics(); metricsInfo.addMetricsProducers(thriftMetrics); idleStart = System.nanoTime(); - this.async = next instanceof TAsyncProcessor; } @Override @@ -53,14 +51,8 @@ public void process(TProtocol in, TProtocol out) throws TException { time(() -> other.process(in, out)); } - @Override - public void process(AsyncFrameBuffer fb) throws TException { - Preconditions.checkState(isAsync(), "Processor is not Async"); - time(() -> ((TAsyncProcessor) this.other).process(fb)); - } - public boolean isAsync() { - return async; + return false; } protected void time(ThriftRunnable runnable) throws TException { @@ -75,6 +67,37 @@ protected void time(ThriftRunnable runnable) throws TException { } } + public static class SyncTimedProcessor extends TimedProcessor { + protected SyncTimedProcessor(TProcessor next, MetricsInfo metricsInfo) { + super(next, metricsInfo); + } + } + + public static class AsyncTimedProcessor extends TimedProcessor implements TAsyncProcessor { + private final TAsyncProcessor other; + + public AsyncTimedProcessor(TAsyncProcessor next, MetricsInfo metricsInfo) { + super(validate(next), metricsInfo); + this.other = next; + } + + @Override + public boolean isAsync() { + return true; + } + + @Override + public void process(AsyncFrameBuffer fb) throws TException { + this.other.process(fb); + } + + private static TProcessor validate(TAsyncProcessor other) { + 
Preconditions.checkArgument(other instanceof TProcessor, + "Async processor must also implement TProcessor"); + return (TProcessor) other; + } + } + protected interface ThriftRunnable { void run() throws TException; } From 5223c22afaab2772e6a89db9add79edfadac863c Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Sun, 29 Sep 2024 12:52:19 -0400 Subject: [PATCH 3/5] Add missing completion calls on thrift async resultHandler --- .../manager/compaction/coordinator/CompactionCoordinator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index f4f8dad35af..ae6c2eb8164 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -738,6 +738,7 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, while (localFates == null) { UtilWaitThread.sleep(100); if (shutdown.getCount() == 0) { + resultHandler.onComplete(null); return; } localFates = fateInstances.get(); @@ -764,10 +765,12 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, tableState); // cleanup metadata table and files related to the compaction compactionsFailed(Map.of(ecid, extent)); + resultHandler.onComplete(null); return; } if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) { + resultHandler.onComplete(null); return; } From 9fa54d02142d4d2062d895b1014c64fe7249ab41 Mon Sep 17 00:00:00 2001 From: "Christopher L. 
Shannon" Date: Sun, 29 Sep 2024 14:26:35 -0400 Subject: [PATCH 4/5] Simplify custom thrift frame creation --- .../server/rpc/CustomNonBlockingServer.java | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java index ac104b50a06..32a059dc757 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java @@ -23,7 +23,7 @@ import java.net.Socket; import java.nio.channels.SelectionKey; -import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncProcessor; +import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer; import org.apache.thrift.server.THsHaServer; @@ -104,14 +104,9 @@ protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, throws TTransportException { TProcessor processor = processorFactory_.getProcessor(null); - // Processors are generally wrapped in TimedProcessor so we need to ask TimedProcessor - // what type of processor is being wrapped - if (processor instanceof TimedProcessor) { - return newFrameBuffer(trans, selectionKey, selectThread, - ((TimedProcessor) processor).isAsync()); - } - - return newFrameBuffer(trans, selectionKey, selectThread, processor instanceof AsyncProcessor); + return processor instanceof TAsyncProcessor + ? 
new CustomAsyncFrameBuffer(trans, selectionKey, selectThread) + : new CustomFrameBuffer(trans, selectionKey, selectThread); } } @@ -230,9 +225,4 @@ private static String getClientAddress(TNonblockingTransport transport, String n return clientAddress; } - private FrameBuffer newFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, - AbstractSelectThread selectThread, boolean async) throws TTransportException { - return async ? new CustomAsyncFrameBuffer(trans, selectionKey, selectThread) - : new CustomFrameBuffer(trans, selectionKey, selectThread); - } } From 4e26740e23a08ccfc8cbfc2082834da340fffb26 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Sun, 29 Sep 2024 16:44:01 -0400 Subject: [PATCH 5/5] Only start Async CompactionCoordinator service when nonblocking server --- .../apache/accumulo/core/rpc/ThriftUtil.java | 41 ++- .../server/rpc/ThriftProcessorTypes.java | 11 +- .../accumulo/server/rpc/ThriftServerType.java | 10 + .../org/apache/accumulo/manager/Manager.java | 81 ++++-- .../coordinator/CompactionCoordinator.java | 262 +++++++++++------- .../compaction/CompactionCoordinatorTest.java | 15 +- 6 files changed, 289 insertions(+), 131 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index f69dd6ba1d6..ff7edfd080e 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -27,14 +27,18 @@ import java.nio.channels.ClosedByInterruptException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TSaslClientTransport; @@ -45,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Throwables; import com.google.common.net.HostAndPort; /** @@ -119,7 +124,13 @@ public static T getClient(ThriftClientTypes type, // THRIFT-2427 is tracking this issue and the plan is to reopen a new PR // to add support in the next version. Once we support multiplexing we can // remove this special case. - if (type == COORDINATOR) { + // Note: you could have a sync server that doesn't use SSL or SASL by using + // ThriftServerType.THREADPOOL on the server but the client wouldn't know that + // For now this should be good enough as we will be able to remove this anyways + // when we can multiplex async. So this currently will not work if using that mode. 
+ boolean isSync = context.getConfiguration().getBoolean(Property.INSTANCE_RPC_SASL_ENABLED) + || context.getConfiguration().getBoolean(Property.INSTANCE_RPC_SSL_ENABLED); + if (type == COORDINATOR && !isSync) { return type.getClientFactory().getClient(protocolFactory.getProtocol(transport)); } @@ -392,4 +403,32 @@ public static void checkIOExceptionCause(IOException e) { throw new UncheckedIOException(e); } } + + public static AsyncMethodCallback asyncMethodFuture(CompletableFuture future) { + return new AsyncMethodCallback() { + @Override + public void onComplete(T response) { + future.complete(response); + } + + @Override + public void onError(Exception exception) { + future.completeExceptionally(exception); + } + }; + } + + public static T getFutureThriftResult(CompletableFuture future) throws TException { + try { + return future.get(); + } catch (Exception e) { + if (e instanceof ExecutionException) { + Throwable rootCause = Throwables.getRootCause(e); + if (rootCause instanceof TException) { + throw (TException) rootCause; + } + } + throw new RuntimeException(e); + } + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java index 7954a43da90..9c4df0bc794 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java @@ -18,9 +18,12 @@ */ package org.apache.accumulo.server.rpc; +import java.util.Optional; + import org.apache.accumulo.core.clientImpl.thrift.ClientService; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncProcessor; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Iface; import org.apache.accumulo.core.compaction.thrift.CompactorService; import 
org.apache.accumulo.core.gc.thrift.GCMonitorService; import org.apache.accumulo.core.manager.thrift.FateService; @@ -114,10 +117,16 @@ public static TMultiplexedProcessor getGcTProcessor(GCMonitorService.Iface servi } public static TMultiplexedProcessor getManagerTProcessor(FateService.Iface fateServiceHandler, - ManagerClientService.Iface managerServiceHandler, ServerContext context) { + ManagerClientService.Iface managerServiceHandler, + Optional coordinatorServiceHandler, + ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); muxProcessor.registerProcessor(FATE.getServiceName(), FATE.getTProcessor( FateService.Processor.class, FateService.Iface.class, fateServiceHandler, context)); + coordinatorServiceHandler + .ifPresent(csh -> muxProcessor.registerProcessor(COORDINATOR.getServiceName(), + COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class, Iface.class, + csh, context))); muxProcessor.registerProcessor(MANAGER.getServiceName(), MANAGER.getTProcessor(ManagerClientService.Processor.class, ManagerClientService.Iface.class, managerServiceHandler, context)); diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java index a12db16f1b7..89531fd5e77 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java @@ -57,4 +57,14 @@ public String toString() { public static ThriftServerType getDefault() { return CUSTOM_HS_HA; } + + public static boolean supportsAsync(ThriftServerType thriftServerType) { + switch (thriftServerType) { + case CUSTOM_HS_HA: + case THREADED_SELECTOR: + return true; + default: + return false; + } + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 
f055117b3ba..6f78ce46d77 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -26,6 +26,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.server.rpc.ThriftServerType.supportsAsync; import java.io.IOException; import java.net.UnknownHostException; @@ -65,6 +66,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; @@ -216,7 +218,7 @@ public class Manager extends AbstractServer ServiceLock managerLock = null; private TServer clientService = null; - private TServer asyncClientService = null; + private Optional asyncClientService = null; protected volatile TabletBalancer tabletBalancer; private final BalancerEnvironment balancerEnvironment; private final BalancerMetrics balancerMetrics = new BalancerMetrics(); @@ -333,13 +335,15 @@ synchronized void setManagerState(final ManagerState newState) { }, 100L, 1000L, MILLISECONDS); ThreadPools.watchNonCriticalScheduledTask(future); - final var asyncClientFuture = - getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { - // This frees the main thread and will cause the manager to exit - asyncClientService.stop(); - Manager.this.nextEvent.event("stopped event loop"); - }, 100L, 1000L, MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(asyncClientFuture); + asyncClientService.ifPresent(acs -> { + final var asyncClientFuture = + 
getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> { + // This frees the main thread and will cause the manager to exit + acs.stop(); + Manager.this.nextEvent.event("stopped event loop"); + }, 100L, 1000L, MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(asyncClientFuture); + }); break; case HAVE_LOCK: if (isUpgrading()) { @@ -1124,28 +1128,41 @@ public void run() { ManagerClientService.Iface haProxy = HighlyAvailableServiceWrapper.service(managerClientHandler, this); - final ServerAddress sa; - final ServerAddress asyncSa; - var processor = - ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy, getContext()); - var asyncProcessor = ThriftProcessorTypes - .getManagerTAsyncProcessor(compactionCoordinator.getThriftService(), getContext()); + final boolean isAsync = supportsAsync(context.getThriftServerType()); + Optional syncCompactionCoordinator = + isAsync ? Optional.empty() : Optional.of(compactionCoordinator.getThriftService()); + final ServerAddress sa; + var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy, + syncCompactionCoordinator, getContext()); try { sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK); - asyncSa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_ASYNC_CLIENTPORT, - asyncProcessor, "AsyncManager", "Async Manager Client Service Handler", null, - Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, - Property.MANAGER_THREADCHECK); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getHostname(), e); } clientService = sa.server; - asyncClientService = asyncSa.server; log.info("Started Manager client service at {}", sa.address); - log.info("Started Manager async client service at {}", asyncSa.address); + 
+ final Optional asyncSa; + if (isAsync) { + var asyncProcessor = ThriftProcessorTypes + .getManagerTAsyncProcessor(compactionCoordinator.getAsyncThriftService(), getContext()); + try { + asyncSa = Optional.of(TServerUtils.startServer(context, getHostname(), + Property.MANAGER_ASYNC_CLIENTPORT, asyncProcessor, "AsyncManager", + "Async Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, + Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK)); + } catch (UnknownHostException e) { + throw new IllegalStateException("Unable to start server on host " + getHostname(), e); + } + asyncClientService = asyncSa.map(serverAddress -> serverAddress.server); + log.info("Started Manager async client service at {}", asyncSa.orElseThrow().address); + } else { + asyncSa = Optional.empty(); + asyncClientService = Optional.empty(); + } // block until we can obtain the ZK lock for the manager ServiceLockData sld; @@ -1315,14 +1332,18 @@ boolean canSuspendTablets() { String address = sa.address.toString(); UUID uuid = sld.getServerUUID(ThriftService.MANAGER); ServiceDescriptors descriptors = new ServiceDescriptors(); + for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.FATE}) { descriptors.addService(new ServiceDescriptor(uuid, svc, address, this.getResourceGroup())); } - // Advertise the COORDINATOR service on the async port so clients correct to the - // correct one - descriptors.addService(new ServiceDescriptor(uuid, ThriftService.COORDINATOR, - asyncSa.address.toString(), this.getResourceGroup())); + asyncSa.ifPresentOrElse(asyncService -> { + // Advertise the COORDINATOR service on the async port so clients correct to the + // correct one + descriptors.addService(new ServiceDescriptor(uuid, ThriftService.COORDINATOR, + asyncSa.orElseThrow().address.toString(), this.getResourceGroup())); + }, () -> descriptors.addService( + new ServiceDescriptor(uuid, ThriftService.COORDINATOR, address, this.getResourceGroup()))); sld = new 
ServiceLockData(descriptors); log.info("Setting manager lock data to {}", sld); @@ -1332,14 +1353,14 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception updating manager lock", e); } - while (!clientService.isServing() && !asyncClientService.isServing()) { + while (!isServing()) { sleepUninterruptibly(100, MILLISECONDS); } // The manager is fully initialized. Clients are allowed to connect now. managerInitialized.set(true); - while (clientService.isServing() && asyncClientService.isServing()) { + while (isServing()) { sleepUninterruptibly(500, MILLISECONDS); } log.info("Shutting down fate."); @@ -1885,4 +1906,12 @@ private Map> getFateRefs() { Preconditions.checkState(fateRefs != null, "Unexpected null fate references map"); return fateRefs; } + + private boolean isServing() { + if (asyncClientService.isPresent()) { + return clientService.isServing() && asyncClientService.orElseThrow().isServing(); + } else { + return clientService.isServing(); + } + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index ae6c2eb8164..4559663d480 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -30,6 +30,8 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import static org.apache.accumulo.core.metrics.Metric.MAJC_QUEUED; import static org.apache.accumulo.core.metrics.Metric.MAJC_RUNNING; +import static org.apache.accumulo.core.rpc.ThriftUtil.asyncMethodFuture; +import static org.apache.accumulo.core.rpc.ThriftUtil.getFutureThriftResult; import java.io.FileNotFoundException; import java.io.IOException; @@ -46,6 +48,7 @@ import java.util.Objects; import 
java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; @@ -69,6 +72,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncIface; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; @@ -156,7 +160,7 @@ import io.micrometer.core.instrument.MeterRegistry; public class CompactionCoordinator - implements CompactionCoordinatorService.AsyncIface, Runnable, MetricsProducer { + implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer { private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); @@ -198,6 +202,7 @@ public class CompactionCoordinator private volatile long coordinatorStartTime; private final long maxJobRequestWaitTime; + private final CompactionCoordinatorService.AsyncIface asyncThriftService; public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference>> fateInstances, Manager manager) { @@ -244,6 +249,8 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false) .expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors); // At this point the manager does not have its lock so no actions should be taken yet + + this.asyncThriftService = newAsyncThriftService(); } protected int countCompactors(String groupName) { @@ -365,77 +372,15 @@ public long getNumRunningCompactions() { * 
@param groupName group * @param compactorAddress compactor address * @throws ThriftSecurityException when permission error + * @return compaction job */ @Override - public void getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, - String compactorAddress, String externalCompactionId, - AsyncMethodCallback resultHandler) throws ThriftSecurityException { - - // do not expect users to call this directly, expect compactors to call this method - if (!security.canPerformSystemActions(credentials)) { - throw new AccumuloSecurityException(credentials.getPrincipal(), - SecurityErrorCode.PERMISSION_DENIED).asThriftException(); - } - - // Get the next job as a future as we need to wait until something is available - CompactorGroupId groupId = CompactorGroupId.of(groupName); - LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); - TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); - - var future = jobQueues.getAsync(groupId).thenApply(metaJob -> { - LOG.trace("Next metaJob is ready {}", metaJob.getJob()); - Optional compactionConfig = getCompactionConfig(metaJob); - - // this method may reread the metadata, do not use the metadata in metaJob for anything after - // this method - CompactionMetadata ecm = null; - - var kind = metaJob.getJob().getKind(); - - // Only reserve user compactions when the config is present. When compactions are canceled the - // config is deleted. - var cid = ExternalCompactionId.from(externalCompactionId); - if (kind == CompactionKind.SYSTEM - || (kind == CompactionKind.USER && compactionConfig.isPresent())) { - ecm = reserveCompaction(metaJob, compactorAddress, cid); - } - - final TExternalCompactionJob result; - if (ecm != null) { - result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); - // It is possible that by the time this added that the the compactor that made this request - // is dead. 
In this cases the compaction is not actually running. - RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), - new RunningCompaction(result, compactorAddress, groupName)); - TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, - metaJob.getJob()); - } else { - LOG.debug("Unable to reserve compaction job for {} {}, returning empty job to compactor {}", - groupName, metaJob.getTabletMetadata().getExtent(), compactorAddress); - result = new TExternalCompactionJob(); - } - - return new TNextCompactionJob(result, compactorCounts.get(groupName)); - }); - - // TODO: Use a thread pool and use thenAcceptAsync and exceptionallyAsync() - // Async send back to the compactor when a new job is ready - // Need the unused var for errorprone - var unused = future.thenAccept(ecj -> { - LOG.debug("Received next compaction job {}", ecj); - resultHandler.onComplete(ecj); - }).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> { - if (e instanceof TimeoutException) { - LOG.trace("Compaction job request with ecid {} timed out.", externalCompactionId); - resultHandler.onComplete( - new TNextCompactionJob(new TExternalCompactionJob(), compactorCounts.get(groupName))); - } else { - LOG.warn("Received exception processing compaction job {}", e.getMessage()); - LOG.debug(e.getMessage(), e); - resultHandler.onError(new RuntimeException(e)); - } - return null; - }); + public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String groupName, String compactorAddress, String externalCompactionId) throws TException { + final CompletableFuture future = new CompletableFuture<>(); + asyncThriftService.getCompactionJob(tinfo, credentials, groupName, compactorAddress, + externalCompactionId, asyncMethodFuture(future)); + return getFutureThriftResult(future); } @VisibleForTesting @@ -676,10 +621,143 @@ public void addJobs(TabletMetadata tabletMetadata, Collection job jobQueues.add(tabletMetadata, jobs); } - public 
CompactionCoordinatorService.AsyncIface getThriftService() { + public CompactionCoordinatorService.Iface getThriftService() { return this; } + public CompactionCoordinatorService.AsyncIface getAsyncThriftService() { + return asyncThriftService; + } + + private CompactionCoordinatorService.AsyncIface newAsyncThriftService() { + return new AsyncIface() { + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent textent, TCompactionStats stats, + AsyncMethodCallback resultHandler) throws TException { + CompactionCoordinator.this.compactionCompleted(tinfo, credentials, externalCompactionId, + textent, stats); + resultHandler.onComplete(null); + } + + @Override + public void getCompactionJob(TInfo tinfo, TCredentials credentials, String groupName, + String compactorAddress, String externalCompactionId, + AsyncMethodCallback resultHandler) throws TException { + + // do not expect users to call this directly, expect compactors to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + // Get the next job as a future as we need to wait until something is available + CompactorGroupId groupId = CompactorGroupId.of(groupName); + LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, + compactorAddress); + TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); + + var future = jobQueues.getAsync(groupId).thenApply(metaJob -> { + LOG.trace("Next metaJob is ready {}", metaJob.getJob()); + Optional compactionConfig = getCompactionConfig(metaJob); + + // this method may reread the metadata, do not use the metadata in metaJob for anything + // after + // this method + CompactionMetadata ecm = null; + + var kind = metaJob.getJob().getKind(); + + // Only reserve user compactions when the config is present. 
When compactions are canceled + // the + // config is deleted. + var cid = ExternalCompactionId.from(externalCompactionId); + if (kind == CompactionKind.SYSTEM + || (kind == CompactionKind.USER && compactionConfig.isPresent())) { + ecm = reserveCompaction(metaJob, compactorAddress, cid); + } + + final TExternalCompactionJob result; + if (ecm != null) { + result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); + // It is possible that by the time this added that the the compactor that made this + // request + // is dead. In this cases the compaction is not actually running. + RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), + new RunningCompaction(result, compactorAddress, groupName)); + TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, + metaJob.getJob()); + } else { + LOG.debug( + "Unable to reserve compaction job for {} {}, returning empty job to compactor {}", + groupName, metaJob.getTabletMetadata().getExtent(), compactorAddress); + result = new TExternalCompactionJob(); + } + + return new TNextCompactionJob(result, compactorCounts.get(groupName)); + }); + + // TODO: Use a thread pool and use thenAcceptAsync and exceptionallyAsync() + // Async send back to the compactor when a new job is ready + // Need the unused var for errorprone + var unused = future.thenAccept(ecj -> { + LOG.debug("Received next compaction job {}", ecj); + resultHandler.onComplete(ecj); + }).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> { + if (e instanceof TimeoutException) { + LOG.trace("Compaction job request with ecid {} timed out.", externalCompactionId); + resultHandler.onComplete(new TNextCompactionJob(new TExternalCompactionJob(), + compactorCounts.get(groupName))); + } else { + LOG.warn("Received exception processing compaction job {}", e.getMessage()); + LOG.debug(e.getMessage(), e); + resultHandler.onError(new RuntimeException(e)); + } + return null; + }); + } + + @Override + public 
void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate update, long timestamp, + AsyncMethodCallback resultHandler) throws TException { + CompactionCoordinator.this.updateCompactionStatus(tinfo, credentials, externalCompactionId, + update, timestamp); + resultHandler.onComplete(null); + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent extent, AsyncMethodCallback resultHandler) + throws TException { + CompactionCoordinator.this.compactionFailed(tinfo, credentials, externalCompactionId, + extent); + resultHandler.onComplete(null); + } + + @Override + public void getRunningCompactions(TInfo tinfo, TCredentials credentials, + AsyncMethodCallback resultHandler) throws TException { + resultHandler + .onComplete(CompactionCoordinator.this.getRunningCompactions(tinfo, credentials)); + } + + @Override + public void getCompletedCompactions(TInfo tinfo, TCredentials credentials, + AsyncMethodCallback resultHandler) throws TException { + resultHandler + .onComplete(CompactionCoordinator.this.getCompletedCompactions(tinfo, credentials)); + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId, + AsyncMethodCallback resultHandler) throws TException { + CompactionCoordinator.this.cancel(tinfo, credentials, externalCompactionId); + resultHandler.onComplete(null); + } + }; + } + private Optional getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { if (metaJob.getJob().getKind() == CompactionKind.USER && metaJob.getTabletMetadata().getSelectedFiles() != null) { @@ -725,8 +803,8 @@ private Optional getCompactionConfig(CompactionJobQueues.MetaJ */ @Override public void compactionCompleted(TInfo tinfo, TCredentials credentials, - String externalCompactionId, TKeyExtent textent, TCompactionStats stats, - AsyncMethodCallback resultHandler) throws ThriftSecurityException { + String 
externalCompactionId, TKeyExtent textent, TCompactionStats stats) + throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -738,7 +816,6 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, while (localFates == null) { UtilWaitThread.sleep(100); if (shutdown.getCount() == 0) { - resultHandler.onComplete(null); return; } localFates = fateInstances.get(); @@ -765,12 +842,10 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, tableState); // cleanup metadata table and files related to the compaction compactionsFailed(Map.of(ecid, extent)); - resultHandler.onComplete(null); return; } if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) { - resultHandler.onComplete(null); return; } @@ -782,12 +857,11 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, txid.ifPresentOrElse(fateId -> LOG.debug("initiated compaction commit {} {}", ecid, fateId), () -> LOG.debug("compaction commit already initiated for {}", ecid)); - resultHandler.onComplete(null); } @Override public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, - TKeyExtent extent, AsyncMethodCallback resultHandler) throws ThriftSecurityException { + TKeyExtent extent) throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -797,7 +871,6 @@ public void compactionFailed(TInfo tinfo, TCredentials credentials, String exter LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent); final var ecid = ExternalCompactionId.of(externalCompactionId); compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); - 
resultHandler.onComplete(null); } void compactionsFailed(Map compactions) { @@ -913,8 +986,8 @@ public boolean test(TabletMetadata tabletMetadata) { */ @Override public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, - String externalCompactionId, TCompactionStatusUpdate update, long timestamp, - AsyncMethodCallback resultHandler) throws ThriftSecurityException { + String externalCompactionId, TCompactionStatusUpdate update, long timestamp) + throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -926,7 +999,6 @@ public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, if (null != rc) { rc.addUpdate(timestamp, update); } - resultHandler.onComplete(null); } public void recordCompletion(ExternalCompactionId ecid) { @@ -946,16 +1018,16 @@ protected Set readExternalCompactionIds() { } /** - * Return information about running compactions Sends back map of ECID to TExternalCompaction - * objects + * Return information about running compactions * * @param tinfo trace info * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects * @throws ThriftSecurityException permission error */ @Override - public void getRunningCompactions(TInfo tinfo, TCredentials credentials, - AsyncMethodCallback callback) throws ThriftSecurityException { + public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -971,20 +1043,20 @@ public void getRunningCompactions(TInfo tinfo, TCredentials credentials, trc.setJob(rc.getJob()); 
result.putToCompactions(ecid.canonical(), trc); }); - callback.onComplete(result); + return result; } /** - * Return information about recently completed compactions send back map of ECID to - * TExternalCompaction objects + * Return information about recently completed compactions * * @param tinfo trace info * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects * @throws ThriftSecurityException permission error */ @Override - public void getCompletedCompactions(TInfo tinfo, TCredentials credentials, - AsyncMethodCallback callback) throws ThriftSecurityException { + public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), @@ -999,13 +1071,12 @@ public void getCompletedCompactions(TInfo tinfo, TCredentials credentials, trc.setUpdates(rc.getUpdates()); result.putToCompactions(ecid.canonical(), trc); }); - callback.onComplete(result); + return result; } @Override - public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId, - AsyncMethodCallback callback) throws TException { - + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent()); try { @@ -1020,7 +1091,6 @@ public void cancel(TInfo tinfo, TCredentials credentials, String externalCompact } cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId); - callback.onComplete(null); } /* Method exists to be called from test */ diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 299a69a2bf9..e4b0a3454de 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -164,12 +164,12 @@ protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecut @Override public void compactionCompleted(TInfo tinfo, TCredentials credentials, - String externalCompactionId, TKeyExtent textent, TCompactionStats stats, - AsyncMethodCallback callback) throws ThriftSecurityException {} + String externalCompactionId, TKeyExtent textent, TCompactionStats stats) + throws ThriftSecurityException {} @Override public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, - TKeyExtent extent, AsyncMethodCallback callback) throws ThriftSecurityException {} + TKeyExtent extent) throws ThriftSecurityException {} void setMetadataCompactionIds(Set mci) { metadataCompactionIds = mci; @@ -370,8 +370,8 @@ public void testGetCompactionJob() throws Exception { // Get the next job ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); CompletableFuture future = new CompletableFuture<>(); - coordinator.getCompactionJob(new TInfo(), creds, GROUP_ID.toString(), "localhost:10241", - eci.toString(), new AsyncMethodCallback<>() { + coordinator.getAsyncThriftService().getCompactionJob(new TInfo(), creds, GROUP_ID.toString(), + "localhost:10241", eci.toString(), new AsyncMethodCallback<>() { @Override public void onComplete(TNextCompactionJob response) { future.complete(response); @@ -429,8 +429,9 @@ public void testGetCompactionJobNoJobs() throws Exception { var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); 
CompletableFuture future = new CompletableFuture<>(); - coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, GROUP_ID.toString(), - "localhost:10240", UUID.randomUUID().toString(), new AsyncMethodCallback<>() { + coordinator.getAsyncThriftService().getCompactionJob(TraceUtil.traceInfo(), creds, + GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString(), + new AsyncMethodCallback<>() { @Override public void onComplete(TNextCompactionJob response) { future.complete(response);