Skip to content

Commit

Permalink
[core] Throw exception if increment query with rescale bucket (#4984)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Jan 23, 2025
1 parent 78cfc72 commit bfd0a0a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho

Options conf = options.toConfiguration();
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
new TagManager(
snapshotManager.fileIO(),
snapshotManager.tablePath(),
snapshotManager.branch());
if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
Pair<String, String> incrementalBetween = options.incrementalBetween();
Optional<Tag> startTag = tagManager.get(incrementalBetween.getLeft());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.tag.TagPeriodHandler;
import org.apache.paimon.utils.Pair;
Expand Down Expand Up @@ -50,6 +51,14 @@ public IncrementalTagStartingScanner(
this.start = start;
this.end = end;
this.startingSnapshotId = start.id();

TimeTravelUtil.checkRescaleBucketForIncrementalTagQuery(
new SchemaManager(
snapshotManager.fileIO(),
snapshotManager.tablePath(),
snapshotManager.branch()),
start.schemaId(),
end.schemaId());
}

@Override
Expand All @@ -66,7 +75,10 @@ public static AbstractStartingScanner create(
endTagName);

TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
new TagManager(
snapshotManager.fileIO(),
snapshotManager.tablePath(),
snapshotManager.branch());

Optional<Tag> endTag = tagManager.get(endTagName);
if (!endTag.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.TagManager;
Expand All @@ -30,6 +31,7 @@
import java.util.List;

import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** The util class of resolve snapshot from scan params for time travel. */
public class TimeTravelUtil {
Expand Down Expand Up @@ -58,7 +60,7 @@ public static Snapshot resolveSnapshotFromOptions(
return snapshotManager.latestSnapshot();
}

Preconditions.checkArgument(
checkArgument(
scanHandleKey.size() == 1,
String.format(
"Only one of the following parameters may be set : [%s, %s, %s, %s]",
Expand Down Expand Up @@ -124,4 +126,22 @@ private static Snapshot resolveSnapshotByTagName(
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
return tagManager.getOrThrow(tagName).trimToSnapshot();
}

public static void checkRescaleBucketForIncrementalTagQuery(
SchemaManager schemaManager, long schemaId1, long schemaId2) {
if (schemaId1 != schemaId2) {
int bucketNumber1 = bucketNumber(schemaManager, schemaId1);
int bucketNumber2 = bucketNumber(schemaManager, schemaId2);
checkArgument(
bucketNumber1 == bucketNumber2,
"The bucket number of two tags are different (%s, %s), which is not supported in incremental tag query.",
bucketNumber1,
bucketNumber2);
}
}

private static int bucketNumber(SchemaManager schemaManager, long schemaId) {
TableSchema schema = schemaManager.schema(schemaId);
return CoreOptions.fromMap(schema.options()).bucket();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotNotExistException;
Expand All @@ -37,6 +38,7 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -652,6 +654,33 @@ public void testScanBounded() {
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
}

@Test
public void testIncrementTagQueryWithRescaleBucket() throws Exception {
sql("CREATE TABLE test (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '1')");
Table table = paimonTable("test");

sql("INSERT INTO test VALUES (1, 11), (2, 22)");
sql("ALTER TABLE test SET ('bucket' = '2')");
sql("INSERT OVERWRITE test SELECT * FROM test");
sql("INSERT INTO test VALUES (3, 33)");

table.createTag("2024-01-01", 1);
table.createTag("2024-01-02", 3);

List<String> incrementalOptions =
Arrays.asList(
"'incremental-between'='2024-01-01,2024-01-02'",
"'incremental-to-auto-tag'='2024-01-02'");

for (String option : incrementalOptions) {
assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) */", option))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"The bucket number of two tags are different (1, 2), which is not supported in incremental tag query."));
}
}

private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv, sql);
while (!transformation.getInputs().isEmpty()) {
Expand Down

0 comments on commit bfd0a0a

Please sign in to comment.