Commit 6e36146: fix comment

FANNG1 committed Mar 22, 2024
1 parent 463c448 · commit 6e36146
Showing 3 changed files with 20 additions and 12 deletions.
File 1 of 3:

@@ -56,17 +56,14 @@ private void setSortOrders(SortOrder[] sortOrdersInfo) {
     }
   }
 
-  public static Transform[] toGravitinoPartitions(
+  public static Transform[] toGravitinoPartitionings(
       org.apache.spark.sql.connector.expressions.Transform[] transforms) {
     if (ArrayUtils.isEmpty(transforms)) {
       return Transforms.EMPTY_TRANSFORM;
     }
 
     return Arrays.stream(transforms)
-        .filter(
-            transform ->
-                !((transform instanceof BucketTransform)
-                    || (transform instanceof SortedBucketTransform)))
+        .filter(transform -> !isBucketTransform(transform))
         .map(
             transform -> {
               if (transform instanceof IdentityTransform) {
@@ -80,6 +77,11 @@ public static Transform[] toGravitinoPartitions(
         .toArray(Transform[]::new);
   }
 
+  public static boolean isBucketTransform(
+      org.apache.spark.sql.connector.expressions.Transform transform) {
+    return transform instanceof BucketTransform || transform instanceof SortedBucketTransform;
+  }
+
   public static DistributionAndSortOrdersInfo toGravitinoDistributionAndSortOrders(
       org.apache.spark.sql.connector.expressions.Transform[] transforms) {
     DistributionAndSortOrdersInfo distributionAndSortOrdersInfo =
@@ -89,6 +91,7 @@ public static DistributionAndSortOrdersInfo toGravitinoDistributionAndSortOrders
     }
 
     Arrays.stream(transforms)
+        .filter(transform -> isBucketTransform(transform))
        .forEach(
            transform -> {
              if (transform instanceof SortedBucketTransform) {
@@ -100,6 +103,10 @@ public static DistributionAndSortOrdersInfo toGravitinoDistributionAndSortOrders
                BucketTransform bucketTransform = (BucketTransform) transform;
                Distribution distribution = toGravitinoDistribution(bucketTransform);
                distributionAndSortOrdersInfo.setDistribution(distribution);
+              } else {
+                throw new NotSupportedException(
+                    "Only support BucketTransform and SortedBucketTransform, but get: "
+                        + transform.name());
              }
            });
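Taken together, this file's change splits Spark transforms into two groups: bucket-style transforms feed Gravitino's distribution and sort orders, while everything else becomes a Gravitino partitioning. A minimal sketch of that split, assuming Spark's Expressions factory is available; the column names "dt" and "id" are made up for illustration:

import org.apache.spark.sql.connector.expressions.Expressions;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;

class TransformSplitSketch {
  static void demo() {
    org.apache.spark.sql.connector.expressions.Transform[] transforms = {
      Expressions.identity("dt"),   // plain partitioning
      Expressions.bucket(16, "id")  // bucket -> distribution, not partitioning
    };

    // bucket(16, "id") is dropped by the new isBucketTransform filter,
    // so only identity("dt") is converted here.
    Transform[] partitionings =
        SparkTransformConverter.toGravitinoPartitionings(transforms);

    // ...and the same bucket transform is routed here instead, where the
    // added filter keeps only bucket-style transforms.
    DistributionAndSortOrdersInfo info =
        SparkTransformConverter.toGravitinoDistributionAndSortOrders(transforms);
  }
}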
File 2 of 3:

@@ -131,8 +131,8 @@ public Table createTable(
 
     DistributionAndSortOrdersInfo distributionAndSortOrdersInfo =
         SparkTransformConverter.toGravitinoDistributionAndSortOrders(transforms);
-    com.datastrato.gravitino.rel.expressions.transforms.Transform[] partitions =
-        SparkTransformConverter.toGravitinoPartitions(transforms);
+    com.datastrato.gravitino.rel.expressions.transforms.Transform[] partitionings =
+        SparkTransformConverter.toGravitinoPartitionings(transforms);
 
     try {
       com.datastrato.gravitino.rel.Table table =
@@ -143,7 +143,7 @@ public Table createTable(
              gravitinoColumns,
              comment,
              gravitinoProperties,
-              partitions,
+              partitionings,
              distributionAndSortOrdersInfo.getDistribution(),
              distributionAndSortOrdersInfo.getSortOrders());
      return gravitinoAdaptor.createSparkTable(ident, table, sparkCatalog, propertiesConverter);
File 3 of 3:

@@ -49,11 +49,12 @@ void init() {
   void testPartition() {
     sparkToGravitinoPartitionTransformMaps.forEach(
         (sparkTransform, gravitinoTransform) -> {
-          Transform[] gravitinoPartitions =
-              SparkTransformConverter.toGravitinoPartitions(
+          Transform[] gravitinoPartitionings =
+              SparkTransformConverter.toGravitinoPartitionings(
                   new org.apache.spark.sql.connector.expressions.Transform[] {sparkTransform});
-          Assertions.assertTrue(gravitinoPartitions != null && gravitinoPartitions.length == 1);
-          Assertions.assertEquals(gravitinoTransform, gravitinoPartitions[0]);
+          Assertions.assertTrue(
+              gravitinoPartitionings != null && gravitinoPartitionings.length == 1);
+          Assertions.assertEquals(gravitinoTransform, gravitinoPartitionings[0]);
         });
 
     gravitinoToSparkPartitionTransformMaps.forEach(
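The updated test still only covers the one-transform happy path. A hedged companion check, not part of this commit, that a bucket transform is now excluded from the partitioning result (assuming JUnit 5 and Spark's Expressions factory as above):

@Test
void testBucketTransformIsNotAPartitioning() {
  org.apache.spark.sql.connector.expressions.Transform bucket =
      org.apache.spark.sql.connector.expressions.Expressions.bucket(4, "id");
  Transform[] partitionings =
      SparkTransformConverter.toGravitinoPartitionings(
          new org.apache.spark.sql.connector.expressions.Transform[] {bucket});
  // Buckets now describe distribution, so none should appear here.
  Assertions.assertEquals(0, partitionings.length);
}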
