
Core: add a new task-type field to task JSON serialization. add data task JSON serialization implementation. #9728

Merged
merged 1 commit into apache:main
Jun 26, 2024

Conversation

Contributor
@stevenzwu commented Feb 14, 2024

Issue: #9597

The Java implementation only adds serialization for StaticDataTask. Other task types will be added in a follow-up PR, which is ready in my fork (stevenzwu@59445b7). With the additional parsers, TestFlinkMetaDataTable can pass with the FLIP-27 source using JSON serializers.

github-actions bot added the Specification label (Issues that may introduce spec changes.) (Feb 14, 2024)
@stevenzwu changed the title from "Spec: add task-type field to JSON serialization of file scan task. add JSON serialization for StaticDataTask." to "Spec: add a new task-type field to task JSON serialization. add data task JSON serialization spec." (Feb 16, 2024)
github-actions bot added the core label (Feb 16, 2024)
@stevenzwu changed the title from "Spec: add a new task-type field to task JSON serialization. add data task JSON serialization spec." to "Spec, Core: add a new task-type field to task JSON serialization. add data task JSON serialization spec and imp." (Feb 16, 2024)
private static final String RESIDUAL = "residual-filter";
private static final String TASK_TYPE = "task-type";

private enum TaskType {
Contributor

I feel the Java class names are starting to get confusing, even at the interface level. Although DataTask extends FileScanTask, there is really no file that the DataTask is scanning. So although StaticDataTask and BaseFileScanTask both implement FileScanTask, they are not really conceptually similar. The actual relationship is more like StaticDataTask implements DataTask and BaseFileScanTask implements FileScanTask, but DataTask and FileScanTask have many things in common, so we made DataTask extend FileScanTask.

To resolve this confusing situation and potentially accommodate other types of scan task, I am wondering whether we should go one more layer up: keep the file scan task spec as is and, on top of that, have a serialization spec for scan tasks with types like file-scan-task and data-task. We would then have a ScanTaskParser that delegates to the existing FileScanTaskParser or the DataTaskParser. What do you think?
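A minimal sketch of that facade idea, assuming illustrative names (ScanTaskParser, DataTaskParser, and a task-type discriminator); the actual classes and signatures in this PR may differ:

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;

class ScanTaskParser {
  private static final String TASK_TYPE = "task-type";

  private ScanTaskParser() {}

  // assumes the caller has already opened the JSON object;
  // DataTask must be checked first because DataTask extends FileScanTask
  static void toJson(FileScanTask task, JsonGenerator generator) throws IOException {
    if (task instanceof DataTask) {
      generator.writeStringField(TASK_TYPE, "data-task");
      DataTaskParser.toJson((StaticDataTask) task, generator);
    } else {
      generator.writeStringField(TASK_TYPE, "file-scan-task");
      FileScanTaskParser.toJson(task, generator);
    }
  }
}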

Contributor Author
@stevenzwu Feb 16, 2024

@jackye1995 I agree that ScanTaskParser is more accurate for the facade/dispatcher class here. The only problem is that FileScanTaskParser is a public class in the core module. Technically, we would be breaking compatibility.

Maybe we can keep FileScanTaskParser and mark it as deprecated; it can just extend from the new ScanTaskParser to avoid code duplication. But that means we would need to name the file task parser BaseFileScanTaskParser, which is package-private anyway.

Contributor Author

Actually, we can just deprecate the public methods on FileScanTaskParser and still keep the file scan task implementation here.
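A rough sketch of how that could look, assuming the new ScanTaskParser entry points described above (signatures are illustrative, not the final API):

public class FileScanTaskParser {
  private FileScanTaskParser() {}

  /** @deprecated use ScanTaskParser#toJson instead; kept only for API compatibility */
  @Deprecated
  public static String toJson(FileScanTask fileScanTask) {
    return ScanTaskParser.toJson(fileScanTask);
  }

  /** @deprecated use ScanTaskParser#fromJson instead; kept only for API compatibility */
  @Deprecated
  public static FileScanTask fromJson(String json, boolean caseSensitive) {
    return ScanTaskParser.fromJson(json, caseSensitive);
  }

  // the package-private helpers that actually read/write file scan task fields stay here
}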

Contributor Author

class name refactoring is done.

format/spec.md Outdated
| **`data-file`** |`JSON object`|`See above, read content file instead`|
| **`delete-files`** |`JSON list of objects`|`See above, read content file instead`|
| **`residual-filter`** |`JSON object: residual filter expression`|`{"type":"eq","term":"id","value":1}`|
### Task Serialization
Contributor

It's really not clear to me why the file scan task is added to the spec here. I don't think it is referenced anywhere in the main body of the specification.

Contributor Author
@stevenzwu Feb 16, 2024

@emkornfield that is a fair question. @rdblue also asked whether this should be added to the REST OpenAPI spec. My reservation is that this is also used by Flink for state serialization, so it is not just a REST thing.

What do others think? @rdblue @jackye1995 @aokolnychyi

Contributor

What state is being serialized? How does it relate to reading tables?

Does the Flink use case require standardization here, or is it an implementation detail of Flink?

Contributor Author

The Flink source checkpoints pending splits of FileScanTask. The standardized JSON serialization of scan tasks is used by both the REST OpenAPI and Flink (checkpointing and job manager -> task manager split assignment). I would imagine that if Spark streaming were to checkpoint pending splits (scan tasks), it would probably also prefer JSON serialization over Java serialization.
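As a rough illustration of that checkpointing use case, here is a hypothetical split-state serializer built on the JSON parser (the actual Flink split serializer in Iceberg differs in detail):

import java.nio.charset.StandardCharsets;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskParser;

class JsonSplitStateSerializer {
  // pending splits wrap FileScanTasks; store them as JSON in checkpoint state
  byte[] serialize(FileScanTask task) {
    return ScanTaskParser.toJson(task).getBytes(StandardCharsets.UTF_8);
  }

  // restore the task on recovery, or when assigning splits to task managers
  FileScanTask deserialize(byte[] bytes, boolean caseSensitive) {
    return ScanTaskParser.fromJson(new String(bytes, StandardCharsets.UTF_8), caseSensitive);
  }
}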

Contributor

IMO, unless we expect Spark to read the REST checkpoints from Flink (I would guess this isn't the case), this is really an implementation detail of both engines and doesn't belong in the specification.

Contributor
@rdblue Feb 19, 2024

I would definitely prefer it if this were not part of the spec. I don't think there is any reason to require a specific JSON serialization and I was surprised to see it in the spec. It's great to have documentation on exactly what the parsers produce, but we have many parsers that are not covered by the table spec and are instead in other documents like the Puffin spec, View spec, or REST spec.

To me, state serialization is a concern internal to Flink. It's harder to adhere to a spec for that, plus make guarantees about forward and backward compatibility. And without context for how this is used and why it is here, we can't make decisions about how to evolve this. For example, if you wanted to remove a field that Flink doesn't use, how do you know whether that is safe in the table spec? What does it mean for this to evolve "safely"?

Contributor Author

Got it. @rdblue, should I just remove this section from the table spec? I am not sure what the policy is for removing an invalid section from the spec.

Contributor Author

For now, I have reverted the spec change. We still need to decide if/how we can remove the existing spec section on file scan task JSON serialization.

github-actions bot added the flink label (Feb 16, 2024)
@stevenzwu force-pushed the issue-9597 branch 2 times, most recently from c27b5d5 to 4c7c907 (February 20, 2024 01:16)
@stevenzwu changed the title from "Spec, Core: add a new task-type field to task JSON serialization. add data task JSON serialization spec and imp." to "Core: add a new task-type field to task JSON serialization. add data task JSON serialization spec and imp." (Feb 20, 2024)
@stevenzwu changed the title from "Core: add a new task-type field to task JSON serialization. add data task JSON serialization spec and imp." to "Core: add a new task-type field to task JSON serialization. add data task JSON serialization imp." (Mar 13, 2024)
Contributor Author
@stevenzwu

@nastra @pvary, can you help review? The spec change/revert has been moved to a separate PR: https://github.com/apache/iceberg/pull/9771/files.

This is blocking Flink from moving to the FLIP-27 source as the default. Another PR will be needed to complete metadata query support.

Contributor
@nastra commented Mar 14, 2024

@stevenzwu I'll take a look either tomorrow or early next week

this.value = value;
}

public static TaskType fromValue(String value) {
Contributor
@nastra Mar 14, 2024

the codebase typically names this fromString() or fromName(). Given that this looks specifically for the task type name, maybe this should be named fromTypeName()?

}
}

public String value() {
Contributor

nit: maybe typeName()?

private static final String TASK_TYPE = "task-type";

private enum TaskType {
FILE_SCAN_TASK("file-scan-task"),
Contributor

I think this should have an UNKNOWN for forward/backward compatibility. Imagine a client and a server that use different Iceberg versions while new task types are added over time.

Contributor Author

see my reply in the comment below

} else if (DATA_TASK.value().equalsIgnoreCase(value)) {
return DATA_TASK;
} else {
throw new IllegalArgumentException("Unknown task type: " + value);
Contributor

This probably shouldn't fail but rather return UNKNOWN. See also #7145, where a similar issue was addressed; I think we need to do the same thing here.

Contributor Author

I checked out PR #7145. I am not sure it applies here. Metrics reporting may be considered optional, so returning an unknown report metrics object might be desirable there. But here, if a scan task is unknown and can't be parsed properly, we should fail explicitly.
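For reference, a sketch of the enum under discussion, pieced together from the diff fragments in this review, with the fromTypeName/typeName naming suggestions applied and explicit failure on unknown values (details may differ from the final code):

private enum TaskType {
  FILE_SCAN_TASK("file-scan-task"),
  DATA_TASK("data-task");

  private final String value;

  TaskType(String value) {
    this.value = value;
  }

  // fail loudly: a task whose type is unrecognized cannot be restored safely
  public static TaskType fromTypeName(String value) {
    if (FILE_SCAN_TASK.typeName().equalsIgnoreCase(value)) {
      return FILE_SCAN_TASK;
    } else if (DATA_TASK.typeName().equalsIgnoreCase(value)) {
      return DATA_TASK;
    } else {
      throw new IllegalArgumentException("Unknown task type: " + value);
    }
  }

  public String typeName() {
    return value;
  }
}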

generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.value());
FileScanTaskParser.toJson(fileScanTask, generator);
} else {
throw new UnsupportedOperationException(
Contributor

I don't think this should fail, for the same reason I mentioned further above. If you take a look at ReportMetricsRequestParser, for example, it doesn't fail if it sees an unknown type, and we would need to do the same handling here.

Contributor Author

see my reply in the comment above

.as("Schema should match")
.isTrue();

Assertions.assertThat(expected.projectedSchema().sameSchema(actual.projectedSchema()))
Contributor

same as above

@@ -27,29 +27,41 @@
import org.junit.jupiter.params.provider.ValueSource;

public class TestFileScanTaskParser {

Contributor

nit: unnecessary change

@Test
public void testNullArguments() {
Assertions.assertThatThrownBy(() -> FileScanTaskParser.toJson(null))
Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))
Contributor

I don't think this test class should actually change at all. Since you added a new ScanTaskParser, it's best to also add a TestScanTaskParser. This test should stay the same, since someone could still use the parser and its existing behavior shouldn't change, IMO.

Contributor Author
@stevenzwu Mar 15, 2024

I see your point here. I will add a TestScanTaskParser to test the facade part, such as the task-type field and unsupported task types.

You pointed out another issue with this test class: we should test both the old/deprecated public API (via FileScanTaskParser.toJson/fromJson) and the new ScanTaskParser public API.

But this class still needs to change, as we are not going to add FileScanTask testing to the TestScanTaskParser facade test class.
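A sketch of what the facade-level tests in a new TestScanTaskParser could cover; the exact error messages are assumptions, hence the hasMessageContaining checks:

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.jupiter.api.Test;

public class TestScanTaskParser {
  @Test
  public void nullArguments() {
    // the facade should reject null input for both directions
    assertThatThrownBy(() -> ScanTaskParser.toJson(null))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("Invalid scan task");

    assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("Invalid JSON string");
  }

  @Test
  public void unknownTaskType() {
    // an unrecognized task-type value should fail explicitly
    String json = "{\"task-type\": \"unknown-task\"}";
    assertThatThrownBy(() -> ScanTaskParser.fromJson(json, true))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("Unknown task type");
  }
}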

@@ -27,7 +28,8 @@
* <p>This does not include snapshots that have been expired using {@link ExpireSnapshots}.
*/
public class SnapshotsTable extends BaseMetadataTable {
private static final Schema SNAPSHOT_SCHEMA =
@VisibleForTesting
static final Schema SNAPSHOT_SCHEMA =
Contributor

could we avoid making this visible?

Contributor Author

I was testing a real StaticDataTask for the snapshots metadata table/rows, hence exposing this as package-private. Note that it is not public, though.

We can avoid making it visible by defining a custom test schema and a custom static data task in TestDataTaskParser.

Let me know your thoughts/preference.

Contributor

I think having a custom schema for the test makes sense here

Contributor Author

copied the SnapshotsTable schema to the test class.

@stevenzwu force-pushed the issue-9597 branch 2 times, most recently from e4ba92a to 557ff4e (March 15, 2024 18:10)
Contributor Author
@stevenzwu

@nastra can you help take another look?

private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask actual) {
Assertions.assertThat(expected.schema().asStruct())
.isEqualTo(actual.schema().asStruct())
.as("Schema should match");
Contributor

.as() needs to come before .isEqualTo(). Same for all the other places in this PR
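For example (AssertJ only picks up the description if it is registered before the assertion runs, and a failing assertion throws before a trailing .as() is ever reached):

// wrong: .isEqualTo() runs (and may throw) before .as() is evaluated, so the description is lost
assertThat(expected.schema().asStruct())
    .isEqualTo(actual.schema().asStruct())
    .as("Schema should match");

// right: description first, and assert on the actual value against the expected one
assertThat(actual.schema().asStruct())
    .as("Schema should match")
    .isEqualTo(expected.schema().asStruct());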

ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.reader().readTree(jsonStr);

Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str")))
Contributor

nit: for new tests it's ok to statically import assertThatThrownBy() / assertThat()


@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testScanTaskParser(boolean caseSensitive) {
Contributor

IMHO this test should live in TestScanTaskParser, as it focuses on the ScanTaskParser. The same goes for testScanTaskParserWithoutTaskTypeField.

Contributor Author
@stevenzwu Apr 23, 2024

ScanTaskParser is the facade/dispatch class. I don't want to add testing for all task types into a single gigantic TestScanTaskParser class. I think it is a bit cleaner to have one test class per task type (file, data, manifest, etc.).

.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON string for file scan task: null");

Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))
Contributor

this should be in TestScanTaskParser as it's using ScanTaskParser

}

private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask actual) {
assertThat(expected.schema().asStruct())
Contributor

actual/expected are the wrong way around. It should be assertThat(actual...).isEqualTo(expected...). Same for the other assertions.

Contributor Author

thanks for catching the mistake

.isTrue();
Assertions.assertThat(actual.spec()).isEqualTo(expected.spec());
Assertions.assertThat(
assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue();
Contributor

Suggested change
assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue();
assertThat(actual.schema().asStruct()).isEqualTo(expected.schema().asStruct());

if the assertion fails, then this will show where the schema mismatch is


List<StructLike> expectedRows = Lists.newArrayList(expected.rows());
List<StructLike> actualRows = Lists.newArrayList(actual.rows());
assertThat(actualRows).hasSize(expectedRows.size());
Contributor

Suggested change
assertThat(actualRows).hasSize(expectedRows.size());
assertThat(actualRows).hasSameSizeAs(expectedRows);

@stevenzwu requested a review from nastra (May 7, 2024 16:35)
@stevenzwu changed the title from "Core: add a new task-type field to task JSON serialization. add data task JSON serialization imp." to "Core: add a new task-type field to task JSON serialization. add data task JSON serialization implementation." (Jun 18, 2024)
@stevenzwu force-pushed the issue-9597 branch 2 times, most recently from 4ef2fb9 to be97ade (June 19, 2024 00:20)
private static final String METADATA_FILE = "metadata-file";
private static final String ROWS = "rows";

private DataTaskParser() {}
Contributor

nit: Why is this not StaticDataTaskParser? For me, the natural thing would be a 1-to-1 correspondence between the two.

Contributor Author

please see the earlier comment from Jack: #9728 (comment)

@nastra merged commit 9ed3383 into apache:main on Jun 26, 2024
41 checks passed
jasonf20 pushed a commit to jasonf20/iceberg that referenced this pull request Aug 4, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
Labels
core, flink, Specification (Issues that may introduce spec changes.)
6 participants