Decrease latency between time compaction job is added and run #4664
Comments
I think long polling here should work. Ideally, being able to do pub/sub and push notifications to a listener is the best way so there isn't any polling but long polling is also viable and most messaging APIs support it. Both Kafka and JMS brokers (just as two examples) support long polling in their API and it works pretty well with similar behavior as here. The calls will wait for messages/records to arrive and then if the timeout passes will return empty/null, etc and you can try again. |
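As a minimal sketch of the long-poll behavior described above (wait for an item, return empty once the timeout passes, then let the caller retry), here is a small Java example. `LongPollQueue`, `publish`, and `longPoll` are hypothetical names made up for illustration; this is not Kafka, JMS, or Accumulo code. Note that this simple form holds the calling thread for the duration of the wait.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not an Accumulo, Kafka, or JMS class.
class LongPollQueue<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

  // Producer side: makes an item available to any waiting poller.
  void publish(T item) {
    queue.add(item);
  }

  // Consumer side: waits up to timeoutMillis for an item and returns
  // Optional.empty() if nothing arrives in time (or the thread is interrupted).
  Optional<T> longPoll(long timeoutMillis) {
    try {
      return Optional.ofNullable(queue.poll(timeoutMillis, TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return Optional.empty();
    }
  }
}
```

A caller would invoke `longPoll` in a loop, making a new request as soon as an empty result comes back.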
Currently in the manager the same thread pool services fate, compaction coordinator, etc. RPCs. So this could cause a problem if it's a fixed-size thread pool and all of the threads are waiting on an empty compaction queue. This would prevent fate RPCs from running, or it could even block a compaction queue that has work (for example, all threads are waiting on queue A, which has no work, while compaction queue B has work but nothing can get to it). One simple way to solve this would be an unbounded thread pool to service manager RPCs. However, there is some limit (not sure what it is) where too many threads in a JVM start to cause problems. Another possible solution is to make the server-side processing for these messages async. Thrift supports async server processing, but there is no documentation for it. With async processing, when an RPC request is waiting for a compaction job there would be no thread associated with it, which would be ideal. Since there are no docs for Thrift async servers, I have not been able to determine whether it's possible to mix sync and async processing for RPCs in Thrift; we would probably not want to make everything async in the manager, for code complexity reasons. If they cannot be mixed, maybe we could create a Thrift service with its own port that only services requests for compaction jobs. Maybe that could even be switched to gRPC as a trial run of gRPC, which also seems to support async. If we wait long enough, we would probably not need async processing for this, as Java virtual threads could be used instead. Not sure what the best course of action is here, just posting some notes from researching this a bit. |
Some of the things I found while looking into async thrift https://issues.apache.org/jira/browse/THRIFT-1972 |
I'm researching this now and so far async Thrift may not be a great option. Besides the extremely poor (non-existent) documentation, I have found some blog posts that make it sound like it's not truly async and that a thread is still used per RPC call. I won't know whether this is true until I dive further into the code, as there's literally no information anywhere about async Thrift. The lack of information also makes me a bit skeptical about using it, because it probably means no one else is and there could be bugs that haven't been worked out. I have a few ideas that I am thinking through and can list them here when I finish researching. We may end up needing a different solution than Thrift for this use case, depending on how the async code looks. |
I looked into this quite a bit today to see if async Thrift may work. We have a lot of custom code for Thrift and we currently disallow async Thrift processors. The processors are generated but we are not using them anywhere. It would take some work to try things out, and I think we would need to stand up another server, as there is an open issue that mentions multiplexed processors not working. I don't think async Thrift is the best approach to try because of the lack of documentation and support, among other issues. I talked offline to @keith-turner a bit about this and it seems like gRPC might be a good alternative to try as a trial, as noted in a previous comment. gRPC supports async and streaming RPC, so it looks like it would work. A compactor could send in the request and the server could offload it and wait to complete the StreamObserver that comes in until there is a new job available. CompletableFuture could likely be used here to make the code easier to manage (I will let @keith-turner comment on this). By doing this we could handle a lot of requests and they would not be taking up threads while waiting. I also had a few other alternative ideas:
|
If we add the following to CompactionJobQueues:
public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
// TODO implement
throw new UnsupportedOperationException();
}
then, if we can find an async way to handle the job request RPC, hopefully the following could be done in CompactionCoordinator:
/**
* Process an RPC from a compactor to get a compaction job asynchronously.
*
* @param callback a RPC callback that will send the compaction job back to the compactor
*/
public void getCompactionJob(String groupName, String compactorAddress,
String externalCompactionId, Consumer<TExternalCompactionJob> callback) {
CompactorGroupId groupId = CompactorGroupId.of(groupName);
CompletableFuture<CompactionJobQueues.MetaJob> future = jobQueues.getAsync(groupId);
// TODO if nothing is done then the reservation and RPC callback will run in the thread that
// completes the future which would probably be a TGW thread. This is easy to remedy w/
// completable future
// TODO need to timeout and return nothing periodically, completablefuture has mechanism for
// this that need investigation to see if workable
future.thenApply(metaJob -> reserve(compactorAddress, externalCompactionId, metaJob))
.thenAccept(callback);
}
// this method comes from copying existing code in CompactionCoordinator to function
private TExternalCompactionJob reserve(String compactorAddress, String externalCompactionId,
CompactionJobQueues.MetaJob metaJob) {
// reserve the compaction job
Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
// this method may reread the metadata, do not use the metadata in metaJob for anything after
// this method
CompactionMetadata ecm = null;
var kind = metaJob.getJob().getKind();
// Only reserve user compactions when the config is present. When compactions are canceled the
// config is deleted.
var cid = ExternalCompactionId.from(externalCompactionId);
if (kind == CompactionKind.SYSTEM
|| (kind == CompactionKind.USER && compactionConfig.isPresent())) {
ecm = reserveCompaction(metaJob, compactorAddress, cid);
}
if (ecm != null) {
return createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig);
} else {
return new TExternalCompactionJob();
}
} |
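To make the two TODOs above more concrete, here is a rough, self-contained sketch of how `getAsync` and the timeout could fit together. Everything in it is an assumption for illustration: `AsyncJobQueues` and `CoordinatorSketch` are hypothetical stand-ins rather than the real `CompactionJobQueues` or `CompactionCoordinator`, plain `String` values replace `CompactorGroupId`, `MetaJob`, and `TExternalCompactionJob`, and an empty string stands for "no job". It relies on `CompletableFuture.completeOnTimeout` (Java 9+) for the periodic empty response and on `thenApplyAsync` with an explicit executor so the reservation step does not run on whichever thread completes the future.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for CompactionJobQueues; Strings replace the real id and job types.
class AsyncJobQueues {
  private final Map<String, Deque<String>> jobs = new HashMap<>();
  private final Map<String, Deque<CompletableFuture<String>>> waiters = new HashMap<>();

  // Returns an already completed future if a job is queued, otherwise parks a pending future.
  synchronized CompletableFuture<String> getAsync(String groupId) {
    Deque<String> queued = jobs.get(groupId);
    if (queued != null && !queued.isEmpty()) {
      return CompletableFuture.completedFuture(queued.poll());
    }
    CompletableFuture<String> future = new CompletableFuture<>();
    waiters.computeIfAbsent(groupId, k -> new ArrayDeque<>()).add(future);
    return future;
  }

  // Hands the job to the oldest waiter that is still pending; waiters that already
  // timed out are skipped because complete() returns false for them. If no live
  // waiter exists the job is queued. (Timed-out waiters linger until the next add().)
  synchronized void add(String groupId, String job) {
    Deque<CompletableFuture<String>> parked = waiters.get(groupId);
    while (parked != null && !parked.isEmpty()) {
      if (parked.poll().complete(job)) {
        return;
      }
    }
    jobs.computeIfAbsent(groupId, k -> new ArrayDeque<>()).add(job);
  }
}

// Hypothetical stand-in for the coordinator side of the RPC.
class CoordinatorSketch {
  static CompletableFuture<Void> getCompactionJob(AsyncJobQueues queues, String groupId,
      long timeoutMs, Executor pool, Consumer<String> callback) {
    return queues.getAsync(groupId)
        // after timeoutMs with no job, complete with "" so the compactor can ask again
        .completeOnTimeout("", timeoutMs, TimeUnit.MILLISECONDS)
        // run the (placeholder) reservation on the supplied pool, not the completing thread
        .thenApplyAsync(job -> job.isEmpty() ? "none" : "reserved:" + job, pool)
        .thenAccept(callback);
  }
}
```

While a request is parked, no thread is tied to it; only the pending future is held in memory until a job arrives or the timeout fires.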
I'm working on trying out gRPC now to see how it goes. I figure it would be good to get something simple to at least compare how it could work and see if we want to go that direction. Besides a hello world RPC to try things, my plan would be to try and create one RPC for now, which is the getCompactionJob() RPC that exists in the CompactionCoordinatorService and see if it would work. |
This changes the CompactionCoordinator RPC service to use an async Thrift processor. This allows long polling when a compactor requests a new job by no longer blocking the Thrift IO thread when there are no jobs available. When a job is available, the response will be sent asynchronously back to the client. The only RPC service converted for now is the CompactionCoordinator RPC service, but future services could be converted to use async processors as well. This RPC is not using multiplexing, as async multiplexing is an open issue being worked on in THRIFT-2427. Once that issue is resolved, the plan is to convert to an async multiplexed processor. A second Thrift service on another port was created in the Manager for handling async processors because they are handled differently than sync processors and you can't combine both sync and async processors when multiplexing. The coordinator RPC service now advertises the async port so clients connect to the correct service. This closes apache#4664
I have not yet had a chance to open a new issue covering my findings from experimenting with async RPC using async Thrift and gRPC, as mentioned in this comment, but I was trying to think of other options to help solve this, assuming the metrics added in #4945 demonstrate an issue. Something else we could try is to create an embedded Jetty REST service in the Manager for the getCompactionJob() RPC call and switch the Compactors to use that service to get the next compaction job. I think this could work because:
The drawback would of course be that it would be a one-off service used only for this one use case, but otherwise, as long as the authentication worked, it would check the same boxes: it would reduce the latency before a compaction job runs by allowing long polling, so when a job is ready it can be sent immediately to a free compactor without blocking IO threads. |
Compactors poll for work w/ exponential backoff. When compactors are all idle for a while and there is a surge of jobs to do it can take them a bit to all start working.
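To illustrate why idle compactors can be slow to pick up a surge, here is a small sketch of a doubling backoff schedule. The initial delay, cap, and doubling factor are made-up values for illustration, not Accumulo's actual settings, and `BackoffSketch` is a hypothetical name. A job queued just after an empty poll may wait up to the compactor's current delay before it is seen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the parameters are assumptions, not Accumulo defaults.
class BackoffSketch {
  // Delays (in ms) an idle poller would sleep after each successive empty poll,
  // doubling every time and capped at maxMs.
  static List<Long> idleDelays(long initialMs, long maxMs, int polls) {
    List<Long> delays = new ArrayList<>();
    long delay = initialMs;
    for (int i = 0; i < polls; i++) {
      delays.add(delay);
      delay = Math.min(maxMs, delay * 2);
    }
    return delays;
  }
}
```

With an assumed 100 ms initial delay and a 5000 ms cap, `idleDelays(100, 5000, 8)` yields 100, 200, 400, 800, 1600, 3200, 5000, 5000, so a long-idle compactor could take up to 5 seconds to notice new work.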
One possible way to improve this is to modify how polling works. The coordinator could hold requests from compactors for a time period when nothing is currently queued. When something is queued it could be immediately given to a held compactor RPC request. We would not want to hold an RPC request for too long because it could belong to a dead compactor. The coordinator could hold a request for some time period like 60 to 90 seconds and return nothing if the queue is still empty. If the compactor is still alive it can make another request for work, which will be held again if the queue is currently empty.
Decreasing this latency is good for a system that has lots of small files arriving constantly at tablets. With a model like this for polling and #4618, very low latency could be achieved for compaction of new bulk imported files. For minor compacted files, there would not be a signal like the one #4618 provides for bulk imports to queue compaction jobs for a tablet.