Create prototype gRPC getCompactionJob() service for external compactions #4715
Conversation
This includes the dependencies and plugin to generate the gRPC service and protocol buffer classes.
This adds the definitions we need for the getCompactionJob() API and adds the generated source.
getCompactionJob() now uses gRPC. To minimize the changes, the existing Thrift objects are converted to protobuf and back. If protobuf is kept then Thrift will eventually be removed entirely.
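As a purely illustrative sketch of that conversion step (the placeholder record types below stand in for the real generated Thrift and protobuf classes, which have builders and many more fields):

```java
// Illustration only: bridge the existing Thrift job object to a protobuf message
// before crossing the gRPC boundary, and map it back on the other side.
public final class JobConversionSketch {

  // placeholder stand-ins for the generated Thrift and protobuf job types
  record ThriftJob(String externalCompactionId, short priority) {}
  record ProtoJob(String externalCompactionId, short priority) {}

  static ProtoJob toProto(ThriftJob t) {
    return new ProtoJob(t.externalCompactionId(), t.priority());
  }

  static ThriftJob fromProto(ProtoJob p) {
    return new ThriftJob(p.externalCompactionId(), p.priority());
  }
}
```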
Some comments; I haven't gotten to the actual code changes yet.
I would be a proponent of replacing the entire Coordinator <-> Compactor communication pathway with GRPC. If you did this, then we would not need to do this conversion of Thrift <-> Protobuf?
Right, we shouldn't need to do any conversion in the final version. Ideally we would not use Thrift at all; it was just simpler to do the conversion here because when I started to get rid of Thrift, the changes cascaded everywhere else since our Thrift objects are so tightly coupled.
/**
 * Simple wrapper to start/stop the grpc server
 */
public class CompactionCoordinatorServiceServer {
I'm wondering if a better place to start looking at GRPC is to try and create something to test Kerberos and TLS with GRPC. Presumably that would lead to a base class that can be used for all server implementations.
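For illustration, a minimal sketch of what such a reusable wrapper could look like, assuming the credentials are injected so the same class works with InsecureServerCredentials now and TLS later (names are placeholders, not the PR's code):

```java
import java.io.IOException;

import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.Server;
import io.grpc.ServerCredentials;

public class GrpcServiceServer {

  private final Server server;

  // Credentials are injected so the wrapper is not tied to a single security setup.
  public GrpcServiceServer(ServerCredentials credentials, BindableService service, int port) {
    this.server = Grpc.newServerBuilderForPort(port, credentials).addService(service).build();
  }

  public void start() throws IOException {
    server.start();
  }

  public void stop() {
    server.shutdown();
  }
}
```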
try {
  // Start up the grpc compaction service
  // TODO: The port is just hardcoded for now and will need to be configurable
  grpcService =
I think ServiceDescriptor is going to need to change, even if just slightly. The ServiceDescriptor objects are stored in ZooKeeper with the ServiceLock as a way to advertise which services are available and on which ports.
I pushed up changes that take advantage of #4726 to get a new job async that returns a CompletableFuture. The result is not sent back to the client (coordinator) until the job is completed. To make it simple I just copied the relevant code out of the Thrift method because the current method can't return a CompletableFuture as it's an RPC service. We need to decide if we want to keep the Thrift code around temporarily so either RPC can be used, or to just drop it. I also went ahead and dropped the loop that tries to get another job if the reservation fails, to simplify things. I figure we just return the empty job and it would try again and that is fine.

There are a couple of things we could probably improve here. First, I am using future.thenApply() and future.thenAccept() and it would probably be better to use a thread pool and the Async versions of those methods. Second, there's currently no timeout, so we need to look at adding one so things are not waiting forever.
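For the timeout point, a minimal sketch of one option using CompletableFuture.orTimeout (Java 9+); the job type and the "empty job" representation are placeholders for whatever the real code uses:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class LongPollTimeoutSketch {

  // Bound the wait on a pending job; on timeout (or any failure) fall back to an
  // empty result so the compactor simply asks again.
  static <J> CompletableFuture<Optional<J>> withTimeout(CompletableFuture<J> pendingJob,
      long timeout, TimeUnit unit) {
    return pendingJob.thenApply(Optional::of)
        .orTimeout(timeout, unit) // completes exceptionally if nothing arrives in time
        .exceptionally(ex -> Optional.empty());
  }
}
```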
I was experimenting with this locally and it's really neat to see it in action. Made the following throwaway changes to dae30cf. The logging changes were to get a bit more information about timing. The test changes were made to cause a surge of bulk import files after the system was sitting idle for a bit.
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index ef07b78d8a..838fab35a7 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -462,7 +462,7 @@ public class CompactionCoordinator
TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
return jobQueues.getAsync(groupId).thenApply(metaJob -> {
- LOG.trace("Next metaJob is ready {}", metaJob.getJob());
+ LOG.debug("Next metaJob is ready {}", metaJob.getJob());
Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
// this method may reread the metadata, do not use the metadata in metaJob for anything after
@@ -471,6 +471,7 @@ public class CompactionCoordinator
var kind = metaJob.getJob().getKind();
+ LOG.debug("Reserving compaction job {}", externalCompactionId);
// Only reserve user compactions when the config is present. When compactions are canceled the
// config is deleted.
var cid = ExternalCompactionId.from(externalCompactionId);
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java
index 1dc1c2c0f9..a3d5a61856 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java
@@ -96,6 +96,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.NamespacePermission;
import org.apache.accumulo.core.security.SystemPermission;
import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.FileUtil;
@@ -126,12 +127,24 @@ public abstract class ComprehensiveBaseIT extends SharedMiniClusterBase {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
client.tableOperations().create(table);
- bulkImport(client, table, List.of(generateKeys(0, 100), generateKeys(100, 200)));
+ // sleep for a bit to let compactors idle
+ UtilWaitThread.sleep(60000);
+
+ // add 4 files to a single tablet, should cause tablet to need compaction
+ bulkImport(client, table, List.of(generateKeys(0, 50), generateKeys(50, 100),
+ generateKeys(100, 150), generateKeys(150, 200)));
verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 200));
+ UtilWaitThread.sleep(60000);
+
+ // add 4 more files to tablet
bulkImport(client, table,
- List.of(generateKeys(200, 300), generateKeys(300, 400), generateKeys(400, 500)));
+ List.of(generateKeys(200, 300), generateKeys(300, 400),
+ generateKeys(400, 450), generateKeys(450, 500)));
+
+ UtilWaitThread.sleep(60000);
+
verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 500));
}
Ran the modified test and then looked in the manager log and saw the following.
Summarizing the above log messages.
So this is really neat, the async request starts working immediately after the job is queued and within 3ms to 4ms has reserved the job and returned it to the compactor. Saw something else that was interesting besides timing in the logs. The numbers after the times in the logs are thread ids. So thread id
Still looking over this. Was able to run mvn clean package -Pprotobuf -DskipTests locally and that worked w/o issue, and I saw the protobuf code being regenerated.
Looking over this I was trying to understand what still needs to be done and came up with the following list. What else is there to be done?
- fix hard coded port
- add thread pool for async rpc request response processing
- drop thrift coordinator service and only have grpc coordinator service, this opens up the possibility of grpc<->accumulo_data_strucs conversions instead of accumulo_data_strucs<->thrift<->grpc conversions.
- add needed grpc jars to tarball
- explore supporting kerberos and ssl w/ grpc
- add accumulo config for thread pools for processing grpc requests
For what is left to be done, what do you think can be done in the initial PR and what could be deferred to follow on PRs?
@@ -164,6 +164,13 @@
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-bom</artifactId>
If there are jars needed at runtime for grpc, then the assembly pom may need to be modified to include those so that they show up in the tarball. Not sure what, if anything, needs to be done for this. Could be a follow on issue.
LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());

return jobQueues.getAsync(groupId).thenApply(metaJob -> {
Will eventually need to process these in another thread pool
We should be able to just use thenApplyAsync() and pass in an executor. We could start with creating a regular thread pool for this, but I'm wondering if this would be a time to look into Virtual threads.
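A minimal sketch of that idea, assuming the same future-returning queue; the pool size, names, and the virtual-thread alternative are illustrative only:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class AsyncJobDispatchSketch {

  // dedicated pool so the job-reservation callback does not run on gRPC's default executor;
  // on Java 21+ this could instead be Executors.newVirtualThreadPerTaskExecutor()
  private final ExecutorService jobResponsePool = Executors.newFixedThreadPool(8);

  <T, R> CompletableFuture<R> dispatch(CompletableFuture<T> pendingJob,
      Function<T, R> reserveAndConvert) {
    // thenApplyAsync hands the callback to our executor instead of the completing thread
    return pendingJob.thenApplyAsync(reserveAndConvert, jobResponsePool);
  }
}
```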
public CompactionCoordinatorServiceServer(
    CompactionCoordinatorServiceGrpc.CompactionCoordinatorServiceImplBase service, int port)
    throws IOException {
  this(Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()), service, port);
Would you happen to know the threading model for processing incoming requests? Seems like we could possibly have two thread pools: one for processing incoming requests and another pool for async processing of responses. Not sure though, need to look at the grpc docs. Currently for thrift we have config related to thread pool sizes; wondering if we will need to have config for this.
We should be able to configure different pools for requests and responses for sure. gRPC is async-based out of the box, so we will be using a different thread for sending responses.
I found this helpful post, written by one of the contributors to the Java gRPC library, which explains how to configure the thread pool for the responses vs the requests:
The Executor that you provide is what actually executes the callbacks of the rpc. This frees up the EventLoop to continue processing data on the connection. When a new message arrives from the network, it is read on the event loop, and then propagated up the stack to the executor. The executor takes the messages and passes them to your ServerCall.Listener which will actually do the processing of the data.
By default, gRPC uses a cached thread pool so that it is very easy to get started. However it is strongly recommended you provide your own executor. The reason is that the default threadpool behaves badly under load, creating new threads when the rest are busy.
In order to set up the event loop group, you call the workerEventLoopGroup method on NettyServerBuilder. gRPC is not strictly dependent on Netty (other server transports are possible) so the Netty specific builder must be used.
We are using the generic ServerBuilder API right now, which allows setting an executor, and according to that post that executor is for the responses. We could switch to using the more specific NettyServerBuilder instead, which would also allow us to configure the event loop for the incoming requests and tune that.
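A sketch of what that tuning might look like if we switched to the Netty-specific builder; the pool and event loop sizes are placeholders, not recommendations:

```java
import java.util.concurrent.Executors;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

class NettyGrpcServerSketch {

  Server build(BindableService service, int port) {
    return NettyServerBuilder.forPort(port)
        // application callbacks (the "response" side) run on this executor
        .executor(Executors.newFixedThreadPool(16))
        // event loops that read/write the connections (the "request" side)
        .bossEventLoopGroup(new NioEventLoopGroup(1))
        .workerEventLoopGroup(new NioEventLoopGroup(4))
        // channel type is set explicitly when supplying custom event loop groups
        .channelType(NioServerSocketChannel.class)
        .addService(service)
        .build();
  }
}
```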
I will take a look more today at the state of this and report what I think. The tricky part is definitely trying to figure out how big of a scope to make this task vs follow on tasks. We can likely make a lot of things follow on tasks (like SSL and Kerberos support) because this is for the elasticity and 4.0 branch, so it's not going to be released anytime soon and it doesn't have to be perfect right away. But if we defer things to follow on issues we should certainly document them so they are not forgotten. It would be nice to at least clean up the protobuf definitions a bit (move things into different namespaces like thrift), and it would also be nice if we could drop the thrift <-> gRPC conversion entirely, but I need to dive into the code and see how big of a change that is. I think we would want to use gRPC for all the RPC calls for the compaction service, so I can look today to see how much effort that is and if it makes sense to do it now or split it up.
Update getCompactionJob() to use native accumulo to protobuf conversion and skip thrift conversion where possible.
I have migrated most of the Compaction Coordinator Service to gRPC at this point. I also updated the gRPC client to be shared across the different calls, as the channel is thread safe and can be re-used, but we are not "pooling" different channels like Thrift. I still need to look more into how their client code works and whether we need to pool or not. Some of the rpc calls are commented out because the Manager actually uses them. I also moved the gRPC objects into their own packages to model after our thrift model. We need to eventually move that into its own jar.

This has been talked about a bit already, and that is looking at maybe having a common API for the RPC layer. That is something I need to look more into, and if we were going to do that, how far we take it and what the scope is. Whatever we decide, we should plan before we go too far. At the very least I think it would be nice to re-factor some things and have some common objects so we stop leaking rpc objects. For example, I think we should have a common object for tracing and credentials. Right now we are passing around TInfo and TCredentials (for which I also created PInfo and PCredentials for protobuf), and I think stuff like that should be converted to common objects so we don't leak them. A good example is TCredentials leaking into ClientContext. It would be nice if we could have a more generic object and conversion.

Other than that, I haven't had time to finish up the rest of the list yet, but there are still several things to do and I'm still looking at how to split things up and what to make follow on issues. This PR is getting pretty big by now, so it would probably be best to move as much as possible into follow on issues to make things easier to review.
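Purely as an illustration of the "common object" idea (all names here are invented, not existing Accumulo types):

```java
// RPC-neutral credentials holder; each transport module would own its own
// TCredentials/PCredentials mapping so the generated types stop appearing in
// client-facing code such as ClientContext.
public record RpcCredentials(String principal, byte[] token, String instanceId) {
  // hypothetical per-transport mappings, kept out of client code:
  // static RpcCredentials fromThrift(TCredentials tcreds) { ... }
  // static RpcCredentials fromProto(PCredentials pcreds) { ... }
}
```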
Here is an update to the current state of things. This PR has gotten quite large now so I think it's about time to wrap it up and look at follow on tasks for more work to be done. Below is where things are at for completed, still to be done in this PR, and future work. Finished:
Todo in this PR:
Follow on Work:
I feel like we put the cart before the horse here. These items tagged for follow-on work are actually foundational things that, IMO, should be designed and tested first. I think the work that you have done here is useful to show that grpc can be used, but I'm not convinced that this is the starting point for the introduction of grpc into the codebase.
So to clarify, the follow on work items are of course foundational things and need to be done before anything could be released. It's not that they are not important; it's that this PR is already 100 files changed and enormous, and I don't think we should try and jam everything into one PR or you end up trying to review too much. @keith-turner and I talked about creating another branch like no-chop merge, but we didn't really think it would be a huge deal to just merge things into elasticity as long as the build works. However, it would certainly be possible to just create another branch like we did with no-chop merge and have multiple PRs against that first before merging back into elasticity.
IMO, I think the foundation work should be done before this is merged, not released. The implementation of the foundation work may change the approach taken in this PR.
I have no issue with a long-lived feature branch, much like elasticity, for the grpc work. I don't think these changes should be merged into elasticity at this time.
If no one has any objections I can create a feature branch (I'll probably just call it grpc).
I think a feature branch would be good, we can move faster. Naming it grpc SGTM.
I created a new grpc branch.
I started looking into the authentication support a couple weeks ago and resumed looking into it today to see what was supported. Mutual TLS is supported, since it's Java and Netty supports that; however, Kerberos is not supported out of the box. The only authentication that is supported out of the box is SSL/TLS, OAUTH, and a custom Google mechanism called ALTS. See https://grpc.io/docs/guides/auth/. I did some digging to see if anyone has implemented anything we could use and found the following:
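For reference, a minimal sketch of the mutual TLS option that is supported out of the box, using grpc-java's TlsServerCredentials; the file paths are placeholders:

```java
import java.io.File;
import java.io.IOException;

import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.Server;
import io.grpc.ServerCredentials;
import io.grpc.TlsServerCredentials;

class MutualTlsServerSketch {

  Server build(BindableService service, int port) throws IOException {
    ServerCredentials creds = TlsServerCredentials.newBuilder()
        // server certificate chain and private key (placeholder paths)
        .keyManager(new File("/path/to/server.crt"), new File("/path/to/server.key"))
        // CA certificates used to verify client certificates
        .trustManager(new File("/path/to/ca.crt"))
        // require a client certificate, i.e. mutual TLS
        .clientAuth(TlsServerCredentials.ClientAuth.REQUIRE)
        .build();
    return Grpc.newServerBuilderForPort(port, creds).addService(service).build();
  }
}
```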
The changes from #4811 will fix the hanging
If I'm reading https://issues.apache.org/jira/browse/THRIFT-5762 correctly (should be included in the next version of Thrift), this would allow us to use the Thrift message types with a GRPC transport. If that's true, then I think that makes the jump to GRPC a lot simpler.
That would be interesting. I haven't looked yet at how hard it is to plug in different protocols with gRPC, but I'm sure it will take some work to get the serializer/deserializer stuff correct. One thing I noticed with protocol buffers that is a bit annoying (besides not supporting null, which makes converting from Thrift require more changes) is that there seems to be a lot of buffer copying. There's talk about some ways to make it more efficient with zero copy, and I found this as well: grpc/grpc-java#7387. So we may need to look at memory efficiency for performance if we stayed with protobuf. I also found this blog post which I thought was interesting; it talks a bit about the challenges they had moving from Thrift to gRPC and lessons learned, including dealing with zero copy and also the lack of SASL support like Thrift has: https://www.alluxio.io/blog/moving-from-apache-thrift-to-grpc-a-perspective-from-alluxio/
Did you look at flatbuffers?
I didn't see that, but it looks like that could be an option to explore as well.
This is a prototype that creates a new gRPC service for getCompactionJob(). Instead of using Thrift, gRPC is now used (which is based on HTTP/2 and Netty) and uses protocol buffers version 3 as the protocol. gRPC supports other formats, but using protobuf was the simplest thing to get working, as all the examples use protobuf and protobuf is pretty similar to Thrift. I am marking this as a draft as it probably wouldn't get merged in its current state without some re-working.
The goal of this is to make it possible to do async RPC processing on the server side to support #4664. With gRPC we will be able to easily offload the RPC calls for getting a next compaction job from the IO threadpool once the queue supports CompletableFuture, as described in this comment. The gRPC API makes it easy to respond async to requests, so this will allow compactors to request new jobs and wait (long polling), and the server can offload the future and complete it later when a job comes in, without blocking and using resources (a condensed sketch of this pattern follows the notes below). We also have the option of making the call async on the client side as well. A few things to note:
- The definitions are in the compaction-coordinator.proto file. The sources have already been generated, but with updates a new version can be generated using mvn generate-sources -Pprotobuf.
- In order to get the RPC working I had to re-create all the same objects that we were using in Thrift, but now with protobuf. I just combined them all into one namespace for now to keep it simple.
- I ran ExternalCompaction_1_IT against this and that test is passing.
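A condensed sketch of the long-polling pattern described above: the RPC method returns immediately and the StreamObserver is completed later when the queued CompletableFuture produces a job. Everything here except StreamObserver is a placeholder, not the generated protobuf types:

```java
import java.util.concurrent.CompletableFuture;

import io.grpc.stub.StreamObserver;

class AsyncGetCompactionJobSketch {

  // stand-ins for the generated protobuf request/response messages
  record JobRequest(String groupName) {}
  record JobResponse(String externalCompactionId) {}

  // stand-in for the coordinator's future-returning job queue
  CompletableFuture<JobResponse> nextJob(String group) {
    return new CompletableFuture<>();
  }

  // Shape of the async service method: it does not block; the observer is
  // completed later when a job (or an error) arrives.
  public void getCompactionJob(JobRequest request, StreamObserver<JobResponse> responseObserver) {
    nextJob(request.groupName()).whenComplete((job, err) -> {
      if (err != null) {
        responseObserver.onError(err);
      } else {
        responseObserver.onNext(job);
        responseObserver.onCompleted();
      }
    });
  }
}
```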