Core: add a new task-type field to task JSON serialization; add data task JSON serialization implementation. #9728
Conversation
core/src/main/java/org/apache/iceberg/BaseFileScanTaskParser.java
private static final String RESIDUAL = "residual-filter";
private static final String TASK_TYPE = "task-type";

private enum TaskType {
I feel the Java class names are starting to get confusing, even at the interface level. Although `DataTask` extends `FileScanTask`, there is really no file that the `DataTask` is scanning. So although `StaticDataTask` and `BaseFileScanTask` both implement `FileScanTask`, they are not really conceptually similar. The actual relationship is more like `StaticDataTask` implements `DataTask` and `BaseFileScanTask` implements `FileScanTask`, but `DataTask` and `FileScanTask` have many things in common, so we made `DataTask` extend `FileScanTask`.
To resolve this confusing situation and potentially accommodate other types of scan task, I am thinking we should go one more layer above: keep the file scan task spec as is and, on top of that, have a serialization spec for scan tasks, with types like `file-scan-task` and `data-task`. We would then have a `ScanTaskParser` that delegates to the existing `FileScanTaskParser` or the `DataTaskParser`. What do you think?
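The proposed delegation could be sketched roughly as follows. This is a minimal illustration, not the actual implementation: the class name and constants follow the discussion, but the `delegate` method and its string return values are hypothetical stand-ins, since the real parsers operate on Jackson generators and nodes rather than plain strings.

```java
// Minimal sketch of the proposed ScanTaskParser facade (hypothetical,
// simplified: the real parsers work with Jackson, not plain strings).
class ScanTaskParserSketch {
  static final String FILE_SCAN_TASK = "file-scan-task";
  static final String DATA_TASK = "data-task";

  // Dispatch on the task-type value to the type-specific parser.
  static String delegate(String taskType) {
    switch (taskType) {
      case FILE_SCAN_TASK:
        return "FileScanTaskParser";
      case DATA_TASK:
        return "DataTaskParser";
      default:
        throw new IllegalArgumentException("Unknown task type: " + taskType);
    }
  }
}
```

The facade keeps the per-type parsers package-private while exposing a single public entry point, which is the compatibility concern discussed below.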
@jackye1995 I agree that `ScanTaskParser` is more accurate for the facade/dispatcher class here. The only problem is that `FileScanTaskParser` is a public class in the core module; technically, we would be breaking compatibility.
Maybe we can keep `FileScanTaskParser` and mark it as deprecated; it can just extend the new `ScanTaskParser` to avoid code duplication. But that means we would need to keep the file task parser named `BaseFileScanTaskParser`, which is package-private anyway.
Actually, we can just deprecate the public methods on `FileScanTaskParser` and still keep the file scan task implementation here.
Class name refactoring is done.
format/spec.md
| **`data-file`** | `JSON object` | `See above, read content file instead` |
| **`delete-files`** | `JSON list of objects` | `See above, read content file instead` |
| **`residual-filter`** | `JSON object: residual filter expression` | `{"type":"eq","term":"id","value":1}` |

### Task Serialization
It's really not clear to me why file scan task is added to the spec here. I don't think it is referenced anywhere in the main body of the specification?
@emkornfield that is a fair question. @rdblue also asked whether this should be added to the REST OpenAPI. My reservation is that this is also used by Flink for state serialization; hence it is not just a REST thing.
What do others think? @rdblue @jackye1995 @aokolnychyi
What state is being serialized? How does it relate to reading tables?
Does the Flink use case require standardization here, or is it an implementation detail of Flink?
The Flink source checkpoints pending splits of `FileScanTask`. The standardized JSON serialization of scan tasks is used by both the REST OpenAPI and Flink (checkpointing, and job manager -> task manager split assignment). I would imagine that if Spark streaming were to checkpoint pending splits (scan tasks), it would probably also prefer JSON serialization over Java serialization.
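For illustration only, a serialized task envelope carrying the new task-type field might look like the following. The nested values are abbreviated placeholders, not actual parser output; only the `task-type` and `residual-filter` shapes are taken from this PR and the spec table quoted earlier in this thread.

```json
{
  "task-type": "file-scan-task",
  "data-file": { "...": "..." },
  "delete-files": [],
  "residual-filter": { "type": "eq", "term": "id", "value": 1 }
}
```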
IMO, unless we expect Spark to read the checkpoints from Flink (I would guess this isn't the case), this is really an implementation detail of both engines and doesn't belong in the specification.
I would definitely prefer it if this were not part of the spec. I don't think there is any reason to require a specific JSON serialization and I was surprised to see it in the spec. It's great to have documentation on exactly what the parsers produce, but we have many parsers that are not covered by the table spec and are instead in other documents like the Puffin spec, View spec, or REST spec.
To me, state serialization is a concern internal to Flink. It's harder to adhere to a spec for that, plus make guarantees about forward and backward compatibility. And without context for how this is used and why it is here, we can't make decisions about how to evolve this. For example, if you wanted to remove a field that Flink doesn't use, how do you know whether that is safe in the table spec? What does it mean for this to evolve "safely"?
Got it. @rdblue, should I just remove this section from the table spec? I am not sure what the policy is for removing an invalid section from the spec.
For now, I have reverted the spec change. We still need to decide if/how we can remove the existing spec section on file scan task JSON serialization.
@nastra @pvary can you help review? The spec change/revert has been moved to a separate PR: https://github.com/apache/iceberg/pull/9771/files. This is blocking Flink from moving to the FLIP-27 source as default. Another PR will be needed to complete the metadata queries support.
@stevenzwu I'll take a look either tomorrow or early next week.
this.value = value;
}

public static TaskType fromValue(String value) {
The codebase typically names this `fromString()` or `fromName()`. Given that this looks specifically for the task type name, maybe this should be named `fromTypeName()`?
}
}

public String value() {
nit: maybe `typeName()`?
private static final String TASK_TYPE = "task-type";

private enum TaskType {
  FILE_SCAN_TASK("file-scan-task"),
I think this should have an `UNKNOWN` value for forward/backward compatibility. Imagine a client and server that use different Iceberg versions while new task types are being added over time.
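A minimal sketch of the forward-compatible variant being suggested here. The enum shape follows the PR's code, but the `UNKNOWN` constant and the `typeName`/`fromTypeName` names are hypothetical, mirroring the suggestion rather than the code as merged:

```java
// Sketch of a TaskType enum with an UNKNOWN fallback, so that a reader on
// an older Iceberg version can tolerate task types it does not recognize.
// This mirrors the suggestion above; it is not the code in this PR.
enum TaskType {
  FILE_SCAN_TASK("file-scan-task"),
  DATA_TASK("data-task"),
  UNKNOWN("unknown");

  private final String typeName;

  TaskType(String typeName) {
    this.typeName = typeName;
  }

  public String typeName() {
    return typeName;
  }

  // Returns UNKNOWN instead of throwing on an unrecognized name.
  public static TaskType fromTypeName(String value) {
    for (TaskType type : values()) {
      if (type.typeName.equalsIgnoreCase(value)) {
        return type;
      }
    }
    return UNKNOWN;
  }
}
```

The trade-off debated below is whether returning `UNKNOWN` (lenient, like the metrics parsers) or throwing (fail-fast) is the right behavior for scan tasks.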
See my reply in the comment below.
} else if (DATA_TASK.value().equalsIgnoreCase(value)) {
  return DATA_TASK;
} else {
  throw new IllegalArgumentException("Unknown task type: " + value);
This probably shouldn't fail but rather return `UNKNOWN`. See also #7145, where a similar issue has been addressed; I think we need to do the same thing here.
I checked out PR #7145. I am not sure it has the same applicability here. A metrics report may be considered optional, and hence returning an unknown report metrics object might be desirable there. But here, if a scan task is unknown and can't be parsed properly, we should fail explicitly.
generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.value());
FileScanTaskParser.toJson(fileScanTask, generator);
} else {
  throw new UnsupportedOperationException(
I don't think this should fail, for the same reason I mentioned further above. If you take a look at the `ReportMetricsRequestParser`, for example, it doesn't fail if it sees an unknown type, and we would need to do the same handling here.
See my reply in the comment above.
.as("Schema should match")
.isTrue();

Assertions.assertThat(expected.projectedSchema().sameSchema(actual.projectedSchema()))
Same as above.
@@ -27,29 +27,41 @@
import org.junit.jupiter.params.provider.ValueSource;

public class TestFileScanTaskParser {
nit: unnecessary change
@Test
public void testNullArguments() {
-  Assertions.assertThatThrownBy(() -> FileScanTaskParser.toJson(null))
+  Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))
I don't think this test class should actually change at all. Since you added a new `ScanTaskParser`, it's best to also add a `TestScanTaskParser`. This test should stay the same, since someone could still use the parser, and the existing behavior of the parser shouldn't change, IMO.
I see your point here. Will add a `TestScanTaskParser` to test the facade part, like the task type field and unsupported task types.
You pointed out another issue in this test class: we should test both the old/deprecated public API (via `FileScanTaskParser.toJson/fromJson`) and the new `ScanTaskParser` public API.
But this class still needs to change, as we are not going to add `FileScanTask` testing to the `TestScanTaskParser` facade test class.
@@ -27,7 +28,8 @@
 * <p>This does not include snapshots that have been expired using {@link ExpireSnapshots}.
 */
public class SnapshotsTable extends BaseMetadataTable {
-  private static final Schema SNAPSHOT_SCHEMA =
+  @VisibleForTesting
+  static final Schema SNAPSHOT_SCHEMA =
Could we avoid making this visible?
I was testing a real `StaticDataTask` for the snapshots metadata table/rows, hence exposing this as package-private. Note that this is not `public`, though.
We can avoid making it visible by defining a custom test schema and a custom static data task in the `TestDataTaskParser`.
Let me know your thoughts/preference.
I think having a custom schema for the test makes sense here.
Copied the SnapshotsTable schema to the test class.
@nastra can you help take another look?
private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask actual) {
  Assertions.assertThat(expected.schema().asStruct())
      .isEqualTo(actual.schema().asStruct())
      .as("Schema should match");
`.as()` needs to come before `.isEqualTo()`. Same for all the other places in this PR.
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.reader().readTree(jsonStr);

Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str")))
nit: for new tests it's ok to statically import `assertThatThrownBy()` / `assertThat()`.
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testScanTaskParser(boolean caseSensitive) {
IMHO this test should live in `TestScanTaskParser`, as it's focusing on the `ScanTaskParser`. Same for `testScanTaskParserWithoutTaskTypeField`.
`ScanTaskParser` is the facade/dispatch class. I don't want to add testing for all task types into a single gigantic test class, `TestScanTaskParser`. I think it is a bit cleaner to have one test class for each task type (file, data, manifest, etc.).
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON string for file scan task: null");

Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))
This should be in `TestScanTaskParser`, as it's using `ScanTaskParser`.
}

private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask actual) {
  assertThat(expected.schema().asStruct())
actual/expected are the wrong way around; it should be `assertThat(actual...).isEqualTo(expected...)`. Same for the other assertions.
Thanks for catching the mistake.
.isTrue();
Assertions.assertThat(actual.spec()).isEqualTo(expected.spec());
assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue();
Suggested change:
-  assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue();
+  assertThat(actual.schema().asStruct()).isEqualTo(expected.schema().asStruct());
If the assertion fails, this will show where the schema mismatch is.
List<StructLike> expectedRows = Lists.newArrayList(expected.rows());
List<StructLike> actualRows = Lists.newArrayList(actual.rows());
assertThat(actualRows).hasSize(expectedRows.size());
Suggested change:
-  assertThat(actualRows).hasSize(expectedRows.size());
+  assertThat(actualRows).hasSameSizeAs(expectedRows);
private static final String METADATA_FILE = "metadata-file";
private static final String ROWS = "rows";

private DataTaskParser() {}
nit: why is this not `StaticDataTaskParser`? For me the natural thing would be a 1-on-1 connection between the two.
Please see the earlier comment from Jack: #9728 (comment)
Issue #9597
The Java implementation only adds serialization for `StaticDataTask`. Other task types should be added in a follow-up PR, which is ready in my fork. With the additional parsers, `TestFlinkMetaDataTable` can pass with the FLIP-27 source using JSON serializers. stevenzwu@59445b7