Commit f4893cc
Re-add 1.18 and 1.19 changes because they inherit base class changes
mxm committed Jan 23, 2025
1 parent 4823714 commit f4893cc
Showing 13 changed files with 635 additions and 14 deletions.
FlinkParquetWriters.java

@@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData>

     @Override
     protected Object get(RowData struct, int index) {
+      // Be sure to check for null values, even if the field is required. Without an explicit null
+      // check, BinaryRowData will ignore the null flag and parse random bytes as actual values.
+      // This will produce incorrect writes instead of failing with a NullPointerException.
+      if (struct.isNullAt(index)) {
+        return null;
+      }
       return fieldGetter[index].getFieldOrNull(struct);
     }
   }
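
The comment added above describes the root cause: BinaryRowData stores a null bit per field, but its typed getters read the field's fixed-length slot directly without consulting that bit. A minimal, self-contained sketch of that behavior (the demo class and its setup are illustrative only, not part of this commit):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;

public class BinaryRowNullFlagDemo {
  public static void main(String[] args) {
    // One-column row type with a single BIGINT field.
    RowType type = RowType.of(new BigIntType());
    RowDataSerializer serializer = new RowDataSerializer(type);

    // Serialize a row whose only field is null into Flink's binary format.
    BinaryRowData row = serializer.toBinaryRow(GenericRowData.of((Object) null));

    System.out.println(row.isNullAt(0)); // true: the null bit is set
    // getLong(0) does not consult the null bit; it reads the fixed-length slot
    // and returns whatever bytes are there (zero after serialization) rather
    // than failing, which is how a null field can turn into a bogus value.
    System.out.println(row.getLong(0));
  }
}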
AbstractTestFlinkAvroReaderWriter.java

@@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
-    writeAndValidate(schema, expectedRecords, NUM_RECORDS);
+    writeAndValidate(schema, expectedRecords);
   }

   protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);

-  private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
-      throws IOException {
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

@@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
     try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
-      for (int i = 0; i < numRecord; i++) {
+      for (int i = 0; i < expectedRecords.size(); i++) {
         assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
       }
@@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
             .build()) {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
-      for (int i = 0; i < numRecord; i += 1) {
+      for (int i = 0; i < expectedRecords.size(); i += 1) {
         assertThat(records).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
       }
@@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException {
             1643811742000L,
             10.24d));

-    writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
+    writeAndValidate(SCHEMA_NUM_TYPE, expected);
   }
 }
TestFlinkOrcReaderWriter.java

@@ -39,14 +39,20 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Ignore;

 public class TestFlinkOrcReaderWriter extends DataTest {
   private static final int NUM_RECORDS = 100;

   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);
+    writeAndValidate(schema, expectedRecords);
+  }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
+    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

     File recordsFile = File.createTempFile("junit", null, temp.toFile());
@@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException {
       assertThat(records).isExhausted();
     }
   }
+
+  @Override
+  @Ignore("ORC file format supports null values even for required fields")
+  public void testWriteNullValueForRequiredType() {
+    super.testWriteNullValueForRequiredType();
+  }
 }
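
For context, the inherited DataTest case disabled here checks that writing a null into a required field fails; ORC's writer tolerates such nulls, hence the opt-out. What follows is a hedged sketch of that test's likely shape, not the actual base-class code from the parent commit:

// Hypothetical sketch; assumes org.apache.iceberg.data.GenericRecord,
// org.apache.iceberg.types.Types, and AssertJ's assertThatThrownBy are in
// scope. The real DataTest implementation in the parent commit may differ.
@Test
public void testWriteNullValueForRequiredType() {
  Schema schema =
      new Schema(
          Types.NestedField.required(1, "id", Types.LongType.get()),
          Types.NestedField.required(2, "data", Types.StringType.get()));

  GenericRecord record = GenericRecord.create(schema);
  record.setField("id", 42L);
  record.setField("data", null); // null value in a required field

  // Formats other than ORC are expected to reject the write.
  assertThatThrownBy(() -> writeAndValidate(schema, ImmutableList.of(record)))
      .isInstanceOf(RuntimeException.class);
}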
TestFlinkParquetReader.java

@@ -236,4 +236,9 @@ protected void writeAndValidate(Schema schema) throws IOException {
         RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20),
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    writeAndValidate(expectedData, schema);
+  }
 }
TestFlinkParquetWriter.java

@@ -24,7 +24,10 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Iterator;
+import java.util.List;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
@@ -33,10 +36,12 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.io.TempDir;

 public class TestFlinkParquetWriter extends DataTest {
@@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException {
             schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
+    List<RowData> binaryRowList = Lists.newArrayList();
+    for (Record record : expectedData) {
+      RowData rowData = RowDataConverter.convert(schema, record);
+      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
+      binaryRowList.add(binaryRow);
+    }
+    writeAndValidate(binaryRowList, schema);
+  }
 }
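
This override deliberately round-trips each Record through RowDataSerializer.toBinaryRow before writing, so the Parquet write path is exercised with BinaryRowData rather than GenericRowData; BinaryRowData is exactly the representation whose ignored null flag motivated the isNullAt check added to FlinkParquetWriters above.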
(Diffs for the remaining changed files not shown.)