Skip to content

Commit

Permalink
Address Eduard's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu committed Mar 15, 2024
1 parent 59113e6 commit 557ff4e
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 24 deletions.
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataTaskParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws IOEx
generator.writeFieldName(METADATA_FILE);
ContentFileParser.toJson(dataTask.metadataFile(), PartitionSpec.unpartitioned(), generator);

Preconditions.checkArgument(dataTask.tableRows() != null, "Invalid data task: null table rows");
generator.writeArrayFieldStart(ROWS);
for (StructLike row : dataTask.tableRows()) {
SingleValueParser.toJson(dataTask.schema().asStruct(), row, generator);
Expand All @@ -65,7 +66,7 @@ static StaticDataTask fromJson(JsonNode jsonNode) {
ContentFileParser.fromJson(
JsonUtil.get(METADATA_FILE, jsonNode), PartitionSpec.unpartitioned());

JsonNode rowsArray = jsonNode.get(ROWS);
JsonNode rowsArray = JsonUtil.get(ROWS, jsonNode);
Preconditions.checkArgument(
rowsArray.isArray(), "Invalid JSON node for rows: non-array (%s)", rowsArray);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private FileScanTaskParser() {}
@Deprecated
public static String toJson(FileScanTask fileScanTask) {
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
return JsonUtil.generate(generator -> toJson(fileScanTask, generator), false);
return JsonUtil.generate(generator -> toJsonWrapper(fileScanTask, generator), false);
}

/**
Expand All @@ -63,6 +63,13 @@ public static FileScanTask fromJson(String json, boolean caseSensitive) {
return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
}

private static void toJsonWrapper(FileScanTask fileScanTask, JsonGenerator generator)
throws IOException {
generator.writeStartObject();
toJson(fileScanTask, generator);
generator.writeEndObject();
}

static void toJson(FileScanTask fileScanTask, JsonGenerator generator) throws IOException {
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
Expand Down
23 changes: 12 additions & 11 deletions core/src/main/java/org/apache/iceberg/ScanTaskParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,32 @@ private enum TaskType {
this.value = value;
}

public static TaskType fromValue(String value) {
Preconditions.checkArgument(value != null, "Invalid task type: null");
if (FILE_SCAN_TASK.value().equalsIgnoreCase(value)) {
public static TaskType fromTypeName(String value) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(value), "Invalid task type name: null or empty");
if (FILE_SCAN_TASK.typeName().equalsIgnoreCase(value)) {
return FILE_SCAN_TASK;
} else if (DATA_TASK.value().equalsIgnoreCase(value)) {
} else if (DATA_TASK.typeName().equalsIgnoreCase(value)) {
return DATA_TASK;
} else {
throw new IllegalArgumentException("Unknown task type: " + value);
}
}

public String value() {
public String typeName() {
return value;
}
}

private ScanTaskParser() {}

public static String toJson(FileScanTask fileScanTask) {
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
Preconditions.checkArgument(fileScanTask != null, "Invalid scan task: null");
return JsonUtil.generate(generator -> toJson(fileScanTask, generator), false);
}

public static FileScanTask fromJson(String json, boolean caseSensitive) {
Preconditions.checkArgument(json != null, "Invalid JSON string for file scan task: null");
Preconditions.checkArgument(json != null, "Invalid JSON string for scan task: null");
return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
}

Expand All @@ -71,11 +72,11 @@ private static void toJson(FileScanTask fileScanTask, JsonGenerator generator)
generator.writeStartObject();

if (fileScanTask instanceof StaticDataTask) {
generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.value());
generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.typeName());
DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
} else if (fileScanTask instanceof BaseFileScanTask
|| fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.value());
generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.typeName());
FileScanTaskParser.toJson(fileScanTask, generator);
} else {
throw new UnsupportedOperationException(
Expand All @@ -89,7 +90,7 @@ private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
TaskType taskType = TaskType.FILE_SCAN_TASK;
String taskTypeStr = JsonUtil.getStringOrNull(TASK_TYPE, jsonNode);
if (null != taskTypeStr) {
taskType = TaskType.fromValue(taskTypeStr);
taskType = TaskType.fromTypeName(taskTypeStr);
}

switch (taskType) {
Expand All @@ -98,7 +99,7 @@ private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
case DATA_TASK:
return DataTaskParser.fromJson(jsonNode);
default:
throw new UnsupportedOperationException("Unsupported task type: " + taskType.value());
throw new UnsupportedOperationException("Unsupported task type: " + taskType.typeName());
}
}
}
116 changes: 109 additions & 7 deletions core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,119 @@
*/
package org.apache.iceberg;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestDataTaskParser {
@Test
public void testStaticDataTask() {
public void nullCheck() throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);

Assertions.assertThatThrownBy(() -> DataTaskParser.toJson(null, generator))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid data task: null");

Assertions.assertThatThrownBy(
() -> DataTaskParser.toJson((StaticDataTask) createDataTask(), null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON generator: null");

Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON node for data task: null");
}

@Test
public void invalidJsonNode() throws Exception {
String jsonStr = "{\"str\":\"1\", \"arr\":[]}";
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.reader().readTree(jsonStr);

Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Invalid JSON node for data task: non-object ");

Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("arr")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Invalid JSON node for data task: non-object ");
}

@Test
public void missingFields() throws Exception {
ObjectMapper mapper = new ObjectMapper();

String missingSchemaStr = "{}";
JsonNode missingSchemaNode = mapper.reader().readTree(missingSchemaStr);
Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(missingSchemaNode))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot parse missing field: schema");

String missingProjectionStr =
"{"
+ "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+ "}";
JsonNode missingProjectionNode = mapper.reader().readTree(missingProjectionStr);
Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(missingProjectionNode))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot parse missing field: projection");

String missingMetadataFileStr =
"{"
+ "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]},"
+ "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+ "}";
JsonNode missingMetadataFileNode = mapper.reader().readTree(missingMetadataFileStr);
Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(missingMetadataFileNode))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot parse missing field: metadata-file");

String missingTableRowsStr =
"{\"task-type\":\"data-task\","
+ "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+ "{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+ "{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+ "{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+ "{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+ "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ "\"value\":\"string\",\"value-required\":true}}]},"
+ "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+ "{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+ "{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+ "{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+ "{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+ "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ "\"value\":\"string\",\"value-required\":true}}]},"
+ "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+ "\"file-path\":\"/tmp/metadata2.json\","
+ "\"file-format\":\"METADATA\",\"partition\":{},"
+ "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0}"
+ "}";
JsonNode missingTableRowsNode = mapper.reader().readTree(missingTableRowsStr);
Assertions.assertThatThrownBy(() -> DataTaskParser.fromJson(missingTableRowsNode))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot parse missing field: rows");
}

@Test
public void roundTripSerde() {
StaticDataTask dataTask = (StaticDataTask) createDataTask();
String jsonStr = ScanTaskParser.toJson(dataTask);
Assertions.assertThat(jsonStr).isEqualTo(snapshotsDataTaskJson());
Expand Down Expand Up @@ -117,13 +219,13 @@ private String snapshotsDataTaskJson() {
}

private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask actual) {
assertThat(expected.schema().asStruct()).isEqualTo(atual.schema().asStruct()))
.as("Schema should match")
.isTrue();
Assertions.assertThat(expected.schema().asStruct())
.isEqualTo(actual.schema().asStruct())
.as("Schema should match");

Assertions.assertThat(expected.projectedSchema().sameSchema(actual.projectedSchema()))
.as("Projected schema should match")
.isTrue();
Assertions.assertThat(expected.projectedSchema().asStruct())
.isEqualTo(actual.projectedSchema().asStruct())
.as("Projected schema should match");

TestContentFileParser.assertContentFileEquals(
expected.metadataFile(), actual.metadataFile(), PartitionSpec.unpartitioned());
Expand Down
37 changes: 33 additions & 4 deletions core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,50 @@
import org.junit.jupiter.params.provider.ValueSource;

public class TestFileScanTaskParser {

@Test
public void testNullArguments() {
Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))
Assertions.assertThatThrownBy(() -> FileScanTaskParser.toJson(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid file scan task: null");

Assertions.assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
Assertions.assertThatThrownBy(() -> FileScanTaskParser.fromJson((String) null, true))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON string for file scan task: null");

Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid scan task: null");

Assertions.assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON string for scan task: null");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFileScanTaskParser(boolean caseSensitive) {
PartitionSpec spec = TableTestBase.SPEC;
FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
String jsonStr = FileScanTaskParser.toJson(fileScanTask);
Assertions.assertThat(jsonStr).isEqualTo(fileScanTaskJsonWithoutTaskType());
FileScanTask deserializedTask = FileScanTaskParser.fromJson(jsonStr, caseSensitive);
assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, caseSensitive);
}

/** Test backward compatibility where task-type field is absent from the JSON string */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFileScanTaskParserWithoutTaskTypeField(boolean caseSensitive) {
PartitionSpec spec = TableTestBase.SPEC;
FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
FileScanTask deserializedTask =
FileScanTaskParser.fromJson(fileScanTaskJsonWithoutTaskType(), caseSensitive);
assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, caseSensitive);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testScanTaskParser(boolean caseSensitive) {
PartitionSpec spec = TableTestBase.SPEC;
FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
String jsonStr = ScanTaskParser.toJson(fileScanTask);
Expand All @@ -53,7 +82,7 @@ public void testFileScanTaskParser(boolean caseSensitive) {
/** Test backward compatibility where task-type field is absent from the JSON string */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFileScanTaskParserWithoutTaskTypeField(boolean caseSensitive) {
public void testScanTaskParserWithoutTaskTypeField(boolean caseSensitive) {
PartitionSpec spec = TableTestBase.SPEC;
FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
FileScanTask deserializedTask =
Expand Down
43 changes: 43 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestScanTaskParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestScanTaskParser {
@Test
public void nullCheck() {
Assertions.assertThatThrownBy(() -> ScanTaskParser.toJson(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid scan task: null");

Assertions.assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON string for scan task: null");
}

@Test
public void invalidTaskType() {
String jsonStr = "{\"task-type\":\"junk\"}";
Assertions.assertThatThrownBy(() -> ScanTaskParser.fromJson(jsonStr, true))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Unknown task type: junk");
}
}

0 comments on commit 557ff4e

Please sign in to comment.