Convert CompactionCoordinator RPC service to Async Thrift #4931

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
@@ -363,6 +363,8 @@ public enum Property {
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
MANAGER_ASYNC_CLIENTPORT("manager.port.async.client", "8999", PropertyType.PORT,
"The port used for handling client connections on the manager for async services.", "4.0.0"),
MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
@@ -1165,7 +1167,11 @@ public enum Property {
@Experimental
COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
"compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION,
"The interval at which to check the tservers for external compactions.", "2.1.0");
"The interval at which to check the tservers for external compactions.", "2.1.0"),
Contributor comment:
Noticed some of the existing props were unused when looking at this PR and opened #4932

COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME(
"compaction.coordinator.wait.time.job.request.max", "2m", PropertyType.TIMEDURATION,
"The maximum amount of time the coordinator will wait for a requested job from the job queue.",
"4.0.0");

private final String key;
private final String defaultValue;
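
As a hedged illustration of how the two new properties might be consumed (the call sites below are assumptions, not code from this PR), TIMEDURATION values are normally read through AccumuloConfiguration.getTimeInMillis and PORT values through getPort:

// Hypothetical usage sketch only; 'context' is assumed to be a ServerContext.
AccumuloConfiguration conf = context.getConfiguration();
long maxJobWaitMillis =
    conf.getTimeInMillis(Property.COMPACTION_COORDINATOR_MAX_JOB_REQUEST_WAIT_TIME);
int[] asyncClientPorts = conf.getPort(Property.MANAGER_ASYNC_CLIENTPORT);
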
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.rpc;

import static org.apache.accumulo.core.rpc.clients.ThriftClientTypes.COORDINATOR;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;

import java.io.IOException;
@@ -113,6 +114,15 @@ public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> type,
HostAndPort address, ClientContext context) throws TTransportException {
TTransport transport = context.getTransportPool().getTransport(type, address,
context.getClientTimeoutInMillis(), context, true);

// TODO - This is temporary until we support Async multiplexing
// THRIFT-2427 is tracking this issue and the plan is to reopen a new PR
// to add support in the next version. Once we support multiplexing we can
// remove this special case.
if (type == COORDINATOR) {
return type.getClientFactory().getClient(protocolFactory.getProtocol(transport));
}

return createClient(type, transport);
}
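
To make the special case concrete, here is a rough client-side sketch of the difference (not the actual createClient implementation; the service-name string and variable names are assumptions): coordinator clients speak directly over the base protocol, while other clients are wrapped in TMultiplexedProtocol so the server's TMultiplexedProcessor can route them.

// Illustrative sketch, assuming a TTransport named 'transport' is already open.
TProtocol base = protocolFactory.getProtocol(transport);

// Async coordinator service: no async multiplexing yet (THRIFT-2427), so the
// generated client is built directly on the base protocol.
CompactionCoordinatorService.Client coordinator =
    new CompactionCoordinatorService.Client(base);

// Sync services: wrap the protocol with the registered service name so the
// multiplexed server can dispatch to the matching processor.
TProtocol muxed = new TMultiplexedProtocol(base, "ManagerClientService");
ManagerClientService.Client manager = new ManagerClientService.Client(muxed);
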

@@ -23,6 +23,9 @@
import java.net.Socket;
import java.nio.channels.SelectionKey;

import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -99,11 +102,11 @@ public CustomSelectAcceptThread(TNonblockingServerTransport serverTransport)
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey, final AbstractSelectThread selectThread)
throws TTransportException {
if (processorFactory_.isAsyncProcessor()) {
throw new IllegalStateException("This implementation does not support AsyncProcessors");
}

return new CustomFrameBuffer(trans, selectionKey, selectThread);
TProcessor processor = processorFactory_.getProcessor(null);
return processor instanceof TAsyncProcessor
? new CustomAsyncFrameBuffer(trans, selectionKey, selectThread)
: new CustomFrameBuffer(trans, selectionKey, selectThread);
}
}

@@ -118,7 +121,7 @@ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
AbstractSelectThread selectThread) throws TTransportException {
super(trans, selectionKey, selectThread);
// Store the clientAddress in the buffer so it can be referenced for logging during read/write
this.clientAddress = getClientAddress();
this.clientAddress = getClientAddress(trans_, "CustomFrameBuffer");
}

@Override
@@ -148,33 +151,78 @@ public boolean write() {
}
return result;
}
}

/**
* Custom wrapper around
* {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} to extract the
* client's network location before accepting the request.
*/
private class CustomAsyncFrameBuffer extends AsyncFrameBuffer {
private final String clientAddress;

public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
AbstractSelectThread selectThread) throws TTransportException {
super(trans, selectionKey, selectThread);
// Store the clientAddress in the buffer so it can be referenced for logging during read/write
this.clientAddress = getClientAddress(trans_, "CustomAsyncFrameBuffer");
}

@Override
public void invoke() {
// On invoke() set the clientAddress on the ThreadLocal so that it can be accessed elsewhere
// in the same thread that called invoke() on the buffer
TServerUtils.clientAddress.set(clientAddress);
super.invoke();
}

@Override
public boolean read() {
boolean result = super.read();
if (!result) {
log.trace("CustomAsyncFrameBuffer.read returned false when reading data from client: {}",
clientAddress);
}
return result;
}

/*
* Helper method used to capture the client address inside the CustomFrameBuffer constructor so
* that it can be referenced inside the read/write methods for logging purposes. It previously
* was only set on the ThreadLocal in the invoke() method but that does not work because A) the
* method isn't called until after reading is finished so the value will be null inside of
* read() and B) The other problem is that invoke() is called on a different thread than
* read()/write() so even if the order was correct it would not be available.
*
* Since a new FrameBuffer is created for each request we can use it to capture the client
* address earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read
* data and write a response back to the client and as part of creation of the buffer the
* TNonblockingSocket is stored as a final variable and won't change so we can safely capture
* the clientAddress in the constructor and use it for logging during read/write and then use
* the value inside of invoke() to set the ThreadLocal so the client address will still be
* available on the thread that called invoke().
*/
private String getClientAddress() {
String clientAddress = null;
if (trans_ instanceof TNonblockingSocket) {
TNonblockingSocket tsock = (TNonblockingSocket) trans_;
Socket sock = tsock.getSocketChannel().socket();
clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort();
log.trace("CustomFrameBuffer captured client address: {}", clientAddress);
}
return clientAddress;
}

@Override
public boolean write() {
boolean result = super.write();
if (!result) {
log.trace("CustomAsyncFrameBuffer.write returned false when writing data to client: {}",
clientAddress);
}
return result;
}
}

/*
* Helper method used to capture the client address inside the CustomFrameBuffer and
* CustomAsyncFrameBuffer constructors so that it can be referenced inside the read/write methods
* for logging purposes. It previously was only set on the ThreadLocal in the invoke() method but
* that does not work because A) the method isn't called until after reading is finished so the
* value will be null inside of read() and B) The other problem is that invoke() is called on a
* different thread than read()/write() so even if the order was correct it would not be
* available.
*
* Since a new FrameBuffer is created for each request we can use it to capture the client address
* earlier in the constructor and not wait for invoke(). A FrameBuffer is used to read data and
* write a response back to the client and as part of creation of the buffer the
* TNonblockingSocket is stored as a final variable and won't change so we can safely capture the
* clientAddress in the constructor and use it for logging during read/write and then use the
* value inside of invoke() to set the ThreadLocal so the client address will still be available
* on the thread that called invoke().
*/
private static String getClientAddress(TNonblockingTransport transport, String name) {
String clientAddress = null;
if (transport instanceof TNonblockingSocket) {
TNonblockingSocket tsock = (TNonblockingSocket) transport;
Socket sock = tsock.getSocketChannel().socket();
clientAddress = sock.getInetAddress().getHostAddress() + ":" + sock.getPort();
log.trace("{} captured client address: {}", name, clientAddress);
}
return clientAddress;
}

}
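
For context, a minimal sketch of how handler code might read the captured address on the thread that ran invoke(); the handler method and logger shown are hypothetical:

// Inside an RPC handler method, on the same thread that invoke() used:
String client = TServerUtils.clientAddress.get();
if (client != null) {
  log.trace("servicing request from client {}", client);
}
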
@@ -53,8 +53,11 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.rpc.TimedProcessor.AsyncTimedProcessor;
import org.apache.accumulo.server.rpc.TimedProcessor.SyncTimedProcessor;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -170,7 +173,7 @@ public static ServerAddress startServer(ServerContext context, String hostname,
// create the TimedProcessor outside the port search loop so we don't try to
// register the same
// metrics mbean more than once
TimedProcessor timedProcessor = new TimedProcessor(processor, context.getMetricsInfo());
TimedProcessor timedProcessor = newTimedProcessor(processor, context.getMetricsInfo());

HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
try {
@@ -574,7 +577,7 @@ public static ServerAddress startTServer(final AccumuloConfiguration conf,
}

try {
return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName,
return startTServer(serverType, newTimedProcessor(processor, metricsInfo), serverName,
threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize,
sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses);
} catch (TTransportException e) {
@@ -702,4 +705,10 @@ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProc

return new UGIAssumingProcessor(processor);
}

private static TimedProcessor newTimedProcessor(TProcessor processor, MetricsInfo metricsInfo) {
return processor instanceof TAsyncProcessor
? new AsyncTimedProcessor((TAsyncProcessor) processor, metricsInfo)
: new SyncTimedProcessor(processor, metricsInfo);
}
}
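
A hedged caller-side sketch of what the new dispatch means for the two processor flavors the manager produces (handler parameters are assumed for illustration): Thrift's generated AsyncProcessor implements TAsyncProcessor in addition to TProcessor, so it gets AsyncTimedProcessor, while plain generated Processors get SyncTimedProcessor.

// Illustration only; not part of this PR.
static void illustrateDispatch(ManagerClientService.Iface managerHandler,
    CompactionCoordinatorService.AsyncIface coordinatorHandler) {
  TProcessor sync = new ManagerClientService.Processor<>(managerHandler);
  TProcessor async = new CompactionCoordinatorService.AsyncProcessor<>(coordinatorHandler);
  // newTimedProcessor above would wrap 'sync' in SyncTimedProcessor and
  // 'async' in AsyncTimedProcessor because it also implements TAsyncProcessor.
  assert !(sync instanceof TAsyncProcessor);
  assert async instanceof TAsyncProcessor;
}
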
@@ -20,6 +20,7 @@

import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.AsyncProcessor;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.gc.thrift.GCMonitorService;
import org.apache.accumulo.core.manager.thrift.FateService;
@@ -32,7 +33,6 @@
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.thrift.TBaseProcessor;
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TServiceClient;
@@ -47,8 +47,8 @@ public ThriftProcessorTypes(ThriftClientTypes<C> type) {
}

@VisibleForTesting
public <I,H extends I,P extends TBaseProcessor<?>> TProcessor getTProcessor(
Class<P> processorClass, Class<I> interfaceClass, H serviceHandler, ServerContext context) {
public <I,H extends I,P extends TProcessor> P getTProcessor(Class<P> processorClass,
Class<I> interfaceClass, H serviceHandler, ServerContext context) {
I rpcProxy = TraceUtil.wrapService(serviceHandler);
if (context.getThriftServerType() == ThriftServerType.SASL) {
@SuppressWarnings("unchecked")
@@ -114,20 +114,26 @@ public static TMultiplexedProcessor getGcTProcessor(GCMonitorService.Iface servi
}

public static TMultiplexedProcessor getManagerTProcessor(FateService.Iface fateServiceHandler,
CompactionCoordinatorService.Iface coordinatorServiceHandler,
ManagerClientService.Iface managerServiceHandler, ServerContext context) {
TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
muxProcessor.registerProcessor(FATE.getServiceName(), FATE.getTProcessor(
FateService.Processor.class, FateService.Iface.class, fateServiceHandler, context));
muxProcessor.registerProcessor(COORDINATOR.getServiceName(),
COORDINATOR.getTProcessor(CompactionCoordinatorService.Processor.class,
CompactionCoordinatorService.Iface.class, coordinatorServiceHandler, context));
muxProcessor.registerProcessor(MANAGER.getServiceName(),
MANAGER.getTProcessor(ManagerClientService.Processor.class,
ManagerClientService.Iface.class, managerServiceHandler, context));
return muxProcessor;
}

public static AsyncProcessor<?> getManagerTAsyncProcessor(
CompactionCoordinatorService.AsyncIface coordinatorServiceHandler, ServerContext context) {
// TODO - Right now this is temporarily returning the single AsyncProcessor
// Once Thrift supports Async multiplexing then we can switch to using
// TMultiplexedAsyncProcessor and use multiplexing like we do for sync processors
// THRIFT-2427 is tracking this issue
return COORDINATOR.getTProcessor(CompactionCoordinatorService.AsyncProcessor.class,
CompactionCoordinatorService.AsyncIface.class, coordinatorServiceHandler, context);
}

public static TMultiplexedProcessor getScanServerTProcessor(ClientServiceHandler clientHandler,
TabletScanClientService.Iface tserverHandler, ServerContext context) {
TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
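
Until THRIFT-2427 provides async multiplexing, the manager-side wiring would roughly amount to serving this single AsyncProcessor by itself, for example on the new manager.port.async.client port; the method below is a hypothetical sketch, not code from this PR.

// Hypothetical wiring sketch only.
static void startAsyncCoordinatorService(ServerContext context,
    CompactionCoordinatorService.AsyncIface coordinatorHandler) {
  CompactionCoordinatorService.AsyncProcessor<?> asyncProcessor =
      ThriftProcessorTypes.getManagerTAsyncProcessor(coordinatorHandler, context);
  // asyncProcessor would then be handed to the nonblocking server start path
  // bound to Property.MANAGER_ASYNC_CLIENTPORT instead of the multiplexed server.
}
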
@@ -22,20 +22,24 @@

import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.server.metrics.ThriftMetrics;
import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer;

import com.google.common.base.Preconditions;

/**
* A {@link TProcessor} which tracks the duration of an RPC and adds it to the metrics subsystem.
*/
public class TimedProcessor implements TProcessor {
public abstract class TimedProcessor implements TProcessor {

private final TProcessor other;
private final ThriftMetrics thriftMetrics;
private long idleStart;
protected final TProcessor other;
protected final ThriftMetrics thriftMetrics;
protected long idleStart;

public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) {
protected TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) {
this.other = next;
thriftMetrics = new ThriftMetrics();
metricsInfo.addMetricsProducers(thriftMetrics);
@@ -44,14 +48,57 @@ public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) {

@Override
public void process(TProtocol in, TProtocol out) throws TException {
time(() -> other.process(in, out));
}

public boolean isAsync() {
return false;
}

protected void time(ThriftRunnable runnable) throws TException {
long processStart = System.nanoTime();
thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart));
try {
other.process(in, out);
runnable.run();
} finally {
// set idle to now, calc time in process
idleStart = System.nanoTime();
thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart));
}
}

public static class SyncTimedProcessor extends TimedProcessor {
protected SyncTimedProcessor(TProcessor next, MetricsInfo metricsInfo) {
super(next, metricsInfo);
}
}

public static class AsyncTimedProcessor extends TimedProcessor implements TAsyncProcessor {
private final TAsyncProcessor other;

public AsyncTimedProcessor(TAsyncProcessor next, MetricsInfo metricsInfo) {
super(validate(next), metricsInfo);
this.other = next;
}

@Override
public boolean isAsync() {
return true;
}

@Override
public void process(AsyncFrameBuffer fb) throws TException {
this.other.process(fb);
}

private static TProcessor validate(TAsyncProcessor other) {
Preconditions.checkArgument(other instanceof TProcessor,
"Async processor must also implement TProcessor");
return (TProcessor) other;
}
}

protected interface ThriftRunnable {
void run() throws TException;
}
}
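
For reference, a hedged construction sketch showing that the generated async processor satisfies the validate() check and can be wrapped for metrics (the handler and MetricsInfo values are assumed):

// Illustration only: Thrift's generated AsyncProcessor implements both
// TProcessor and TAsyncProcessor, so validate() accepts it.
static TimedProcessor wrapAsyncForMetrics(CompactionCoordinatorService.AsyncIface handler,
    MetricsInfo metricsInfo) {
  CompactionCoordinatorService.AsyncProcessor<CompactionCoordinatorService.AsyncIface> generated =
      new CompactionCoordinatorService.AsyncProcessor<>(handler);
  AsyncTimedProcessor timed = new AsyncTimedProcessor(generated, metricsInfo);
  assert timed.isAsync();
  return timed;
}
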