Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Nov 19, 2024
1 parent b5a4d4d commit f38b896
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.io.IOException;
import java.util.List;

import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible;
import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -263,7 +262,6 @@ private static RecordReader createRecordReader(
boolean deletionVectorsEnabled)
throws IOException {
org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, fileIndexResult);
checkStructCompatible(schema, orcReader.getSchema());
try {
// get offset and length for the stripes that start in the split
Pair<Long, Long> offsetAndLength =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,8 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import org.apache.paimon.shade.guava30.com.google.common.base.Objects;

import org.apache.orc.TypeDescription;

import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Util for orc types. */
public class OrcTypeUtil {

Expand Down Expand Up @@ -150,59 +144,4 @@ static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}

public static void checkStructCompatible(
TypeDescription requiredStruct, TypeDescription orcStruct) {
List<String> requiredFields = requiredStruct.getFieldNames();
List<TypeDescription> requiredTypes = requiredStruct.getChildren();
List<String> orcFields = orcStruct.getFieldNames();
List<TypeDescription> orcTypes = orcStruct.getChildren();

for (int i = 0; i < requiredFields.size(); i++) {
String field = requiredFields.get(i);
int orcIndex = orcFields.indexOf(field);
checkArgument(orcIndex != -1, "Cannot find field %s in orc file meta.", field);
TypeDescription requiredType = requiredTypes.get(i);
TypeDescription orcType = orcTypes.get(orcIndex);
checkField(field, requiredType, orcType);
}
}

private static void checkField(
String fieldName, TypeDescription requiredType, TypeDescription orcType) {
checkFieldIdAttribute(fieldName, requiredType, orcType);
if (requiredType.getCategory().isPrimitive()) {
return;
}

// see TypeDescription#getPartialName
switch (requiredType.getCategory()) {
case LIST:
checkField(
"_elem", requiredType.getChildren().get(0), orcType.getChildren().get(0));
return;
case MAP:
checkField("_key", requiredType.getChildren().get(0), orcType.getChildren().get(0));
checkField(
"_value", requiredType.getChildren().get(1), orcType.getChildren().get(1));
return;
case STRUCT:
checkStructCompatible(requiredType, orcType);
return;
default:
throw new UnsupportedOperationException("Unsupported orc type: " + requiredType);
}
}

private static void checkFieldIdAttribute(
String fieldName, TypeDescription requiredType, TypeDescription orcType) {
String requiredId = requiredType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY);
String orcId = orcType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY);
checkArgument(
Objects.equal(requiredId, orcId),
"Field %s has different id: read type id is %s but orc type id is %s. This is unexpected.",
fieldName,
requiredId,
orcId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,35 @@

package org.apache.paimon.format.orc;

import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.paimon.shade.guava30.com.google.common.base.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.paimon.format.orc.OrcTypeUtil.checkStructCompatible;
import static org.apache.paimon.format.orc.OrcFileFormat.refineDataType;
import static org.apache.paimon.format.orc.OrcTypeUtil.PAIMON_ORC_FIELD_ID_KEY;
import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;

Expand Down Expand Up @@ -73,24 +90,117 @@ private void test(String expected, DataType type) {
}

@Test
void testCheckFieldIdAttribute() {
RowType full =
void testFieldIdAttribute(@TempDir java.nio.file.Path tempPath) throws IOException {
RowType rowType =
RowType.builder()
.field("a", DataTypes.INT())
.field(
"b",
RowType.builder(true, new AtomicInteger(5))
RowType.builder(true, new AtomicInteger(10))
.field("f0", DataTypes.STRING())
.field("f1", DataTypes.INT())
.build())
.field("c", DataTypes.ARRAY(DataTypes.INT()))
.field("d", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))
.field(
"e",
DataTypes.ARRAY(
RowType.builder(true, new AtomicInteger(20))
.field("f0", DataTypes.STRING())
.field("f1", DataTypes.INT())
.build()))
.field(
"f",
RowType.builder(true, new AtomicInteger(30))
.field("f0", DataTypes.ARRAY(DataTypes.INT()))
.build())
.build();
RowType projected = full.project("c", "b", "d");

TypeDescription required = convertToOrcSchema(projected);
TypeDescription orc = convertToOrcSchema(full);
// write schema to orc file then get
FileIO fileIO = LocalFileIO.create();
Path tempFile = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString());

OrcFileFormat format =
new OrcFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
PositionOutputStream out = fileIO.newOutputStream(tempFile, false);
FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
writer.close();
out.close();

Reader orcReader =
OrcReaderFactory.createReader(new Configuration(), fileIO, tempFile, null);
TypeDescription orcSchema = orcReader.getSchema();

RowType refined = (RowType) refineDataType(rowType);

assertThatNoException()
.isThrownBy(() -> checkStruct(convertToOrcSchema(refined), orcSchema));

assertThatNoException()
.isThrownBy(
() ->
checkStruct(
convertToOrcSchema(refined.project("c", "b", "d")),
orcSchema));

assertThatNoException()
.isThrownBy(
() ->
checkStruct(
convertToOrcSchema(refined.project("a", "e", "f")),
orcSchema));
}

private void checkStruct(TypeDescription requiredStruct, TypeDescription orcStruct) {
List<String> requiredFields = requiredStruct.getFieldNames();
List<TypeDescription> requiredTypes = requiredStruct.getChildren();
List<String> orcFields = orcStruct.getFieldNames();
List<TypeDescription> orcTypes = orcStruct.getChildren();

for (int i = 0; i < requiredFields.size(); i++) {
String field = requiredFields.get(i);
int orcIndex = orcFields.indexOf(field);
checkArgument(orcIndex != -1, "Cannot find field %s in orc file meta.", field);
TypeDescription requiredType = requiredTypes.get(i);
TypeDescription orcType = orcTypes.get(orcIndex);
checkField(field, requiredType, orcType);
}
}

private void checkField(
String fieldName, TypeDescription requiredType, TypeDescription orcType) {
checkFieldIdAttribute(fieldName, requiredType, orcType);
if (requiredType.getCategory().isPrimitive()) {
return;
}

switch (requiredType.getCategory()) {
case LIST:
checkField(
"_elem", requiredType.getChildren().get(0), orcType.getChildren().get(0));
return;
case MAP:
checkField("_key", requiredType.getChildren().get(0), orcType.getChildren().get(0));
checkField(
"_value", requiredType.getChildren().get(1), orcType.getChildren().get(1));
return;
case STRUCT:
checkStruct(requiredType, orcType);
return;
default:
throw new UnsupportedOperationException("Unsupported orc type: " + requiredType);
}
}

assertThatNoException().isThrownBy(() -> checkStructCompatible(required, orc));
private void checkFieldIdAttribute(
String fieldName, TypeDescription requiredType, TypeDescription orcType) {
String requiredId = requiredType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY);
String orcId = orcType.getAttributeValue(PAIMON_ORC_FIELD_ID_KEY);
checkArgument(
Objects.equal(requiredId, orcId),
"Field %s has different id: read type id is %s but orc type id is %s. This is unexpected.",
fieldName,
requiredId,
orcId);
}
}

0 comments on commit f38b896

Please sign in to comment.