From 9ed33839e7a8820b5069cbb51cd54a80785d6a62 Mon Sep 17 00:00:00 2001 From: Steven Zhen Wu Date: Wed, 26 Jun 2024 16:37:37 +0800 Subject: [PATCH] Core, Flink: Add task-type field to JSON serde of scan task / Add JSON serde for StaticDataTask. (#9728) --- .../org/apache/iceberg/DataTaskParser.java | 81 ++++++ .../apache/iceberg/FileScanTaskParser.java | 39 ++- .../org/apache/iceberg/ScanTaskParser.java | 105 +++++++ .../org/apache/iceberg/StaticDataTask.java | 26 ++ .../apache/iceberg/TestDataTaskParser.java | 274 ++++++++++++++++++ .../iceberg/TestFileScanTaskParser.java | 75 ++++- .../apache/iceberg/TestScanTaskParser.java | 54 ++++ .../source/split/IcebergSourceSplit.java | 6 +- .../source/split/IcebergSourceSplit.java | 6 +- .../source/split/IcebergSourceSplit.java | 6 +- 10 files changed, 644 insertions(+), 28 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/DataTaskParser.java create mode 100644 core/src/main/java/org/apache/iceberg/ScanTaskParser.java create mode 100644 core/src/test/java/org/apache/iceberg/TestDataTaskParser.java create mode 100644 core/src/test/java/org/apache/iceberg/TestScanTaskParser.java diff --git a/core/src/main/java/org/apache/iceberg/DataTaskParser.java b/core/src/main/java/org/apache/iceberg/DataTaskParser.java new file mode 100644 index 000000000000..428bcf15e7e2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/DataTaskParser.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +class DataTaskParser { + private static final String SCHEMA = "schema"; + private static final String PROJECTED_SCHEMA = "projection"; + private static final String METADATA_FILE = "metadata-file"; + private static final String ROWS = "rows"; + + private DataTaskParser() {} + + static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws IOException { + Preconditions.checkArgument(dataTask != null, "Invalid data task: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeFieldName(SCHEMA); + SchemaParser.toJson(dataTask.schema(), generator); + + generator.writeFieldName(PROJECTED_SCHEMA); + SchemaParser.toJson(dataTask.projectedSchema(), generator); + + generator.writeFieldName(METADATA_FILE); + ContentFileParser.toJson(dataTask.metadataFile(), PartitionSpec.unpartitioned(), generator); + + Preconditions.checkArgument(dataTask.tableRows() != null, "Invalid data task: null table rows"); + generator.writeArrayFieldStart(ROWS); + for (StructLike row : dataTask.tableRows()) { + SingleValueParser.toJson(dataTask.schema().asStruct(), row, generator); + } + + generator.writeEndArray(); + } + + static StaticDataTask fromJson(JsonNode jsonNode) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for data task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for data task: non-object (%s)", jsonNode); + + Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode)); + Schema projectedSchema = SchemaParser.fromJson(JsonUtil.get(PROJECTED_SCHEMA, jsonNode)); + DataFile metadataFile = + (DataFile) + ContentFileParser.fromJson( + JsonUtil.get(METADATA_FILE, jsonNode), PartitionSpec.unpartitioned()); + + JsonNode rowsArray = JsonUtil.get(ROWS, jsonNode); + Preconditions.checkArgument( + rowsArray.isArray(), "Invalid JSON node for rows: non-array (%s)", rowsArray); + + StructLike[] rows = new StructLike[rowsArray.size()]; + for (int i = 0; i < rowsArray.size(); ++i) { + JsonNode rowNode = rowsArray.get(i); + rows[i] = (StructLike) SingleValueParser.fromJson(schema.asStruct(), rowNode); + } + + return new StaticDataTask(metadataFile, schema, projectedSchema, rows); + } +} diff --git a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java index 0a708f2668cc..a6ea41319f4e 100644 --- a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java +++ b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java @@ -40,16 +40,38 @@ public class FileScanTaskParser { private FileScanTaskParser() {} + /** + * Serialize file scan task to JSON string + * + * @deprecated will be removed in 1.7.0; use {@link ScanTaskParser#toJson(FileScanTask)} instead + */ + @Deprecated public static String toJson(FileScanTask fileScanTask) { + Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null"); return JsonUtil.generate( - generator -> FileScanTaskParser.toJson(fileScanTask, generator), false); + generator -> { + generator.writeStartObject(); + toJson(fileScanTask, generator); + generator.writeEndObject(); + }, + false); + } + + /** + * Deserialize file scan task from JSON string + * + * @deprecated will be removed in 1.7.0; 
use {@link ScanTaskParser#fromJson(String, boolean)} + * instead + */ + @Deprecated + public static FileScanTask fromJson(String json, boolean caseSensitive) { + Preconditions.checkArgument(json != null, "Invalid JSON string for file scan task: null"); + return JsonUtil.parse(json, node -> fromJson(node, caseSensitive)); } - private static void toJson(FileScanTask fileScanTask, JsonGenerator generator) - throws IOException { + static void toJson(FileScanTask fileScanTask, JsonGenerator generator) throws IOException { Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null"); Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); - generator.writeStartObject(); generator.writeFieldName(SCHEMA); SchemaParser.toJson(fileScanTask.schema(), generator); @@ -78,16 +100,9 @@ private static void toJson(FileScanTask fileScanTask, JsonGenerator generator) generator.writeFieldName(RESIDUAL); ExpressionParser.toJson(fileScanTask.residual(), generator); } - - generator.writeEndObject(); - } - - public static FileScanTask fromJson(String json, boolean caseSensitive) { - Preconditions.checkArgument(json != null, "Invalid JSON string for file scan task: null"); - return JsonUtil.parse(json, node -> FileScanTaskParser.fromJson(node, caseSensitive)); } - private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) { + static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) { Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null"); Preconditions.checkArgument( jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode); diff --git a/core/src/main/java/org/apache/iceberg/ScanTaskParser.java b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java new file mode 100644 index 000000000000..9447d0668a1f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.util.JsonUtil; + +public class ScanTaskParser { + private static final String TASK_TYPE = "task-type"; + + private enum TaskType { + FILE_SCAN_TASK("file-scan-task"), + DATA_TASK("data-task"); + + private final String value; + + TaskType(String value) { + this.value = value; + } + + public static TaskType fromTypeName(String value) { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(value), "Invalid task type name: null or empty"); + if (FILE_SCAN_TASK.typeName().equalsIgnoreCase(value)) { + return FILE_SCAN_TASK; + } else if (DATA_TASK.typeName().equalsIgnoreCase(value)) { + return DATA_TASK; + } else { + throw new IllegalArgumentException("Unknown task type: " + value); + } + } + + public String typeName() { + return value; + } + } + + private ScanTaskParser() {} + + public static String toJson(FileScanTask fileScanTask) { + Preconditions.checkArgument(fileScanTask != null, "Invalid scan task: null"); + return JsonUtil.generate(generator -> toJson(fileScanTask, generator), false); + } + + public static FileScanTask fromJson(String json, boolean caseSensitive) { + Preconditions.checkArgument(json != null, "Invalid JSON string for scan task: null"); + return JsonUtil.parse(json, node -> fromJson(node, caseSensitive)); + } + + private static void toJson(FileScanTask fileScanTask, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + + if (fileScanTask instanceof StaticDataTask) { + generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.typeName()); + DataTaskParser.toJson((StaticDataTask) fileScanTask, generator); + } else if (fileScanTask instanceof BaseFileScanTask + || fileScanTask instanceof BaseFileScanTask.SplitScanTask) { + generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.typeName()); + FileScanTaskParser.toJson(fileScanTask, generator); + } else { + throw new UnsupportedOperationException( + "Unsupported task type: " + fileScanTask.getClass().getCanonicalName()); + } + + generator.writeEndObject(); + } + + private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) { + TaskType taskType = TaskType.FILE_SCAN_TASK; + String taskTypeStr = JsonUtil.getStringOrNull(TASK_TYPE, jsonNode); + if (null != taskTypeStr) { + taskType = TaskType.fromTypeName(taskTypeStr); + } + + switch (taskType) { + case FILE_SCAN_TASK: + return FileScanTaskParser.fromJson(jsonNode, caseSensitive); + case DATA_TASK: + return DataTaskParser.fromJson(jsonNode); + default: + throw new UnsupportedOperationException("Unsupported task type: " + taskType.typeName()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java index cffb42427960..f25ebd49c9d8 100644 --- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java +++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java @@ -64,6 +64,19 @@ private StaticDataTask( this.rows = rows; } + StaticDataTask( + DataFile metadataFile, Schema tableSchema, Schema projectedSchema, StructLike[] rows) { + this.tableSchema = tableSchema; + this.projectedSchema = projectedSchema; + this.metadataFile = metadataFile; + this.rows = rows; + } + + @Override + public Schema schema() { + return 
tableSchema;
+  }
+
   @Override
   public List<DeleteFile> deletes() {
     return ImmutableList.of();
@@ -106,6 +119,19 @@ public Iterable<FileScanTask> split(long splitSize) {
     return ImmutableList.of(this);
   }
 
+  Schema projectedSchema() {
+    return projectedSchema;
+  }
+
+  DataFile metadataFile() {
+    return metadataFile;
+  }
+
+  /** @return the table rows before projection */
+  StructLike[] tableRows() {
+    return rows;
+  }
+
   /** Implements {@link StructLike#get} for passing static rows. */
   static class Row implements StructLike, Serializable {
     public static Row of(Object... values) {
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
new file mode 100644
index 000000000000..5a3d119046f5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+import org.junit.jupiter.api.Test;
+
+public class TestDataTaskParser {
+  // copied from SnapshotsTable to avoid making it package public
+  private static final Schema SNAPSHOT_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "committed_at", Types.TimestampType.withZone()),
+          Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
+          Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
+          Types.NestedField.optional(4, "operation", Types.StringType.get()),
+          Types.NestedField.optional(5, "manifest_list", Types.StringType.get()),
+          Types.NestedField.optional(
+              6,
+              "summary",
+              Types.MapType.ofRequired(7, 8, Types.StringType.get(), Types.StringType.get())));
+
+  // copied from SnapshotsTable to avoid making it package public
+  private static StaticDataTask.Row snapshotToRow(Snapshot snap) {
+    return StaticDataTask.Row.of(
+        snap.timestampMillis() * 1000,
+        snap.snapshotId(),
+        snap.parentId(),
+        snap.operation(),
+        snap.manifestListLocation(),
+        snap.summary());
+  }
+
+  @Test
+  public void nullCheck() throws Exception {
+    StringWriter writer = new StringWriter();
+    JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+    assertThatThrownBy(() -> DataTaskParser.toJson(null, generator))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid data task: null");
+
+    assertThatThrownBy(() -> DataTaskParser.toJson((StaticDataTask) createDataTask(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON generator: null");
+
+    assertThatThrownBy(() -> DataTaskParser.fromJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON node for data task: null");
+  }
+
+  @Test
+  public void invalidJsonNode() throws Exception {
+    String jsonStr = "{\"str\":\"1\", \"arr\":[]}";
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode rootNode = mapper.reader().readTree(jsonStr);
+
+    assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for data task: non-object ");
+
+    assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("arr")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for data task: non-object ");
+  }
+
+  @Test
+  public void missingFields() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+
+    String missingSchemaStr = "{}";
+    JsonNode missingSchemaNode = mapper.reader().readTree(missingSchemaStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingSchemaNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: schema");
+
+    String missingProjectionStr =
+        "{"
+            + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+            + "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+            + "}";
+    JsonNode missingProjectionNode = mapper.reader().readTree(missingProjectionStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingProjectionNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: projection");
+
+    String missingMetadataFileStr =
+        "{"
+            + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+            + "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]},"
+            + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+            + "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+            + "}";
+    JsonNode missingMetadataFileNode = mapper.reader().readTree(missingMetadataFileStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingMetadataFileNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: metadata-file");
+
+    String missingTableRowsStr =
+        "{\"task-type\":\"data-task\","
+            + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+            + "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+            + "{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+            + "{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+            + "{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+            + "{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+            + "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+            + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+            + "\"value\":\"string\",\"value-required\":true}}]},"
+            + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+            + "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+            + "{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+            + "{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+            + "{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+            + "{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+            + "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+            + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+            + "\"value\":\"string\",\"value-required\":true}}]},"
+            + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+            + "\"file-path\":\"/tmp/metadata2.json\","
+            + "\"file-format\":\"METADATA\",\"partition\":{},"
+            + "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0}"
+            + "}";
+    JsonNode missingTableRowsNode = mapper.reader().readTree(missingTableRowsStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingTableRowsNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: rows");
+  }
+
+  @Test
+  public void roundTripSerde() {
+    StaticDataTask dataTask = (StaticDataTask) createDataTask();
+    String jsonStr = ScanTaskParser.toJson(dataTask);
+    assertThat(jsonStr).isEqualTo(snapshotsDataTaskJson());
+    StaticDataTask deserializedTask = (StaticDataTask) ScanTaskParser.fromJson(jsonStr, true);
+    assertDataTaskEquals(dataTask, deserializedTask);
+  }
+
+  private DataTask createDataTask() {
+    Map<String, String> summary1 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "1",
+            "total-files-size", "10",
+            "total-data-files", "1",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    Map<String, String> summary2 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "2",
+            "total-files-size", "20",
+            "total-data-files", "2",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    List<Snapshot> snapshots =
+        Arrays.asList(
+            new BaseSnapshot(
+                1L, 1L, null, 1234567890000L, "append", summary1, 1, "file:/tmp/manifest1.avro"),
+            new BaseSnapshot(
+                2L, 2L, 1L, 9876543210000L, "append", summary2, 1, "file:/tmp/manifest2.avro"));
+
+    return StaticDataTask.of(
+        Files.localInput("file:/tmp/metadata2.json"),
+        SNAPSHOT_SCHEMA,
+        SNAPSHOT_SCHEMA,
+        snapshots,
+        TestDataTaskParser::snapshotToRow);
+  }
+
+  private String snapshotsDataTaskJson() {
+    return "{\"task-type\":\"data-task\","
+        + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+        + "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + "{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + "{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + "{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + "{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+        + "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + "{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + "{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + "{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + "{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+        + "\"file-path\":\"/tmp/metadata2.json\","
+        + "\"file-format\":\"METADATA\",\"partition\":{},"
+        + "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0},"
+        + "\"rows\":[{\"1\":\"2009-02-13T23:31:30+00:00\",\"2\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest1.avro\","
+        + "\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + "\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + "\"values\":[\"1\",\"1\",\"10\",\"1\",\"1\",\"10\",\"1\",\"0\",\"0\",\"0\"]}},"
+        + "{\"1\":\"2282-12-22T20:13:30+00:00\",\"2\":2,\"3\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest2.avro\","
+        + "\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + "\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + "\"values\":[\"1\",\"1\",\"10\",\"1\",\"2\",\"20\",\"2\",\"0\",\"0\",\"0\"]}}]}";
+  }
+
+  private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask actual) {
+    assertThat(actual.schema().asStruct())
+        .as("Schema should match")
+        .isEqualTo(expected.schema().asStruct());
+
+    assertThat(actual.projectedSchema().asStruct())
+        .as("Projected schema should match")
+        .isEqualTo(expected.projectedSchema().asStruct());
+
+    TestContentFileParser.assertContentFileEquals(
+        expected.metadataFile(), actual.metadataFile(), PartitionSpec.unpartitioned());
+
+    List<StructLike> expectedRows = Lists.newArrayList(expected.rows());
+    List<StructLike> actualRows = Lists.newArrayList(actual.rows());
+    assertThat(actualRows).hasSameSizeAs(expectedRows);
+
+    // all fields are primitive types or map
+    Schema schema = expected.schema();
+    for (int i = 0; i < expectedRows.size(); ++i) {
+      StructLike expectedRow = expectedRows.get(i);
+      StructLike actualRow = actualRows.get(i);
+      for (int pos = 0; pos < expectedRow.size(); ++pos) {
+        Class<?> javaClass = schema.columns().get(pos).type().typeId().javaClass();
+        assertThat(actualRow.get(pos, javaClass)).isEqualTo(expectedRow.get(pos, javaClass));
+      }
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
index 6e274c4811ba..137e7897385b 100644
--- a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
@@ -35,23 +35,64 @@ public void testNullArguments() {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid file scan task: null");
 
-    assertThatThrownBy(() -> FileScanTaskParser.fromJson(null, true))
+    assertThatThrownBy(() -> FileScanTaskParser.fromJson((String) null, true))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid JSON string for file scan task: null");
+
+    assertThatThrownBy(() -> ScanTaskParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        
.hasMessage("Invalid scan task: null"); + + assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid JSON string for scan task: null"); } @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testParser(boolean caseSensitive) { + public void testFileScanTaskParser(boolean caseSensitive) { PartitionSpec spec = TestBase.SPEC; - FileScanTask fileScanTask = createScanTask(spec, caseSensitive); + FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive); String jsonStr = FileScanTaskParser.toJson(fileScanTask); - assertThat(jsonStr).isEqualTo(expectedFileScanTaskJson()); + assertThat(jsonStr).isEqualTo(fileScanTaskJsonWithoutTaskType()); FileScanTask deserializedTask = FileScanTaskParser.fromJson(jsonStr, caseSensitive); assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, caseSensitive); } - private FileScanTask createScanTask(PartitionSpec spec, boolean caseSensitive) { + /** Test backward compatibility where task-type field is absent from the JSON string */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFileScanTaskParserWithoutTaskTypeField(boolean caseSensitive) { + PartitionSpec spec = TestBase.SPEC; + FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive); + FileScanTask deserializedTask = + FileScanTaskParser.fromJson(fileScanTaskJsonWithoutTaskType(), caseSensitive); + assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, caseSensitive); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testScanTaskParser(boolean caseSensitive) { + PartitionSpec spec = TestBase.SPEC; + FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive); + String jsonStr = ScanTaskParser.toJson(fileScanTask); + assertThat(jsonStr).isEqualTo(fileScanTaskJson()); + FileScanTask deserializedTask = ScanTaskParser.fromJson(jsonStr, caseSensitive); + assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, caseSensitive); + } + + /** Test backward compatibility where task-type field is absent from the JSON string */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testScanTaskParserWithoutTaskTypeField(boolean caseSensitive) { + PartitionSpec spec = TestBase.SPEC; + FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive); + FileScanTask deserializedTask = + ScanTaskParser.fromJson(fileScanTaskJsonWithoutTaskType(), caseSensitive); + assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, caseSensitive); + } + + private FileScanTask createFileScanTask(PartitionSpec spec, boolean caseSensitive) { ResidualEvaluator residualEvaluator; if (spec.isUnpartitioned()) { residualEvaluator = ResidualEvaluator.unpartitioned(Expressions.alwaysTrue()); @@ -67,7 +108,7 @@ private FileScanTask createScanTask(PartitionSpec spec, boolean caseSensitive) { residualEvaluator); } - private String expectedFileScanTaskJson() { + private String fileScanTaskJsonWithoutTaskType() { return "{\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[" + "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"}," + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]}," @@ -86,6 +127,26 @@ private String expectedFileScanTaskJson() { + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}"; } + private String fileScanTaskJson() { + return "{\"task-type\":\"file-scan-task\"," + + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[" + + 
"{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]}," + + "\"spec\":{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\"," + + "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]}," + + "\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0}," + + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}," + + "\"start\":0,\"length\":10," + + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\"," + + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\"," + + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1}," + + "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/data-a2-deletes.parquet\"," + + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},\"file-size-in-bytes\":10," + + "\"record-count\":1,\"equality-ids\":[1],\"sort-order-id\":0}]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}"; + } + private static void assertFileScanTaskEquals( FileScanTask expected, FileScanTask actual, PartitionSpec spec, boolean caseSensitive) { TestContentFileParser.assertContentFileEquals(expected.file(), actual.file(), spec); @@ -95,7 +156,7 @@ private static void assertFileScanTaskEquals( expected.deletes().get(pos), actual.deletes().get(pos), spec); } - assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue(); + assertThat(actual.schema().asStruct()).isEqualTo(expected.schema().asStruct()); assertThat(actual.spec()).isEqualTo(expected.spec()); assertThat( ExpressionUtil.equivalent( diff --git a/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java b/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java new file mode 100644 index 000000000000..aad87514983a --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestScanTaskParser {
+  @Test
+  public void nullCheck() {
+    assertThatThrownBy(() -> ScanTaskParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid scan task: null");
+
+    assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON string for scan task: null");
+  }
+
+  @Test
+  public void invalidTaskType() {
+    String jsonStr = "{\"task-type\":\"junk\"}";
+    assertThatThrownBy(() -> ScanTaskParser.fromJson(jsonStr, true))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Unknown task type: junk");
+  }
+
+  @Test
+  public void unsupportedTask() {
+    FileScanTask mockTask = Mockito.mock(FileScanTask.class);
+    assertThatThrownBy(() -> ScanTaskParser.toJson(mockTask))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining(
+            "Unsupported task type: org.apache.iceberg.FileScanTask$MockitoMock$");
+  }
+}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc60..344f64833b62 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ private byte[] serialize(int version) throws IOException {
     out.writeInt(fileScanTasks.size());
     for (FileScanTask fileScanTask : fileScanTasks) {
-      String taskJson = FileScanTaskParser.toJson(fileScanTask);
+      String taskJson = ScanTaskParser.toJson(fileScanTask);
       writeTaskJson(out, taskJson, version);
     }
 
@@ -199,7 +199,7 @@ private static IcebergSourceSplit deserialize(
     List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
     for (int i = 0; i < taskCount; ++i) {
       String taskJson = readTaskJson(in, version);
-      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
       tasks.add(task);
     }
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc60..344f64833b62 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ private byte[] serialize(int version) throws IOException {
     out.writeInt(fileScanTasks.size());
     for (FileScanTask fileScanTask : fileScanTasks) {
-      String taskJson = FileScanTaskParser.toJson(fileScanTask);
+      String taskJson = ScanTaskParser.toJson(fileScanTask);
       writeTaskJson(out, taskJson, version);
     }
 
@@ -199,7 +199,7 @@ private static IcebergSourceSplit deserialize(
     List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
     for (int i = 0; i < taskCount; ++i) {
       String taskJson = readTaskJson(in, version);
-      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
       tasks.add(task);
     }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc60..344f64833b62 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ private byte[] serialize(int version) throws IOException {
     out.writeInt(fileScanTasks.size());
     for (FileScanTask fileScanTask : fileScanTasks) {
-      String taskJson = FileScanTaskParser.toJson(fileScanTask);
+      String taskJson = ScanTaskParser.toJson(fileScanTask);
       writeTaskJson(out, taskJson, version);
     }
 
@@ -199,7 +199,7 @@ private static IcebergSourceSplit deserialize(
     List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
     for (int i = 0; i < taskCount; ++i) {
       String taskJson = readTaskJson(in, version);
-      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
       tasks.add(task);
     }
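
Reviewer note (illustrative sketch, not part of the patch): the snippet below shows the round trip this change enables. ScanTaskParser.toJson writes a "task-type" discriminator ("file-scan-task" or "data-task") before the task body, and fromJson dispatches on it, defaulting to a file scan task when the field is absent, so JSON produced by the now-deprecated FileScanTaskParser.toJson still parses. The FileScanTask instance is assumed to come from an existing table scan; the class and method names here are hypothetical.

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskParser;

class ScanTaskRoundTripExample {
  // Serialize a scan task and read it back. This works for both
  // BaseFileScanTask ("file-scan-task") and StaticDataTask ("data-task");
  // JSON without a "task-type" field is treated as a file scan task.
  static FileScanTask roundTrip(FileScanTask task, boolean caseSensitive) {
    String json = ScanTaskParser.toJson(task); // e.g. {"task-type":"file-scan-task",...}
    return ScanTaskParser.fromJson(json, caseSensitive);
  }
}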