Convert CompactionCoordinator RPC service to Async Thrift #4931
Conversation
This changes the CompactionCoordinator RPC service to use an Async thrift processor. This allows long polling when a compactor requests a new job by no longer blocking the Thrift IO thread when there are no jobs available. When a job is available, the response will be sent asynchronously back to the client.

The only RPC service converted for now is the CompactionCoordinator RPC service, but future services could be converted to use Async processors as well. This RPC is not using multiplexing, as Async multiplexing is an open issue being worked on in THRIFT-2427. Once that issue is resolved, the plan is to convert to an async multiplexed processor.

A second Thrift service on another port was created in the Manager for handling async processors, because they are handled differently than sync and you can't combine both sync and async processors when multiplexing. The coordinator RPC service now advertises the async port so clients connect to the correct service.

This closes apache#4664
The sunny ITs passed; I just kicked off a full IT build as well.
I pushed an update to fix a bug and restarted the full IT build.
```diff
@@ -1165,7 +1167,11 @@ public enum Property {
   @Experimental
   COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
       "compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION,
-      "The interval at which to check the tservers for external compactions.", "2.1.0");
+      "The interval at which to check the tservers for external compactions.", "2.1.0"),
```
Noticed some of the existing props were unused when looking at this PR and opened #4932
```java
var unused = future.thenAccept(ecj -> {
  LOG.debug("Received next compaction job {}", ecj);
  resultHandler.onComplete(ecj);
}).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> {
```
There is no exceptionallyAsync, but there is a whenCompleteAsync method that could be used.
The following is just sharing some experiments i did trying to understand the timeout a bit better, not recommending any changes for this PR.
The following little program

```java
var executor = Executors.newFixedThreadPool(4);
var future = CompletableFuture.supplyAsync(() -> {
  System.out.println("P1 " + Thread.currentThread().getName());
  try {
    Thread.sleep(60000);
  } catch (InterruptedException e) {
    System.out.println("Saw " + e.getMessage());
  }
  return 5;
}, executor).orTimeout(1, TimeUnit.SECONDS).exceptionally(throwable -> {
  System.out.println("P2 " + Thread.currentThread().getName() + " " + throwable.getMessage());
  return 6;
});
System.out.println(future.get());
Thread.sleep(3);
System.out.println("Shutting down");
executor.shutdownNow();
```
prints

```
P1 pool-1-thread-1
P2 CompletableFutureDelayScheduler null
6
Shutting down
Saw sleep interrupted
```
This shows two problems: first, the exceptionally code runs in a system-wide shared thread pool because of the timeout, and second, the original task that is sleeping 60 seconds is not interrupted when the timeout happens. It is only interrupted when the executor is shut down. Using whenCompleteAsync will avoid the first problem. So when things time out using this approach, we could have threads hanging around working on stuff that no one cares about.

Looking into this I found this Stack Overflow post. Looking at that, I wrote the following test
```java
public class CFT {

  public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier,
      long timeoutValue, TimeUnit timeUnit, ExecutorService executor,
      ScheduledExecutorService schedulerExecutor) {
    final CompletableFuture<T> cf = new CompletableFuture<T>();
    Future<?> future = executor.submit(() -> {
      try {
        cf.complete(supplier.get());
      } catch (Throwable ex) {
        cf.completeExceptionally(ex);
      }
    });
    // schedule watcher
    schedulerExecutor.schedule(() -> {
      // There was a race condition in the stack overflow post that was fixed here
      // by only cancelling when completing exceptionally succeeds
      if (cf.completeExceptionally(new TimeoutException("Your are outta time"))) {
        future.cancel(true);
      }
    }, timeoutValue, timeUnit);
    return cf;
  }

  public static void main(String[] args) throws Exception {
    var executor = Executors.newFixedThreadPool(4);
    var schedExecutor = Executors.newScheduledThreadPool(1);
    Supplier<Integer> supplier = () -> {
      System.out.println("P1 " + Thread.currentThread().getName());
      try {
        Thread.sleep(60000);
        System.out.println("done");
      } catch (InterruptedException e) {
        System.out.println("Saw " + e.getMessage());
      }
      return 5;
    };
    var future = supplyAsync(supplier, 1, TimeUnit.SECONDS, executor, schedExecutor);
    // The exceptionally method does not have an async version, so called
    // whenCompleteAsync instead. This avoids executing code in the scheduled executor.
    future.whenCompleteAsync((i, throwable) -> {
      System.out.println("P2 " + Thread.currentThread().getName() + " " + throwable.getMessage());
    }, executor);
    // give the timeout a chance to happen
    Thread.sleep(3000);
    System.out.println("Shutting down");
    executor.shutdown();
    schedExecutor.shutdown();
  }
}
```
This prints the following, which shows the original sleeping task is interrupted and that the exceptional handling runs in the desired thread pool.

```
P1 pool-1-thread-1
Saw sleep interrupted
P2 pool-1-thread-2 Your are outta time
Shutting down
```
Based on this, we probably need to open a follow-on issue after this PR to improve the timeout case to interrupt any work that is still out there running.
> There is no exceptionallyAsync, but there is a whenCompleteAsync method that could be used.
There is exceptionallyAsync but it turns out that it is only available in JDK 17.
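On a JDK that has it, exceptionallyAsync lets the fallback run on a supplied executor instead of the shared timeout-scheduler thread, which would address the first problem noted above. A minimal sketch of that behavior (the timeout value, class, and method names here are illustrative, not from the PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExceptionallyAsyncDemo {

  static int jobOrFallback(ExecutorService executor) throws Exception {
    CompletableFuture<Integer> job = new CompletableFuture<>();
    // Nothing ever completes this future, so the 100ms timeout always fires.
    return job.orTimeout(100, TimeUnit.MILLISECONDS)
        // The fallback runs on our executor, not the CompletableFutureDelayScheduler thread.
        .exceptionallyAsync(t -> -1, executor)
        .get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    System.out.println(jobOrFallback(executor)); // prints -1
    executor.shutdown();
  }
}
```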
Version 4.0.0 should be requiring JDK 17 anyways by the time we release it so I think it might be time to bring up bumping the version again. I know there are/were Hadoop issues but I've been running with JDK 17 for a while and haven't seen problems.
> So when things timeout using this approach, we could have threads hanging around working on stuff that no one cares about.
Does this apply for this use case? What thread would be hanging around on timeout that we need to interrupt?
For this case we are not submitting any work to the task executor to supply the completable future. We are just waiting for the future to be completed by another thread (I looked at the CompactionJobPriortyQueue and it looks like it just calls complete with a new job when one is available), which should be independent. When the future completes, the executor would use a new thread to execute the thenApplyAsync() or whenCompleteAsync() method.

But I also might be thinking about this wrong or missing something that we would need to interrupt.
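The flow described above can be sketched as follows: no worker thread blocks waiting on the future; a producer (standing in for the priority queue) completes it later, and only then does the callback run on the executor. Class and method names here are illustrative, not Accumulo's actual code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LongPollSketch {

  static String requestJob(CompletableFuture<String> pendingJob, ExecutorService executor)
      throws Exception {
    // Register the callback and return immediately; nothing blocks here.
    CompletableFuture<String> reply = pendingJob.thenApplyAsync(job -> "sent " + job, executor);
    // Elsewhere, when a compaction job becomes available, another thread completes it.
    pendingJob.complete("job-1");
    return reply.get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    System.out.println(requestJob(new CompletableFuture<>(), executor)); // prints "sent job-1"
    executor.shutdown();
  }
}
```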
> Does this apply for this use case? What thread would be hanging around on timeout that we need to interrupt?
I think the code is doing something similar to the following. A thread could be hanging around trying to make the metadata table update after a timeout has happened, when nothing cares about that update being made anymore.

```java
.thenAccept(/* reserve in metadata table */).thenAccept(/* do rpc response */).orTimeout(...)
```
> There is exceptionallyAsync but it turns out that it is only available in JDK 17.
Ah, another reason to go to JDK17. Would so love to have records. AFAIK the problem w/ JDK17 is that map reduce does not work w/ JDK17. It would be tricky to transition to JDK17 because the accumulo map reduce code uses a lot of code in core.
I took a look at the full IT build and it's still running 22 hours later with a ton of timeouts. I looked at one of the tests to see what was going on (GarbageCollectorIT#gcRun) and noticed that the Compactor had finished compacting a job, but the Coordinator was logging in a loop waiting for the Fate job to be finished and was never notified. Eventually I found the following in the Compactor log:
I ran again and the compactor thread dump shows the same thing. I do not know yet if this is the same issue for all the hanging tests, as I have to investigate, but clearly something isn't quite right. What's weird is that the test ran one compaction, and then on the second one it gets stuck trying to call that RPC.
I should add that my first hunch is that I probably missed calling
I fixed a couple tests that failed because the callback handler wasn't being called, so no response was sent back. However, when investigating why the Kerberos ITs were failing, I discovered they were hanging forever when compacting. After debugging for a while I found out that the SSL and SASL servers do not support async, and we are using a blocking server when those are configured. This is documented here and I verified it in the code. The server can also be started with a blocking server even if not using SASL or SSL, but non-blocking is the default.

That is obviously a problem: if we are using a blocking server instead of non-blocking, then the async processors will not work. To get around this, I pushed an update that detects which mode is used for Thrift and conditionally starts the compaction coordinator service in async mode if we are using a non-blocking server, or falls back to sync mode if we are not. It would be nice to be able to use non-blocking for all modes, but I'm not sure if that is possible. I know that gRPC works with SSL because it uses Netty, which is non-blocking, so that's one benefit of gRPC.

@ctubbsii - Do you know if Thrift still does not support SSL or SASL with a non-blocking server?

I just kicked off a new full IT build to see what is still broken. I'm also going to convert this to a draft PR until we work through some of these issues, such as what to do about SSL and SASL being blocking (maybe we just don't support async for now with that), async multiplexing, etc.
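The fallback decision described above can be sketched roughly as follows. The server-type names and return values are assumptions for illustration, not Accumulo's or Thrift's actual API:

```java
public class CoordinatorServiceChooser {

  enum ThriftServerType { NONBLOCKING, SSL, SASL }

  // Async processors require a non-blocking Thrift server; SSL and SASL force a
  // blocking server, so the coordinator falls back to the sync processor there.
  static String chooseProcessorMode(ThriftServerType type) {
    return type == ThriftServerType.NONBLOCKING ? "async" : "sync";
  }

  public static void main(String[] args) {
    System.out.println(chooseProcessorMode(ThriftServerType.NONBLOCKING)); // async
    System.out.println(chooseProcessorMode(ThriftServerType.SSL)); // sync
  }
}
```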
@keith-turner just pointed out to me that we need to make sure the sync version of the compaction coordinator service doesn't use the async version, because that will long poll, which we do not want. So when I get time I will revert the sync version back to the old way so we don't long poll. I will need to do a little refactoring so some of the same code can be shared between both versions.
The full IT got stuck again; it keeps hanging forever on
I'm closing this PR for now because similarly to some limitations discovered in #4715 that put gRPC on hold, there are several issues with Async Thrift that prevent this from being a solution right now. I plan to open up a new issue for future investigation into async RPC that will include the findings from both gRPC and async thrift but some of the quick highlights of the async thrift issues include:
With Thrift you can technically implement both the sync and async interfaces on the processor and register them separately, but that overcomplicates things and doesn't make much sense. The simplest thing is to support only one type for a specific Thrift service. So in this case, even though we only really care about one method being async (getCompactionJob) for the coordinator service, all the other methods were converted and we now implement only the async API, because it's easy to still make things sync when using the async API if desired. The good news is it's also easy to support both sync processors and async processors on the same server with two different ports, as this PR does. Going forward, it might make sense to convert more services over to implement the Async interface/API.
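The point about making things sync under the async API can be sketched as follows: a method that is really synchronous just does its work inline and completes the callback immediately. The handler type here stands in for Thrift's AsyncMethodCallback, and the method name is illustrative, not the real generated interface:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class AsyncStyleSync {

  // A "sync" method under an async-style API does the work inline on the calling
  // thread and completes the result handler immediately, with no deferred completion.
  static void cancelCompaction(String ecid, Consumer<Boolean> resultHandler) {
    boolean cancelled = !ecid.isEmpty(); // stand-in for the actual cancel work
    resultHandler.accept(cancelled);
  }

  public static void main(String[] args) {
    AtomicReference<Boolean> result = new AtomicReference<>();
    cancelCompaction("ECID-1", result::set);
    System.out.println("cancelled=" + result.get()); // cancelled=true
  }
}
```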
A couple other things to note about the PR: