Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create prototype gRPC getCompactionJob() service for external compactions #4715

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
212ca11
Add grpc and protobuf setup to maven build
cshannon Jun 30, 2024
75eb46f
Add defintion and generate service/protocol for compaction service
cshannon Jun 30, 2024
00fcedd
Update the Compactor and Compaction coordinator to use new grpc service
cshannon Jun 30, 2024
1234d67
Clean up comments from example
cshannon Jun 30, 2024
0dc1df8
clean up comments in compaction-coordinator.proto
cshannon Jul 1, 2024
450d371
Add generated protobuf to .gitattributes
cshannon Jul 1, 2024
d7dd27d
Merge branch 'elasticity' into accumulo-4664-grpc
cshannon Jul 5, 2024
f27d6c5
Merge branch 'elasticity' into accumulo-4664-grpc
cshannon Jul 5, 2024
17e7aa4
Switch gRPC getCompactionJob() to using new async method
cshannon Jul 5, 2024
edf7125
Use thenAccept() as we don't need a return value
cshannon Jul 5, 2024
dae30cf
fix errorprone warning and send back exception on error to client
cshannon Jul 5, 2024
36481c7
Merge branch 'elasticity' into accumulo-4664-grpc
cshannon Jul 12, 2024
4e72058
Move protobuf objects into correct packages
cshannon Jul 12, 2024
3fb40bd
Add license headers to generated protobuf
cshannon Jul 12, 2024
c5c09af
Move compactionCompleted to gRPC, share grpc connection
cshannon Jul 13, 2024
854ee44
Move more CompactionService rpc calls to gRpc
cshannon Jul 13, 2024
23369fb
Merge branch 'elasticity' into accumulo-4664-grpc
cshannon Jul 19, 2024
548d2ca
Move new gRPC generation to a new module
cshannon Jul 19, 2024
80e5cb2
fix build with relative path in pom
cshannon Jul 19, 2024
3efd1e9
update .gitattributes after moving grpc
cshannon Jul 19, 2024
d57af85
address comments
cshannon Jul 19, 2024
c7b7e8f
Improve rpc creds in ClientContext and switch to string for PKeyExten…
cshannon Jul 19, 2024
3fc6be0
add a property for grpc port
cshannon Jul 19, 2024
9da244a
Move rest of compaction coordinator service to gRPC and update Monitor
cshannon Jul 20, 2024
f000c46
Remove no longer needed Thrift compaction coordinator service from Co…
cshannon Jul 20, 2024
a1b53ad
Merge branch 'elasticity' into accumulo-4664-grpc
cshannon Jul 20, 2024
936edd1
QA build fixes
cshannon Jul 20, 2024
fc8411f
fix test
cshannon Jul 21, 2024
4a02a28
Merge branch 'grpc' into accumulo-4664-grpc
cshannon Jul 26, 2024
a93f989
Merge branch 'grpc' into accumulo-4664-grpc
cshannon Aug 9, 2024
cd3573c
Merge branch 'grpc' into accumulo-4664-grpc
cshannon Aug 10, 2024
9b1c77f
Merge branch 'grpc' into accumulo-4664-grpc
cshannon Aug 16, 2024
66cb2cf
Add initial support for TLS for server/client
cshannon Aug 17, 2024
fa3a8b6
fix formatting
cshannon Aug 17, 2024
32f09d1
Merge branch 'grpc' into accumulo-4664-grpc
cshannon Aug 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
#

core/src/main/thrift-gen-java/** linguist-generated=true
rpc/grpc/src/main/protobuf-gen-java/** linguist-generated=true
29 changes: 27 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand All @@ -69,6 +73,14 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
Expand All @@ -85,6 +97,10 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-access</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-grpc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-start</artifactId>
Expand Down Expand Up @@ -258,12 +274,14 @@
<excludes>
<exclude>.*[.]impl[.].*</exclude>
<exclude>.*[.]thrift[.].*</exclude>
<exclude>.*[.]protobuf[.].*</exclude>
<exclude>org[.]apache[.]accumulo[.]core[.]security[.]crypto[.].*</exclude>
</excludes>
<allows>
<!--Allow API data types to reference thrift types, but do not
analyze thrift types -->
<!--Allow API data types to reference thrift/protobuf types, but do not
analyze thrift/protobuf types -->
<allow>org[.]apache[.]accumulo[.].*[.]thrift[.].*</allow>
<allow>org[.]apache[.]accumulo[.].*[.]protobuf[.].*</allow>
<!--Type from hadoop used in API. If adding a new type from
Hadoop to the Accumulo API ensure its annotated as
stable.-->
Expand Down Expand Up @@ -395,6 +413,13 @@
</executions>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
</build>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.accumulo.core.util.tables.TableZooHelper;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.grpc.compaction.protobuf.PCredentials;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -140,7 +141,7 @@ public class ClientContext implements AccumuloClient {
private final Supplier<SaslConnectionParams> saslSupplier;
private final Supplier<SslConnectionParams> sslSupplier;
private final Supplier<ScanServerSelector> scanServerSelectorSupplier;
private TCredentials rpcCreds;
private RpcCredentialsHolder rpcCreds;
private ThriftTransportPool thriftTransportPool;

private volatile boolean closed = false;
Expand Down Expand Up @@ -457,13 +458,22 @@ public synchronized ConditionalWriterConfig getConditionalWriterConfig() {
* Serialize the credentials just before initiating the RPC call
*/
public synchronized TCredentials rpcCreds() {
return getRpcCreds().thrift();
}

public synchronized PCredentials gRpcCreds() {
return getRpcCreds().protobuf();
}

private synchronized RpcCredentialsHolder getRpcCreds() {
ensureOpen();
if (getCredentials().getToken().isDestroyed()) {
rpcCreds = null;
}

if (rpcCreds == null) {
rpcCreds = getCredentials().toThrift(getInstanceID());
rpcCreds = new RpcCredentialsHolder(() -> getCredentials().toThrift(getInstanceID()),
() -> getCredentials().toProtobuf(getInstanceID()));
}

return rpcCreds;
Expand Down Expand Up @@ -1093,4 +1103,23 @@ && getConfiguration().getBoolean(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABL
return caches;
}

private static class RpcCredentialsHolder {

private final com.google.common.base.Supplier<TCredentials> tCreds;
private final com.google.common.base.Supplier<PCredentials> pCreds;

RpcCredentialsHolder(com.google.common.base.Supplier<TCredentials> tCreds,
com.google.common.base.Supplier<PCredentials> pCreds) {
this.tCreds = Suppliers.memoize(tCreds);
this.pCreds = Suppliers.memoize(pCreds);
}

TCredentials thrift() {
return tCreds.get();
}

PCredentials protobuf() {
return pCreds.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.grpc.compaction.protobuf.PCredentials;

import com.google.protobuf.ByteString;

/**
* A wrapper for internal use. This class carries the instance, principal, and authentication token
Expand Down Expand Up @@ -99,6 +102,18 @@ public TCredentials toThrift(InstanceId instanceID) {
return tCreds;
}

public PCredentials toProtobuf(InstanceId instanceID) {
PCredentials pCreds = PCredentials.newBuilder().setPrincipal(getPrincipal())
.setTokenClassName(getToken().getClass().getName())
.setToken(ByteString.copyFrom(AuthenticationTokenSerializer.serialize(getToken())))
.setInstanceId(instanceID.canonical()).build();
if (getToken().isDestroyed()) {
throw new IllegalStateException("Token has been destroyed",
new AccumuloSecurityException(getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED));
}
return pCreds;
}

/**
* Converts a given thrift object to our internal Credentials representation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ public enum Property {
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
MANAGER_GRPC_CLIENTPORT("manager.port.grpc.client", "8999", PropertyType.PORT,
"The port used for handling gRPC client connections on the manager.", "1.3.5"),
MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
Expand Down Expand Up @@ -1432,8 +1434,8 @@ public static boolean isValidTablePropertyKey(String key) {
// take effect; these are always system-level properties, and not namespace or table properties
public static final EnumSet<Property> fixedProperties = EnumSet.of(
// port options
GC_PORT, MANAGER_CLIENTPORT, TSERV_CLIENTPORT, SSERV_CLIENTPORT, SSERV_PORTSEARCH,
COMPACTOR_PORTSEARCH, TSERV_PORTSEARCH,
GC_PORT, MANAGER_CLIENTPORT, MANAGER_GRPC_CLIENTPORT, TSERV_CLIENTPORT, SSERV_CLIENTPORT,
SSERV_PORTSEARCH, COMPACTOR_PORTSEARCH, TSERV_PORTSEARCH,

// max message options
RPC_MAX_MESSAGE_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.grpc.compaction.protobuf.PKeyExtent;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.Text;

import com.google.protobuf.ByteString;

/**
* keeps track of information needed to identify a tablet
*/
Expand Down Expand Up @@ -128,6 +131,32 @@ public TKeyExtent toThrift() {
prevEndRow() == null ? null : TextUtil.getByteBuffer(prevEndRow()));
}

/**
* Convert to Protobuf form.
*/
public PKeyExtent toProtobuf() {
PKeyExtent.Builder builder = PKeyExtent.newBuilder().setTable(tableId().canonical());
if (endRow() != null) {
builder.setEndRow(ByteString.copyFrom(endRow().getBytes()));
}
if (prevEndRow() != null) {
builder.setPrevEndRow(ByteString.copyFrom(prevEndRow().getBytes()));
}
return builder.build();
}

/**
* Create a KeyExtent from its Protobuf form.
*
* @param pke the KeyExtent in its Protobuf object form
*/
public static KeyExtent fromProtobuf(PKeyExtent pke) {
TableId tableId = TableId.of(pke.getTable());
Text endRow = !pke.hasEndRow() ? null : new Text(pke.getEndRow().toByteArray());
Text prevEndRow = !pke.hasPrevEndRow() ? null : new Text(pke.getPrevEndRow().toByteArray());
return new KeyExtent(tableId, endRow, prevEndRow);
}

/**
* Create a KeyExtent from a metadata previous end row entry.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,32 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
import org.apache.accumulo.grpc.compaction.protobuf.PIteratorConfig;
import org.apache.accumulo.grpc.compaction.protobuf.PIteratorSetting;

/**
* System utility class. Not for client use.
*/
public class SystemIteratorUtil {

public static TIteratorSetting toTIteratorSetting(IteratorSetting is) {
return new TIteratorSetting(is.getPriority(), is.getName(), is.getIteratorClass(),
is.getOptions());
public static PIteratorSetting toPIteratorSetting(IteratorSetting is) {
return PIteratorSetting.newBuilder().setPriority(is.getPriority()).setName(is.getName())
.setIteratorClass(is.getIteratorClass()).putAllProperties(is.getOptions()).build();
}

public static IteratorSetting toIteratorSetting(TIteratorSetting tis) {
return new IteratorSetting(tis.getPriority(), tis.getName(), tis.getIteratorClass(),
tis.getProperties());
public static IteratorSetting toIteratorSetting(PIteratorSetting pis) {
return new IteratorSetting(pis.getPriority(), pis.getName(), pis.getIteratorClass(),
pis.getPropertiesMap());
}

public static IteratorConfig toIteratorConfig(List<IteratorSetting> iterators) {
ArrayList<TIteratorSetting> tisList = new ArrayList<>();
public static PIteratorConfig toIteratorConfig(List<IteratorSetting> iterators) {
ArrayList<PIteratorSetting> pisList = new ArrayList<>();

for (IteratorSetting iteratorSetting : iterators) {
tisList.add(toTIteratorSetting(iteratorSetting));
pisList.add(toPIteratorSetting(iteratorSetting));
}

return new IteratorConfig(tisList);
return PIteratorConfig.newBuilder().addAllIterators(pisList).build();
}

public static SortedKeyValueIterator<Key,Value> setupSystemScanIterators(
Expand Down
Loading