Spec: add task-type field to JSON serialization of scan task. add JSON serialization for StaticDataTask.
stevenzwu committed Feb 20, 2024
1 parent 50fb400 commit c27b5d5
Showing 9 changed files with 459 additions and 38 deletions.
80 changes: 80 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataTaskParser.java
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.JsonUtil;

class DataTaskParser {
private static final String SCHEMA = "schema";
private static final String PROJECTED_SCHEMA = "projection";
private static final String METADATA_FILE = "metadata-file";
private static final String ROWS = "rows";

private DataTaskParser() {}

static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws IOException {
Preconditions.checkArgument(dataTask != null, "Invalid data task: null");
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");

generator.writeFieldName(SCHEMA);
SchemaParser.toJson(dataTask.schema(), generator);

generator.writeFieldName(PROJECTED_SCHEMA);
SchemaParser.toJson(dataTask.projectedSchema(), generator);

generator.writeFieldName(METADATA_FILE);
ContentFileParser.toJson(dataTask.metadataFile(), PartitionSpec.unpartitioned(), generator);

generator.writeArrayFieldStart(ROWS);
for (StructLike row : dataTask.tableRows()) {
SingleValueParser.toJson(dataTask.schema().asStruct(), row, generator);
}

generator.writeEndArray();
}

static StaticDataTask fromJson(JsonNode jsonNode) {
Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for data task: null");
Preconditions.checkArgument(
jsonNode.isObject(), "Invalid JSON node for data task: non-object (%s)", jsonNode);

Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
Schema projectedSchema = SchemaParser.fromJson(JsonUtil.get(PROJECTED_SCHEMA, jsonNode));
DataFile metadataFile =
(DataFile)
ContentFileParser.fromJson(
JsonUtil.get(METADATA_FILE, jsonNode), PartitionSpec.unpartitioned());

JsonNode rowsArray = jsonNode.get(ROWS);
Preconditions.checkArgument(
rowsArray.isArray(), "Invalid JSON node for rows: non-array (%s)", rowsArray);

StructLike[] rows = new StructLike[rowsArray.size()];
for (int i = 0; i < rowsArray.size(); ++i) {
JsonNode rowNode = rowsArray.get(i);
rows[i] = (StructLike) SingleValueParser.fromJson(schema.asStruct(), rowNode);
}

return new StaticDataTask(metadataFile, schema, projectedSchema, rows);
}
}
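
For orientation, the rough shape of the JSON this produces for a data task is sketched below. The field names match the constants above; the enclosing object and the task-type field are written by ScanTaskParser (introduced later in this commit), and every value is an elided placeholder rather than real output:

{
  "task-type" : "data-task",
  "schema" : { ... },
  "projection" : { ... },
  "metadata-file" : { ... },
  "rows" : [ { ... }, ... ]
}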
35 changes: 22 additions & 13 deletions core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
@@ -40,16 +40,32 @@ public class FileScanTaskParser

private FileScanTaskParser() {}

/**
* Serialize file scan task to JSON string
*
* @deprecated Use {@link ScanTaskParser#toJson(FileScanTask)} instead; will be removed in 1.7.0
*/
@Deprecated
public static String toJson(FileScanTask fileScanTask) {
return JsonUtil.generate(
generator -> FileScanTaskParser.toJson(fileScanTask, generator), false);
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
return JsonUtil.generate(generator -> toJson(fileScanTask, generator), false);
}

/**
* Deserialize file scan task from JSON string
*
* @deprecated Use {@link ScanTaskParser#fromJson(String, boolean)} instead; will be removed in
* 1.7.0
*/
@Deprecated
public static FileScanTask fromJson(String json, boolean caseSensitive) {
Preconditions.checkArgument(json != null, "Invalid JSON string for file scan task: null");
return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
}

private static void toJson(FileScanTask fileScanTask, JsonGenerator generator)
throws IOException {
static void toJson(FileScanTask fileScanTask, JsonGenerator generator) throws IOException {
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
generator.writeStartObject();

generator.writeFieldName(SCHEMA);
SchemaParser.toJson(fileScanTask.schema(), generator);
@@ -78,16 +94,9 @@ private static void toJson(FileScanTask fileScanTask, JsonGenerator generator)
generator.writeFieldName(RESIDUAL);
ExpressionParser.toJson(fileScanTask.residual(), generator);
}

generator.writeEndObject();
}

public static FileScanTask fromJson(String json, boolean caseSensitive) {
Preconditions.checkArgument(json != null, "Invalid JSON string for file scan task: null");
return JsonUtil.parse(json, node -> FileScanTaskParser.fromJson(node, caseSensitive));
}

private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file scan task: null");
Preconditions.checkArgument(
jsonNode.isObject(), "Invalid JSON node for file scan task: non-object (%s)", jsonNode);
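
With both entry points now deprecated, call sites can switch to ScanTaskParser (shown next) with a one-for-one replacement. A minimal migration sketch, assuming an existing FileScanTask named task:

String json = ScanTaskParser.toJson(task);                  // was: FileScanTaskParser.toJson(task)
FileScanTask parsed = ScanTaskParser.fromJson(json, true);  // was: FileScanTaskParser.fromJson(json, true); boolean is caseSensitive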
104 changes: 104 additions & 0 deletions core/src/main/java/org/apache/iceberg/ScanTaskParser.java
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.util.JsonUtil;

public class ScanTaskParser {
private static final String TASK_TYPE = "task-type";

private enum TaskType {
FILE_SCAN_TASK("file-scan-task"),
DATA_TASK("data-task");

private final String value;

TaskType(String value) {
this.value = value;
}

public static TaskType fromValue(String value) {
Preconditions.checkArgument(value != null, "Task type is null");
if (FILE_SCAN_TASK.value().equalsIgnoreCase(value)) {
return FILE_SCAN_TASK;
} else if (DATA_TASK.value().equalsIgnoreCase(value)) {
return DATA_TASK;
} else {
throw new IllegalArgumentException("Unknown task type: " + value);
}
}

public String value() {
return value;
}
}

private ScanTaskParser() {}

public static String toJson(FileScanTask fileScanTask) {
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: null");
return JsonUtil.generate(generator -> toJson(fileScanTask, generator), false);
}

public static FileScanTask fromJson(String json, boolean caseSensitive) {
Preconditions.checkArgument(json != null, "Invalid JSON string for file scan task: null");
return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
}

private static void toJson(FileScanTask fileScanTask, JsonGenerator generator)
throws IOException {
generator.writeStartObject();

if (fileScanTask instanceof StaticDataTask) {
generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.value());
DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
} else if (fileScanTask instanceof BaseFileScanTask
|| fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.value());
FileScanTaskParser.toJson(fileScanTask, generator);
} else {
throw new UnsupportedOperationException(
"Unsupported task type: " + fileScanTask.getClass().getCanonicalName());
}

generator.writeEndObject();
}

private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
TaskType taskType = TaskType.FILE_SCAN_TASK;
String taskTypeStr = JsonUtil.getStringOrNull(TASK_TYPE, jsonNode);
if (!Strings.isNullOrEmpty(taskTypeStr)) {
taskType = TaskType.fromValue(taskTypeStr);
}

switch (taskType) {
case FILE_SCAN_TASK:
return FileScanTaskParser.fromJson(jsonNode, caseSensitive);
case DATA_TASK:
return DataTaskParser.fromJson(jsonNode);
default:
throw new UnsupportedOperationException("Unsupported task type: " + taskType.value());
}
}
}
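
A short round-trip sketch of the new public API; task is an assumed pre-existing FileScanTask (for example obtained from a table scan elsewhere), and the comments summarize the dispatch behavior visible in the code above:

String json = ScanTaskParser.toJson(task);
// toJson tags the object with "task-type": "data-task" for a StaticDataTask,
// "file-scan-task" for BaseFileScanTask and its split tasks, and rejects other types.
FileScanTask roundTripped = ScanTaskParser.fromJson(json, true /* caseSensitive */);
// fromJson falls back to FILE_SCAN_TASK when the task-type field is absent, so JSON
// written by the older FileScanTaskParser still parses.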
7 changes: 5 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;

import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.types.Types;

/**
@@ -27,7 +28,8 @@
* <p>This does not include snapshots that have been expired using {@link ExpireSnapshots}.
*/
public class SnapshotsTable extends BaseMetadataTable {
private static final Schema SNAPSHOT_SCHEMA =
@VisibleForTesting
static final Schema SNAPSHOT_SCHEMA =
new Schema(
Types.NestedField.required(1, "committed_at", Types.TimestampType.withZone()),
Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
@@ -94,7 +96,8 @@ public CloseableIterable<FileScanTask> planFiles() {
}
}

private static StaticDataTask.Row snapshotToRow(Snapshot snap) {
@VisibleForTesting
static StaticDataTask.Row snapshotToRow(Snapshot snap) {
return StaticDataTask.Row.of(
snap.timestampMillis() * 1000,
snap.snapshotId(),
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -64,6 +64,19 @@ private StaticDataTask(
this.rows = rows;
}

StaticDataTask(
DataFile metadataFile, Schema tableSchema, Schema projectedSchema, StructLike[] rows) {
this.tableSchema = tableSchema;
this.projectedSchema = projectedSchema;
this.metadataFile = metadataFile;
this.rows = rows;
}

@Override
public Schema schema() {
return tableSchema;
}

@Override
public List<DeleteFile> deletes() {
return ImmutableList.of();
@@ -106,6 +119,19 @@ public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this);
}

Schema projectedSchema() {
return projectedSchema;
}

DataFile metadataFile() {
return metadataFile;
}

/** @return the full table rows before projection */
StructLike[] tableRows() {
return rows;
}

/** Implements {@link StructLike#get} for passing static rows. */
static class Row implements StructLike, Serializable {
public static Row of(Object... values) {