
Convert CompactionCoordinator RPC service to Async Thrift #4931

Closed
cshannon wants to merge 5 commits

Conversation

@cshannon (Contributor) commented Sep 28, 2024

This changes the CompactionCoordinator RPC service to use an async Thrift processor. This allows long polling when a compactor requests a new job: the Thrift IO thread is no longer blocked when there are no jobs available, and when a job becomes available the response is sent asynchronously back to the client.

The only RPC service converted for now is the CompactionCoordinator RPC service, but future services could be converted to use async processors as well. This RPC does not use multiplexing, as async multiplexing is an open issue being worked on in THRIFT-2427. Once that issue is resolved, the plan is to convert to an async multiplexed processor.

A second Thrift service on another port was created in the Manager for handling async processors, because they are handled differently than sync processors and you can't combine sync and async processors when multiplexing. The coordinator RPC service now advertises the async port so clients connect to the correct service.

With Thrift you can technically implement both the sync and async interfaces on the processor and register them separately, but that overcomplicates things and doesn't make much sense; the simplest approach is to support only one type for a given Thrift service. So in this case, even though we only really care about one method being async (getCompactionJob) for the coordinator service, all the other methods were converted and we now implement only the async API, because it is still easy to behave synchronously when using the async API if desired. The good news is that it's also easy to support both sync processors and async processors on the same server with two different ports, as this PR does. Going forward, it might make sense to convert more services over to implement the async interface/API.
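
To make that last point concrete, here is a minimal sketch of a handler that implements only the generated async interface. The method signatures are simplified, and jobQueue/recordCompletion are hypothetical helpers; this illustrates the pattern, it is not code from this PR.

import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;

// Minimal sketch, assuming simplified signatures for the generated AsyncIface.
public class CoordinatorHandler implements CompactionCoordinatorService.AsyncIface {

    // A logically synchronous method is still simple under the async API:
    // do the work inline and complete the callback before returning.
    @Override
    public void compactionCompleted(TExternalCompactionJob job,
            AsyncMethodCallback<Void> resultHandler) throws TException {
        recordCompletion(job);          // hypothetical helper
        resultHandler.onComplete(null); // responds immediately, effectively sync
    }

    // The long-polling method returns without blocking the Thrift IO thread;
    // the callback is completed later, when a job becomes available.
    @Override
    public void getCompactionJob(String groupName,
            AsyncMethodCallback<TExternalCompactionJob> resultHandler) throws TException {
        jobQueue.nextJob(groupName) // hypothetical CompletableFuture<TExternalCompactionJob>
            .thenAccept(resultHandler::onComplete);
    }
}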

This closes #4664

A couple other things to note about the PR:

  1. As noted, Thrift does not support async multiplexing, but there is an old issue to track that and a PR that was closed. I was able to get that PR working in local testing with async multiplexing after a few tweaks. I plan to open a new PR soon so we can try to get async multiplexing into the next Thrift release.
  2. I added a couple of TODOs in the code. Some of them mark the spots where I had to add temporary code to get around not multiplexing the new service. There's also a TODO because we should probably change the handling of the job completable future to complete using the async methods and another thread pool, but I wanted some feedback on that first (a rough sketch of that idea follows this list).
  3. The existing tests should pretty much already provide coverage on everything. I also modified the CompactionCoordinatorTest to set a shorter timeout period before the job future times out when there are no jobs. This is both to make the test go faster and also to validate that property and the timeout works.
  4. One nice thing I noticed about the async API is that you don't need to write a bunch of boilerplate for setting an error response on the async callback unless you want to. The API already catches exceptions that are thrown and sets the error response for you, which is nice.
  5. If the job future times out the coordinator should not need to do anything else but send back the empty job response. The job queue already has code to periodically clean up completed futures.
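
To make the TODO in item 2 concrete, here is a rough sketch of what completing the job future with the async variants on a dedicated pool might look like, reusing names from the PR's getCompactionJob handling (future, resultHandler, LOG, maxJobRequestWaitTime); jobResponseExecutor and emptyJob() are hypothetical, and this is an idea for discussion, not the code in the PR.

// Hypothetical: a dedicated pool for delivering job responses, so that the
// completion handling never runs on whichever thread completes the future.
ExecutorService jobResponseExecutor = Executors.newCachedThreadPool();

var unused = future
    .thenAcceptAsync(ecj -> {
        LOG.debug("Received next compaction job {}", ecj);
        resultHandler.onComplete(ecj);
    }, jobResponseExecutor)
    .orTimeout(maxJobRequestWaitTime, TimeUnit.MILLISECONDS)
    .exceptionally(e -> {
        // On timeout, send back an empty job response (hypothetical helper).
        resultHandler.onComplete(emptyJob());
        return null;
    });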

@cshannon cshannon added this to the 4.0.0 milestone Sep 28, 2024
@cshannon cshannon self-assigned this Sep 28, 2024
@cshannon (Contributor Author) commented:

The sunny ITs passed; I just kicked off a full IT build as well.

@cshannon (Contributor Author) commented Sep 28, 2024

I pushed an update to fix a bug that ThriftMaxFrameSizeIT discovered. The TimedProcessor was implementing TAsyncProcessor, which caused the wrong frame to be used in some cases. We may want to update that max frame size test to verify it works with the async frames as well.

I just restarted the full IT build.

@@ -1165,7 +1167,11 @@ public enum Property {
@Experimental
COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
"compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION,
"The interval at which to check the tservers for external compactions.", "2.1.0");
"The interval at which to check the tservers for external compactions.", "2.1.0"),
A contributor commented:

Noticed some of the existing props were unused when looking at this PR and opened #4932

var unused = future.thenAccept(ecj -> {
LOG.debug("Received next compaction job {}", ecj);
resultHandler.onComplete(ecj);
}).orTimeout(maxJobRequestWaitTime, MILLISECONDS).exceptionally(e -> {
@keith-turner (Contributor) commented Sep 28, 2024

There is no exceptionallyAsync, but there is a whenCompleteAsync method that could be used.

The following is just sharing some experiments I did trying to understand the timeout a bit better; I'm not recommending any changes for this PR.

The following little program

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TimeoutExperiment {
    public static void main(String[] args) throws Exception {
        var executor = Executors.newFixedThreadPool(4);

        var future = CompletableFuture.supplyAsync(() -> {
            System.out.println("P1 " + Thread.currentThread().getName());
            try {
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                System.out.println("Saw " + e.getMessage());
            }
            return 5;
        }, executor).orTimeout(1, TimeUnit.SECONDS).exceptionally(throwable -> {
            System.out.println("P2 " + Thread.currentThread().getName() + " " + throwable.getMessage());
            return 6;
        });

        System.out.println(future.get());
        Thread.sleep(3);
        System.out.println("Shutting down");
        executor.shutdownNow();
    }
}

prints

P1 pool-1-thread-1
P2 CompletableFutureDelayScheduler null
6
Shutting down
Saw sleep interrupted

This shows two problems: first, the exceptionally code runs in a system-wide shared thread pool because of the timeout, and second, the original task that is sleeping 60 seconds is not interrupted when the timeout happens; it is only interrupted when the executor is shut down. Using whenCompleteAsync will avoid the first problem. So when things time out using this approach, we could have threads hanging around working on stuff that no one cares about. Looking into this I found a Stack Overflow post, and based on that I wrote the following test.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class CFT {

    public static <T> CompletableFuture<T> supplyAsync(
            final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
            ExecutorService executor, ScheduledExecutorService schedulerExecutor) {

        final CompletableFuture<T> cf = new CompletableFuture<T>();

        Future<?> future = executor.submit(() -> {
            try {
                cf.complete(supplier.get());
            } catch (Throwable ex) {
                cf.completeExceptionally(ex);
            }
        });

        // schedule watcher
        schedulerExecutor.schedule(() -> {
            // There was a race condition in the stack overflow post that is fixed here
            // by only cancelling the task when completeExceptionally wins the race.
            if (cf.completeExceptionally(new TimeoutException("You are outta time"))) {
                future.cancel(true);
            }
        }, timeoutValue, timeUnit);

        return cf;
    }

    public static void main(String[] args) throws Exception {
        var executor = Executors.newFixedThreadPool(4);
        var schedExecutor = Executors.newScheduledThreadPool(1);

        Supplier<Integer> supplier = () -> {
            System.out.println("P1 " + Thread.currentThread().getName());
            try {
                Thread.sleep(60000);
                System.out.println("done");
            } catch (InterruptedException e) {
                System.out.println("Saw " + e.getMessage());
            }
            return 5;
        };

        var future = supplyAsync(supplier, 1, TimeUnit.SECONDS, executor, schedExecutor);

        // The exceptionally method does not have an async version, so whenCompleteAsync
        // is called instead. This avoids executing code in the scheduled executor.
        future.whenCompleteAsync((i, throwable) -> {
            System.out.println("P2 " + Thread.currentThread().getName() + " " + throwable.getMessage());
        }, executor);

        // give the timeout a chance to happen
        Thread.sleep(3000);

        System.out.println("Shutting down");
        executor.shutdown();
        schedExecutor.shutdown();
    }
}

This prints the following, which shows that the original sleeping task is interrupted and that the exceptional handling runs in the desired thread pool.

P1 pool-1-thread-1
Saw sleep interrupted
P2 pool-1-thread-2 You are outta time
Shutting down

Based on this, we probably need to open a follow-on issue after this PR to improve the timeout case so that it interrupts any work that is still out there running.

@cshannon (Contributor Author) commented Sep 29, 2024

There is no exceptionallyAsync, but there is a whenCompleteAsync method that could be used.

There is an exceptionallyAsync, but it turns out that it was only added in JDK 12, so it is not available on the JDK we currently build against.

Version 4.0.0 should be requiring JDK 17 anyway by the time we release it, so I think it might be time to bring up bumping the version again. I know there are/were Hadoop issues, but I've been running with JDK 17 for a while and haven't seen problems.
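
For reference, once exceptionallyAsync is available, a minimal sketch of the timeout handling could look like the following (the handler body here is hypothetical):

// exceptionallyAsync (added in JDK 12) runs the handler on the supplied
// executor instead of the shared CompletableFuture delay scheduler thread.
future.exceptionallyAsync(throwable -> {
    LOG.debug("Timed out waiting for a compaction job", throwable);
    return null; // hypothetical: the caller then sends back an empty job response
}, executor);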

@cshannon (Contributor Author) commented:

So when things time out using this approach, we could have threads hanging around working on stuff that no one cares about.

Does this apply for this use case? What thread would be hanging around on timeout that we need to interrupt?

For this case we are not submitting any work to the task executor to supply the completable future. We are just waiting for the future to be completed by another thread (I looked at CompactionJobPriorityQueue and it looks like it just calls complete with a new job when one is available), which should be independent. When the future completes, the executor would use a new thread to execute the thenApplyAsync() or whenCompleteAsync() method.

But I also might be thinking about this wrong, or missing something that we would need to interrupt.

A contributor replied:

Does this apply for this use case? What thread would be hanging around on timeout that we need to interrupt?

I think the code is doing something similar to the following. A thread could be hanging out trying to make the metadata table update after a timeout has happened, when nothing cares about that update being made anymore.

.thenAccept(/* reserve in metadata table */).thenAccept(/* do rpc response */).orTimeout(...)
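
Here is a small self-contained demo of that hazard (the names are made up, not code from the PR):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class OrTimeoutHazard {
    public static void main(String[] args) throws Exception {
        var source = new CompletableFuture<String>();

        var chain = source
            .thenAccept(job -> System.out.println("reserving " + job)) // stand-in for the metadata update
            .thenAccept(v -> System.out.println("sending rpc response"))
            .orTimeout(1, TimeUnit.SECONDS);

        Thread.sleep(2000); // let the timeout fire on the tail of the chain
        System.out.println("timed out: " + chain.isCompletedExceptionally()); // prints true

        // orTimeout only completed the tail stage; the upstream "reserve" stage
        // still runs when the source finally completes, doing work nothing needs.
        source.complete("job1"); // still prints "reserving job1"
    }
}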

There is an exceptionallyAsync, but it turns out that it was only added in JDK 12, so it is not available on the JDK we currently build against.

Ah, another reason to go to JDK 17. Would so love to have records. AFAIK the problem with JDK 17 is that map reduce does not work with it. It would be tricky to transition to JDK 17 because the Accumulo map reduce code uses a lot of code in core.

@cshannon (Contributor Author) commented Sep 29, 2024

I took a look at the full IT build and it's still running 22 hours later with a ton of timeouts. I looked at one of the tests to see what was going on (GarbageCollectorIT#gcRun) and noticed that the Compactor had finished compacting a job, but the Coordinator was logging in a loop waiting for the Fate job to finish and was never notified. Eventually I found the following in the Compactor log:

2024-09-29T11:53:26,663 40 [compaction.RetryableThriftCall] DEBUG: Error in Thrift function, retrying ...
org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: 120000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/xxxx:36228 remote=ip-xxxx:8999]
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:171) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:100) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.thrift.transport.layered.TFramedTransport.readFrame(TFramedTransport.java:132) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.thrift.transport.layered.TFramedTransport.read(TFramedTransport.java:100) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:100) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.accumulo.core.clientImpl.ThriftTransportPool$CachedTTransport.readAll(ThriftTransportPool.java:686) ~[classes/:?]
        at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:622) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:479) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79) ~[libthrift-0.17.0.jar:0.17.0]
        at org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService$Client.recv_compactionCompleted(CompactionCoordinatorService.java:109) ~[classes/:?]
        at org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService$Client.compactionCompleted(CompactionCoordinatorService.java:92) ~[classes/:?]
        at org.apache.accumulo.compactor.Compactor.lambda$updateCompactionCompleted$2(Compactor.java:441) ~[classes/:?]
        at org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:99) [classes/:?]
        at org.apache.accumulo.compactor.Compactor.updateCompactionCompleted(Compactor.java:448) [classes/:?]
        at org.apache.accumulo.compactor.Compactor.run(Compactor.java:853) [classes/:?]
        at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [classes/:?]
        at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
Caused by: java.net.SocketTimeoutException: 120000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/xxxx:36228 remote=ip-xxxx:8999]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:163) ~[hadoop-client-api-3.4.0.jar:?]
       ...
        ... 16 more

I ran again and the compactor thread dump shows the same thing. I do not know yet whether this is the same issue for all the hanging tests, as I still have to investigate, but clearly something isn't quite right. What's weird is that the test ran one compaction and then got stuck trying to call that RPC for the second one.

@cshannon (Contributor Author) commented Sep 29, 2024

I should add that my first hunch is that I probably missed calling resultHandler.onComplete() in some code branches, so it's probably just stuck and not responding. I'm hoping this is a simple fix.

@cshannon (Contributor Author) commented:

I fixed a couple of tests that failed because the callback handler wasn't being called, so no response was sent back.

However, when investigating why the Kerberos ITs were failing, I discovered they were hanging forever when compacting. After debugging for a while, I found out that the SSL and SASL servers do not support async, and we use a blocking server when either is configured. This is documented upstream, and I verified it in the code. The server can also be started as a blocking server even when not using SASL or SSL, but non-blocking is the default.

So that is obviously a problem: if we are using a blocking server instead of a non-blocking one, the async processors will not work. To get around this, I pushed an update that detects which mode Thrift is using and conditionally starts the compaction coordinator service in async mode if we are using a non-blocking server, falling back to sync mode if we are not.
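
Conceptually, the fallback looks something like the following sketch; serverType, usesBlockingServer, and the handler variables are illustrative names, not the exact code in this PR.

// Sketch: pick the processor type based on the kind of Thrift server in use.
// SSL/SASL (and an explicitly configured blocking server) cannot serve async.
TProcessor processor;
if (usesBlockingServer(serverType)) {
    // Blocking server: fall back to the plain sync processor.
    processor = new CompactionCoordinatorService.Processor<>(syncHandler);
} else {
    // Non-blocking server: the async processor enables long polling.
    processor = new CompactionCoordinatorService.AsyncProcessor<>(asyncHandler);
}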

It would be nice to be able to use non-blocking for all modes, but I'm not sure that is possible. I know that gRPC works with SSL because it uses Netty, which is non-blocking, so that's one benefit of gRPC.

@ctubbsii - Do you know if Thrift still does not support SSL or SASL with a non-blocking server?

I just kicked off a new full IT build to see what is still broken. I'm also going to convert this to a draft PR until we work through some of these issues, such as what to do about SSL and SASL being blocking (maybe we just don't support async with those for now), async multiplexing, etc.

@cshannon cshannon marked this pull request as draft September 29, 2024 21:22
@cshannon (Contributor Author) commented:

@keith-turner just pointed out to me that we need to make sure the sync version of the compaction coordinator service doesn't use the async version, because that would long poll, which we do not want. When I get time, I will revert the sync version back to the old way so we don't long poll. I will need to do a little refactoring so some of the same code can be shared between both versions.

@cshannon (Contributor Author) commented:

The full IT build got stuck again; it keeps hanging forever on ExternalCompaction_3_IT, and I am not sure why yet. The only other tests I saw fail were BackupManagerIT and IdleProcessMetricsIT, which may be related to the new Thrift service/port; I am not sure yet. Until ExternalCompaction_3_IT is fixed, it's hard to know what else needs fixing, as the build won't finish.

@cshannon cshannon closed this Oct 4, 2024
@cshannon (Contributor Author) commented Oct 4, 2024

I'm closing this PR for now because, similar to some limitations discovered in #4715 that put gRPC on hold, there are several issues with async Thrift that prevent it from being a solution right now. I plan to open a new issue for future investigation into async RPC that will include the findings from both gRPC and async Thrift, but some quick highlights of the async Thrift issues include:

  1. No async multiplexing support (but there is an old PR that could be reworked and reopened as part of THRIFT-2427).
  2. No async SSL support. None of the non-blocking servers support SSL, and it is non-trivial to implement due to the complexity of working with the JDK SSLEngine (which is why something like Netty is nice).
  3. No async SASL support. There was a non-blocking SASL server added in https://issues.apache.org/jira/browse/THRIFT-4889, but it implements its own custom non-blocking extension to the server and doesn't support AsyncFrame or any of the async API, so async services will not work with it currently.


Successfully merging this pull request may close these issues.

Decrease latency between time compaction job is added and run