Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jan 6, 2025
1 parent 3be8256 commit fe19324
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 60 deletions.
8 changes: 1 addition & 7 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,6 @@
<td>String</td>
<td>Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2.</td>
</tr>
<tr>
<td><h5>incremental-to</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Used for "incremental-to-auto-tag" to specify the auto-created tag to reading incremental changes.</td>
</tr>
<tr>
<td><h5>local-merge-buffer-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -723,7 +717,7 @@
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
<td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.</li><li>"full": Deprecated. Same as "latest-full".</li><li>"latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.</li><li>"compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.</li><li>"from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.</li><li>"from-file-creation-time": For streaming and batch sources, produces a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes.</li><li>"from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.</li><li>"from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.</li><li>"incremental": Read incremental changes between start and end snapshot or timestamp.</li><li>"incremental-to-auto-tag": Specify an auto-created tag, then try to find an earlier auto-created tag to read incremental changes. If specified tag is the first auto-created tag or doesn't exist, the result is empty.</li></ul></td>
<td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.</li><li>"full": Deprecated. Same as "latest-full".</li><li>"latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.</li><li>"compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.</li><li>"from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.</li><li>"from-file-creation-time": For streaming and batch sources, produces a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes.</li><li>"from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.</li><li>"from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.</li><li>"incremental": Read incremental changes between start and end snapshot or timestamp.</li></ul></td>
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
Expand Down
22 changes: 7 additions & 15 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1080,8 +1080,8 @@ public class CoreOptions implements Serializable {
"Read incremental changes between start timestamp (exclusive) and end timestamp, "
+ "for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");

public static final ConfigOption<String> INCREMENTAL_TO =
key("incremental-to")
public static final ConfigOption<String> INCREMENTAL_TO_AUTO_TAG =
key("incremental-to-auto-tag")
.stringType()
.noDefaultValue()
.withDescription(
Expand Down Expand Up @@ -2126,8 +2126,8 @@ public IncrementalBetweenScanMode incrementalBetweenScanMode() {
return options.get(INCREMENTAL_BETWEEN_SCAN_MODE);
}

public String incrementalTo() {
return options.get(INCREMENTAL_TO);
public String incrementalToAutoTag() {
return options.get(INCREMENTAL_TO_AUTO_TAG);
}

public Integer scanManifestParallelism() {
Expand Down Expand Up @@ -2481,12 +2481,7 @@ public enum StartupMode implements DescribedEnum {

INCREMENTAL(
"incremental",
"Read incremental changes between start and end snapshot or timestamp."),

INCREMENTAL_TO_AUTO_TAG(
"incremental-to-auto-tag",
"Specify an auto-created tag, then try to find an earlier auto-created tag to read incremental changes. "
+ "If specified tag is the first auto-created tag or doesn't exist, the result is empty.");
"Read incremental changes between start and end snapshot or timestamp.");

private final String value;
private final String description;
Expand Down Expand Up @@ -2768,14 +2763,11 @@ public static void setDefaultValues(Options options) {
}

if ((options.contains(INCREMENTAL_BETWEEN_TIMESTAMP)
|| options.contains(INCREMENTAL_BETWEEN))
|| options.contains(INCREMENTAL_BETWEEN)
|| options.contains(INCREMENTAL_TO_AUTO_TAG))
&& !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.INCREMENTAL);
}

if (options.contains(INCREMENTAL_TO) && !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.INCREMENTAL_TO_AUTO_TAG);
}
}

public static List<ConfigOption<?>> getOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
Expand Down Expand Up @@ -280,7 +280,7 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN,
INCREMENTAL_TO),
INCREMENTAL_TO_AUTO_TAG),
Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TIMESTAMP));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
checkExactOneOptionExistInMode(
Expand All @@ -297,37 +297,27 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_FILE_CREATION_TIME_MILLIS,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN,
INCREMENTAL_TO),
INCREMENTAL_TO_AUTO_TAG),
Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
} else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL) {
checkExactOneOptionExistInMode(
options,
options.startupMode(),
INCREMENTAL_BETWEEN,
INCREMENTAL_BETWEEN_TIMESTAMP);
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_TO_AUTO_TAG);
checkOptionsConflict(
options,
Arrays.asList(
SCAN_SNAPSHOT_ID,
SCAN_TIMESTAMP_MILLIS,
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TIMESTAMP,
SCAN_TAG_NAME,
INCREMENTAL_TO),
Arrays.asList(INCREMENTAL_BETWEEN, INCREMENTAL_BETWEEN_TIMESTAMP));
} else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) {
checkExactOneOptionExistInMode(options, options.startupMode(), INCREMENTAL_TO);
checkOptionsConflict(
options,
SCAN_TAG_NAME),
Arrays.asList(
SCAN_SNAPSHOT_ID,
SCAN_TIMESTAMP_MILLIS,
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TIMESTAMP,
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN,
INCREMENTAL_BETWEEN_TIMESTAMP),
Collections.singletonList(INCREMENTAL_TO));
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_TO_AUTO_TAG));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
checkOptionsConflict(
Expand All @@ -339,7 +329,7 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN,
INCREMENTAL_TO),
INCREMENTAL_TO_AUTO_TAG),
Collections.singletonList(SCAN_SNAPSHOT_ID));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_FILE_CREATION_TIME) {
checkOptionExistInMode(
Expand All @@ -354,7 +344,7 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
INCREMENTAL_BETWEEN,
INCREMENTAL_TO),
INCREMENTAL_TO_AUTO_TAG),
Collections.singletonList(SCAN_FILE_CREATION_TIME_MILLIS));
} else {
checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
Expand All @@ -366,7 +356,7 @@ private static void validateStartupMode(CoreOptions options) {
checkOptionNotExistInMode(
options, INCREMENTAL_BETWEEN_TIMESTAMP, options.startupMode());
checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, options.startupMode());
checkOptionNotExistInMode(options, INCREMENTAL_TO, options.startupMode());
checkOptionNotExistInMode(options, INCREMENTAL_TO_AUTO_TAG, options.startupMode());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
Expand Down Expand Up @@ -199,7 +200,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
: new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId);
case INCREMENTAL:
checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
Pair<String, String> incrementalBetween = options.incrementalBetween();
CoreOptions.IncrementalBetweenScanMode scanType =
options.incrementalBetweenScanMode();
ScanMode scanMode;
Expand All @@ -220,7 +220,10 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
throw new UnsupportedOperationException(
"Unknown incremental scan type " + scanType.name());
}
if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) != null) {

Options conf = options.toConfiguration();
if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
Pair<String, String> incrementalBetween = options.incrementalBetween();
try {
return new IncrementalStartingScanner(
snapshotManager,
Expand All @@ -233,17 +236,19 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
incrementalBetween.getLeft(),
incrementalBetween.getRight());
}
} else {
} else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
Pair<String, String> incrementalBetween = options.incrementalBetween();
return new IncrementalTimeStampStartingScanner(
snapshotManager,
Long.parseLong(incrementalBetween.getLeft()),
Long.parseLong(incrementalBetween.getRight()),
scanMode);
} else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) {
String endTag = options.incrementalToAutoTag();
return IncrementalTagStartingScanner.create(snapshotManager, endTag, options);
} else {
throw new UnsupportedOperationException("Unknown incremental read mode.");
}
case INCREMENTAL_TO_AUTO_TAG:
checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
String endTag = options.incrementalTo();
return IncrementalTagStartingScanner.create(snapshotManager, endTag, options);
default:
throw new UnsupportedOperationException(
"Unknown startup mode " + startupMode.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static AbstractStartingScanner create(
checkNotNull(
extractor,
"Table's tag creation mode doesn't support '%s' scan mode.",
CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG);
CoreOptions.INCREMENTAL_TO_AUTO_TAG);
TagPeriodHandler periodHandler = TagPeriodHandler.create(options);
checkArgument(
periodHandler.isAutoTag(endTagName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.List;

import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -360,11 +360,11 @@ public void testIncrementalToAutoTag() throws Exception {

assertThat(tagManager.allTagNames()).containsOnly("2024-12-01", "2024-12-02", "2024-12-04");

assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-01"))).isEmpty();
assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-02")))
assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-01"))).isEmpty();
assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-02")))
.containsExactly(GenericRow.of(2, BinaryString.fromString("b")));
assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-03"))).isEmpty();
assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-04")))
assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-03"))).isEmpty();
assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-04")))
.containsExactly(GenericRow.of(3, BinaryString.fromString("c")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,12 +984,9 @@ private static Stream<Map<String, String>> optionProvider(boolean isStreaming) {
options.put(SCAN_FILE_CREATION_TIME_MILLIS.key(), System.currentTimeMillis() + "");
} else if (mode == CoreOptions.StartupMode.INCREMENTAL) {
options.put("incremental-between", "2,5");
} else if (mode == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) {
options.put("incremental-to", "2024-12-01");
}

if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL
|| mode == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) {
if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL) {
continue;
}
allOptions.add(options);
Expand Down

0 comments on commit fe19324

Please sign in to comment.