KUDU-1802: Avoid calls to master when using scan tokens
This patch adds new metadata to the scan token so that it carries all of the
metadata required to construct a KuduTable and open a scanner in the clients.
As a result, the GetTableSchema and GetTableLocations RPC calls to the master
are no longer required when using scan tokens.
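
For context, here is a rough sketch of the intended flow using the Java client API (not part of this change; the class and variable names are placeholders): tokens are built on one client with the metadata included, and a second client rebuilds scanners from the serialized bytes without contacting the master.

import java.util.List;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

public class ScanTokenSketch {
  // Build tokens on a "planner" client, then hydrate scanners on a "worker" client.
  // With the table and tablet metadata embedded in the token, the worker side needs
  // no GetTableSchema or GetTableLocations round trips to the master.
  static void scanWithTokens(KuduClient plannerClient, KuduClient workerClient,
                             String tableName) throws Exception {
    KuduTable table = plannerClient.openTable(tableName);
    List<KuduScanToken> tokens = plannerClient.newScanTokenBuilder(table)
        .includeTableMetadata(true)  // the default introduced by this patch
        .build();
    for (KuduScanToken token : tokens) {
      byte[] serialized = token.serialize();  // these bytes are what gets shipped to workers
      KuduScanner scanner =
          KuduScanToken.deserializeIntoScanner(serialized, workerClient);
      while (scanner.hasMoreRows()) {
        RowResultIterator rows = scanner.nextRows();
        for (RowResult row : rows) {
          // process the row
        }
      }
      scanner.close();
    }
  }
}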

New TableMetadataPB, TabletMetadataPB, and authorization token
fields were added as optional fields on the token. Additionally, a
`projected_column_idx` field was added that can be used in place
of `projected_columns`. This significantly reduces the size of
the scan token by not duplicating the ColumnSchemaPB messages that
are already in the TableMetadataPB.

Including the table metadata in the scan token is enabled by
default, given that it is more scalable and performant. However,
it can be disabled in rare cases where more resilience to
column renames is desired. One example where the table metadata
is disabled is the backup job. Future work, tracked by KUDU-3146,
should allow the table metadata to be leveraged in those cases as well.
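
As a sketch of what that opt-out looks like from the Java client API (mirroring the Scala change to KuduBackupRDD further down; the `client` and `table` variables are assumed to already exist):

List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
    .includeTableMetadata(false)  // hydrating these tokens will fetch the schema from the master
    .build();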

This doesn't avoid the need for a call to the master to get the
schema when writing data to Kudu; that work is tracked
by KUDU-3135. I expect the TableMetadataPB message would
be used there as well.

I included the ability to disable this functionality in the
kudu-spark integration via `kudu.useDriverMetadata` just
in case there are any unforeseen issues or regressions with
this feature.
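
A hedged example of wiring that option through a Spark read, assuming the usual kudu-spark option keys (kudu.master, kudu.table) and the "kudu" data source short name:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KuduSparkReadSketch {
  // Reads a Kudu table with the scan-token table metadata disabled, e.g. while
  // validating the feature or working around an unforeseen regression.
  static Dataset<Row> readWithoutDriverMetadata(SparkSession spark,
                                                String masterAddresses,
                                                String tableName) {
    return spark.read()
        .format("kudu")
        .option("kudu.master", masterAddresses)
        .option("kudu.table", tableName)
        .option("kudu.useDriverMetadata", "false")
        .load();
  }
}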

I added a test to compare the serialized size of the scan token with
and without the table and tablet metadata. The size results for a
100-column table are:
   no metadata: 2697 bytes
   tablet metadata: 2805 bytes
   tablet, table, and authz metadata: 3258 bytes
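
The comparison presumably comes down to the length of the serialized token; a minimal sketch of such a measurement, assuming the builder exposes an includeTabletMetadata toggle analogous to includeTableMetadata:

import java.util.List;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduTable;

public class TokenSizeSketch {
  // The sizes above are the byte lengths of a single serialized token built with
  // different metadata toggles.
  static int serializedTokenSize(KuduClient client, String tableName,
                                 boolean tableMetadata, boolean tabletMetadata)
      throws Exception {
    KuduTable table = client.openTable(tableName);
    List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
        .includeTableMetadata(tableMetadata)
        .includeTabletMetadata(tabletMetadata)  // assumed toggle; see hedge above
        .build();
    return tokens.get(0).serialize().length;
  }
}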

Change-Id: I88c1b8392de37dd5e8b7bd8b78a21603ff8b1d1b
Reviewed-on: http://gerrit.cloudera.org:8080/16031
Reviewed-by: Grant Henke <[email protected]>
Tested-by: Grant Henke <[email protected]>
granthenke committed Jun 30, 2020
1 parent 23f67ae commit d23ee5d
Showing 19 changed files with 1,023 additions and 135 deletions.
@@ -45,7 +45,6 @@ class KuduBackupRDD private[kudu] (
@transient val sc: SparkContext)
extends RDD[Row](sc, Nil) {

// TODO (KUDU-2785): Split large tablets into smaller scan tokens?
override protected def getPartitions: Array[Partition] = {
val client = kuduContext.syncClient

@@ -58,6 +57,10 @@ class KuduBackupRDD private[kudu] (
.scanRequestTimeout(options.scanRequestTimeoutMs)
.prefetching(options.scanPrefetching)
.keepAlivePeriodMs(options.keepAlivePeriodMs)
// TODO(KUDU-3135): Make backup scans a bit more resilient to column renames given these
// jobs are often critical, longer running, and scheduled in bulk. Once scans with
// provided table metadata better handle column renames this can be removed.
.includeTableMetadata(false)

options.splitSizeBytes.foreach { size =>
builder.setSplitSizeBytes(size)
@@ -2288,18 +2288,7 @@ void discoverTablets(KuduTable table,
String tableId = table.getTableId();
String tableName = table.getName();

// Doing a get first instead of putIfAbsent to avoid creating unnecessary
// table locations caches because in the most common case the table should
// already be present.
TableLocationsCache locationsCache = tableLocations.get(tableId);
if (locationsCache == null) {
locationsCache = new TableLocationsCache();
TableLocationsCache existingLocationsCache =
tableLocations.putIfAbsent(tableId, locationsCache);
if (existingLocationsCache != null) {
locationsCache = existingLocationsCache;
}
}
TableLocationsCache locationsCache = getOrCreateTableLocationsCache(tableId);

// Build the list of discovered remote tablet instances. If we have
// already discovered the tablet, its locations are refreshed.
@@ -2385,6 +2374,22 @@ void discoverTablets(KuduTable table,
}
}

TableLocationsCache getOrCreateTableLocationsCache(String tableId) {
// Doing a get first instead of putIfAbsent to avoid creating unnecessary
// table locations caches because in the most common case the table should
// already be present.
TableLocationsCache locationsCache = tableLocations.get(tableId);
if (locationsCache == null) {
locationsCache = new TableLocationsCache();
TableLocationsCache existingLocationsCache =
tableLocations.putIfAbsent(tableId, locationsCache);
if (existingLocationsCache != null) {
locationsCache = existingLocationsCache;
}
}
return locationsCache;
}

/**
* Gets the tablet location cache entry for the tablet in the table covering a partition key.
* @param tableId the table
266 changes: 231 additions & 35 deletions java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java

Large diffs are not rendered by default.

@@ -204,7 +204,7 @@ public static Schema pbToSchema(Common.SchemaPB schema) {
* @param pb the partition schema protobuf message
* @return a partition instance
*/
static PartitionSchema pbToPartitionSchema(Common.PartitionSchemaPB pb, Schema schema) {
public static PartitionSchema pbToPartitionSchema(Common.PartitionSchemaPB pb, Schema schema) {
List<Integer> rangeColumns = pbToIds(pb.getRangeSchema().getColumnsList());
PartitionSchema.RangeSchema rangeSchema = new PartitionSchema.RangeSchema(rangeColumns);

@@ -225,6 +225,28 @@ static PartitionSchema pbToPartitionSchema(Common.PartitionSchemaPB pb, Schema s
return new PartitionSchema(rangeSchema, hashSchemas.build(), schema);
}

public static Common.PartitionSchemaPB partitionSchemaToPb(PartitionSchema partitionSchema) {
Common.PartitionSchemaPB.Builder builder = Common.PartitionSchemaPB.newBuilder();

for (PartitionSchema.HashBucketSchema hashBucketSchema :
partitionSchema.getHashBucketSchemas()) {
Common.PartitionSchemaPB.HashBucketSchemaPB.Builder hbsBuilder =
Common.PartitionSchemaPB.HashBucketSchemaPB.newBuilder()
.addAllColumns(idsToPb(hashBucketSchema.getColumnIds()))
.setNumBuckets(hashBucketSchema.getNumBuckets())
.setSeed(hashBucketSchema.getSeed());
builder.addHashBucketSchemas(hbsBuilder.build());
}

Common.PartitionSchemaPB.RangeSchemaPB rangeSchemaPB =
Common.PartitionSchemaPB.RangeSchemaPB.newBuilder()
.addAllColumns(idsToPb(partitionSchema.getRangeSchema().getColumnIds()))
.build();
builder.setRangeSchema(rangeSchemaPB);

return builder.build();
}

/**
* Constructs a new {@code Partition} instance from the a protobuf message.
* @param pb the protobuf message
@@ -236,6 +258,14 @@ static Partition pbToPartition(Common.PartitionPB pb) {
pb.getHashBucketsList());
}

static Common.PartitionPB partitionToPb(Partition partition) {
return Common.PartitionPB.newBuilder()
.setPartitionKeyStart(ByteString.copyFrom(partition.getPartitionKeyStart()))
.setPartitionKeyEnd(ByteString.copyFrom(partition.getPartitionKeyEnd()))
.addAllHashBuckets(partition.getHashBuckets())
.build();
}

/**
* Deserializes a list of column identifier protobufs into a list of column IDs. This method
* relies on the fact that the master will aways send a partition schema with column IDs, and not
@@ -265,6 +295,24 @@ private static List<Integer> pbToIds(
return columnIds.build();
}

/**
* Serializes a list of column IDs into a list of column identifier protobufs.
*
* @param columnIds the column IDs
* @return the column identifiers
*/
private static List<Common.PartitionSchemaPB.ColumnIdentifierPB> idsToPb(
List<Integer> columnIds) {
ImmutableList.Builder<Common.PartitionSchemaPB.ColumnIdentifierPB> columnIdentifiers =
ImmutableList.builder();
for (Integer id : columnIds) {
Common.PartitionSchemaPB.ColumnIdentifierPB columnIdentifierPB =
Common.PartitionSchemaPB.ColumnIdentifierPB.newBuilder().setId(id).build();
columnIdentifiers.add(columnIdentifierPB);
}
return columnIdentifiers.build();
}

private static byte[] objectToWireFormat(ColumnSchema col, Object value) {
switch (col.getType()) {
case BOOL:
@@ -282,6 +282,14 @@ byte[] getTabletIdAsBytes() {
return tabletId.getBytes(UTF_8);
}

List<ServerInfo> getTabletServersCopy() {
List<ServerInfo> results = new ArrayList<>();
synchronized (tabletServers) {
results.addAll(tabletServers.values());
}
return results;
}

@Override
public int compareTo(RemoteTablet remoteTablet) {
if (remoteTablet == null) {
@@ -298,7 +298,7 @@ public byte[] getUpperBoundPartitionKey() {
return tablet == null ? upperBoundPartitionKey : tablet.getPartition().getPartitionKeyEnd();
}

private long ttl() {
long ttl() {
return TimeUnit.NANOSECONDS.toMillis(deadline - ticker.read());
}
