[core] Support upper bound in dynamic bucket mode #4974

Open · wants to merge 2 commits into base: master
1 change: 1 addition & 0 deletions docs/content/primary-key-table/data-distribution.md
@@ -48,6 +48,7 @@ Paimon will automatically expand the number of buckets.

- Option1: `'dynamic-bucket.target-row-num'`: controls the target row number for one bucket.
- Option2: `'dynamic-bucket.initial-buckets'`: controls the number of initialized buckets.
- Option3: `'dynamic-bucket.max-buckets'`: controls the maximum number of buckets.

{{< hint info >}}
Dynamic Bucket mode only supports a single write job. Please do not start multiple jobs to write to the same partition
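For illustration, here is a minimal sketch of how these three options could be supplied as table properties through the Java API. The option keys come from the docs above; the values and the plain `Map` are purely illustrative, not recommendations:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical table properties for a dynamic bucket table (values are examples only).
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("dynamic-bucket.target-row-num", "2000000"); // rows per bucket before a new one is created
tableOptions.put("dynamic-bucket.initial-buckets", "4");      // buckets initialized per partition
tableOptions.put("dynamic-bucket.max-buckets", "16");         // upper bound added by this PR; -1 = unlimited
```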
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -290,6 +290,12 @@
<td>Integer</td>
<td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.max-buckets</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>Max buckets for a partition in dynamic bucket mode. It should either be equal to -1 (unlimited) or greater than 0 (fixed upper bound).</td>
</tr>
<tr>
<td><h5>dynamic-bucket.target-row-num</h5></td>
<td style="word-wrap: break-word;">2000000</td>
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1076,6 +1076,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Initial buckets for a partition in assigner operator for dynamic bucket mode.");

public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS =
key("dynamic-bucket.max-buckets")
.intType()
.defaultValue(-1)
.withDescription(
"Max buckets for a partition in dynamic bucket mode, It should "
+ "either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");

public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
key("dynamic-bucket.assigner-parallelism")
.intType()
@@ -2219,6 +2227,10 @@ public Integer dynamicBucketInitialBuckets() {
return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
}

public Integer dynamicBucketMaxBuckets() {
return options.get(DYNAMIC_BUCKET_MAX_BUCKETS);
}

public Integer dynamicBucketAssignerParallelism() {
return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
}
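A quick sketch of how the new option surfaces at runtime. This assumes the standard `Options` container and the `CoreOptions(Options)` constructor, since the diff only shows `CoreOptions` itself:

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

// Sketch: when unset, dynamicBucketMaxBuckets() returns the default -1 (unlimited).
Options conf = new Options();
conf.set(CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS, 16);
CoreOptions coreOptions = new CoreOptions(conf);
int maxBuckets = coreOptions.dynamicBucketMaxBuckets(); // 16
```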
@@ -45,6 +45,7 @@ public class HashBucketAssigner implements BucketAssigner {
private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;
private final int maxBucketsNum;

private final Map<BinaryRow, PartitionIndex> partitionIndex;

@@ -55,7 +56,8 @@ public HashBucketAssigner(
int numChannels,
int numAssigners,
int assignId,
long targetBucketRowNumber) {
long targetBucketRowNumber,
int maxBucketsNum) {
this.snapshotManager = snapshotManager;
this.commitUser = commitUser;
this.indexFileHandler = indexFileHandler;
@@ -64,6 +66,7 @@
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
this.maxBucketsNum = maxBucketsNum;
}

/** Assign a bucket for key hash of a record. */
@@ -84,7 +87,7 @@ public int assign(BinaryRow partition, int hash) {
this.partitionIndex.put(partition, index);
}

int assigned = index.assign(hash, this::isMyBucket);
int assigned = index.assign(hash, this::isMyBucket, maxBucketsNum);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Assign " + assigned + " to the partition " + partition + " key hash " + hash);
@@ -20,15 +20,20 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;
import org.apache.paimon.utils.IntIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,6 +43,7 @@

/** Bucket Index Per Partition. */
public class PartitionIndex {
private static final Logger LOG = LoggerFactory.getLogger(PartitionIndex.class);

public final Int2ShortHashMap hash2Bucket;

@@ -57,13 +63,13 @@ public PartitionIndex(
long targetBucketRowNumber) {
this.hash2Bucket = hash2Bucket;
this.nonFullBucketInformation = bucketInformation;
this.totalBucket = new HashSet<>(bucketInformation.keySet());
this.totalBucket = new LinkedHashSet<>(bucketInformation.keySet());
this.targetBucketRowNumber = targetBucketRowNumber;
this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
this.accessed = true;
}

public int assign(int hash, IntPredicate bucketFilter) {
public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
accessed = true;

// 1. is it a key that has appeared before
@@ -80,29 +86,35 @@ public int assign(int hash, IntPredicate bucketFilter) {
Long number = entry.getValue();
if (number < targetBucketRowNumber) {
entry.setValue(number + 1);
hash2Bucket.put(hash, bucket.shortValue());
return bucket;
return cacheBucketAndGet(hash2Bucket, hash, bucket);
} else {
iterator.remove();
}
}

// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (bucketFilter.test(i) && !totalBucket.contains(i)) {
hash2Bucket.put(hash, (short) i);
nonFullBucketInformation.put(i, 1L);
totalBucket.add(i);
return i;
if (-1 == maxBucketsNum || totalBucket.size() < maxBucketsNum) {
// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (bucketFilter.test(i) && !totalBucket.contains(i)) {
nonFullBucketInformation.put(i, 1L);
totalBucket.add(i);
return cacheBucketAndGet(hash2Bucket, hash, i);
}
}
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
throw new RuntimeException(
String.format(
"Too more bucket %s, you should increase target bucket row number %s.",
maxBucket, targetBucketRowNumber));
@SuppressWarnings("OptionalGetWithoutIsPresent")
int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
throw new RuntimeException(
String.format(
"Too more bucket %s, you should increase target bucket row number %s.",
maxBucket, targetBucketRowNumber));
} else {
// exceed buckets upper bound
return cacheBucketAndGet(
hash2Bucket,
hash,
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum));
}
}

public static PartitionIndex loadIndex(
@@ -137,4 +149,47 @@ public static PartitionIndex loadIndex(
}
return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber);
}

public static int cacheBucketAndGet(Int2ShortHashMap hash2Bucket, int hash, int bucket) {
hash2Bucket.put(hash, (short) bucket);
return bucket;
}

public static int[] getMaxBucketsPerAssigner(int maxBuckets, int assigners) {
int[] maxBucketsArr = new int[assigners];
if (-1 == maxBuckets) {
Arrays.fill(maxBucketsArr, -1);
return maxBucketsArr;
}
if (0 >= maxBuckets) {
throw new IllegalArgumentException(
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
}
int avg = maxBuckets / assigners;
int remainder = maxBuckets % assigners;
for (int i = 0; i < assigners; i++) {
maxBucketsArr[i] = avg;
if (remainder > 0) {
maxBucketsArr[i]++;
remainder--;
}
}
LOG.info(
"After distributing max-buckets '{}' to '{}' assigners evenly, maxBuckets layout: '{}'.",
maxBuckets,
assigners,
Arrays.toString(maxBucketsArr));
return maxBucketsArr;
}

public static int getSpecifiedMaxBuckets(int[] maxBucketsArr, int assignerId) {
int length = maxBucketsArr.length;
if (length == 0) {
throw new IllegalStateException("maxBuckets layout should exist!");
} else if (assignerId < length) {
return maxBucketsArr[assignerId];
} else {
return -1 == maxBucketsArr[0] ? -1 : 0;
}
}
}
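To make the distribution arithmetic concrete, here is a small worked sketch of the two new helpers (all values hypothetical): with `maxBuckets = 10` and 3 assigners, `avg = 3` and `remainder = 1`, so the first assigner absorbs the extra bucket.

```java
// Worked example: 10 max buckets split across 3 assigners -> [4, 3, 3].
int[] layout = PartitionIndex.getMaxBucketsPerAssigner(10, 3);

int a0 = PartitionIndex.getSpecifiedMaxBuckets(layout, 0); // 4
int a2 = PartitionIndex.getSpecifiedMaxBuckets(layout, 2); // 3
int a9 = PartitionIndex.getSpecifiedMaxBuckets(layout, 9); // id beyond layout -> 0, no extra buckets

// -1 keeps every assigner unlimited, regardless of assigner id.
int[] unlimited = PartitionIndex.getMaxBucketsPerAssigner(-1, 3); // [-1, -1, -1]
int any = PartitionIndex.getSpecifiedMaxBuckets(unlimited, 9);    // -1
```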
@@ -20,26 +20,33 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import static org.apache.paimon.index.PartitionIndex.cacheBucketAndGet;

/** When we need to overwrite the table, we should use this to avoid loading index. */
public class SimpleHashBucketAssigner implements BucketAssigner {

private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;
private final int maxBucketsNum;

private final Map<BinaryRow, SimplePartitionIndex> partitionIndex;

public SimpleHashBucketAssigner(int numAssigners, int assignId, long targetBucketRowNumber) {
public SimpleHashBucketAssigner(
int numAssigners, int assignId, long targetBucketRowNumber, int maxBucketsNum) {
this.numAssigners = numAssigners;
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
this.maxBucketsNum = maxBucketsNum;
}

@Override
@@ -71,7 +78,7 @@ private class SimplePartitionIndex {
private int currentBucket;

private SimplePartitionIndex() {
bucketInformation = new HashMap<>();
bucketInformation = new LinkedHashMap<>();
loadNewBucket();
}

@@ -83,7 +90,15 @@ public int assign(int hash) {

Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
if (num >= targetBucketRowNumber) {
loadNewBucket();
if (-1 != maxBucketsNum && bucketInformation.size() >= maxBucketsNum) {
return cacheBucketAndGet(
hash2Bucket,
hash,
KeyAndBucketExtractor.bucketWithUpperBound(
bucketInformation.keySet(), hash, maxBucketsNum));
} else {
loadNewBucket();
}
}
bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1);
hash2Bucket.put(hash, (short) currentBucket);
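A minimal usage sketch of the capped path. The constructor signature is the one added by this PR; the row numbers and the empty partition row are hypothetical:

```java
import org.apache.paimon.data.BinaryRow;

// Sketch: with targetBucketRowNumber = 2 and maxBucketsNum = 2, buckets 0 and 1
// fill up first; after that, assign() reuses an existing bucket instead of
// creating bucket 2.
SimpleHashBucketAssigner assigner =
        new SimpleHashBucketAssigner(
                /* numAssigners */ 1, /* assignId */ 0,
                /* targetBucketRowNumber */ 2L, /* maxBucketsNum */ 2);
BinaryRow partition = BinaryRow.EMPTY_ROW;
for (int hash = 0; hash < 10; hash++) {
    int bucket = assigner.assign(partition, hash); // stays within {0, 1} once capped
}
```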
@@ -21,6 +21,13 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.types.RowKind;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
@@ -31,6 +38,7 @@
* @param <T> type of record
*/
public interface KeyAndBucketExtractor<T> {
Logger LOG = LoggerFactory.getLogger(KeyAndBucketExtractor.class);

void setRecord(T record);

@@ -51,4 +59,17 @@ static int bucket(int hashcode, int numBuckets) {
checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
return Math.abs(hashcode % numBuckets);
}

static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int maxBucketsNum) {
checkArgument(maxBucketsNum > 0, "Num max-buckets is illegal: " + maxBucketsNum);
LOG.debug(
"Assign record (hashcode '{}') to new bucket exceed upper bound '{}' defined in '{}', Stop creating new buckets.",
hashcode,
maxBucketsNum,
DYNAMIC_BUCKET_MAX_BUCKETS.key());
return bucketsSet.stream()
.skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
.findFirst()
.orElse(0);
}
}
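Finally, a sketch of the fallback itself (set contents and hash are hypothetical): once the cap is reached, `bucketWithUpperBound` picks a pseudo-random existing bucket rather than allocating a new one.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch: the record is routed into one of the four existing buckets,
// selected by skipping a random prefix of the (insertion-ordered) set.
Set<Integer> existingBuckets = new LinkedHashSet<>(Arrays.asList(0, 1, 2, 3));
int bucket = KeyAndBucketExtractor.bucketWithUpperBound(
        existingBuckets, /* hashcode */ 42, /* maxBucketsNum */ 4);
// bucket is one of {0, 1, 2, 3}
```

Note that the choice is random rather than hash-derived; repeated keys stay stable only because callers cache the mapping via `cacheBucketAndGet`, which is presumably also why this PR switches to `LinkedHashSet`/`LinkedHashMap` for a deterministic iteration order.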