Build: Bump Apache Parquet 1.14.4 #11502

Merged (6 commits) on Nov 20, 2024
@@ -28,6 +28,7 @@
 import java.nio.file.Path;
 import java.util.Base64;
 import java.util.List;
+import java.util.Map;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.table.api.TableEnvironment;
@@ -137,7 +138,7 @@ private Table createPrimitiveTable() throws IOException {
     return table;
   }
 
-  private void createNestedTable() throws IOException {
+  private Table createNestedTable() throws IOException {
     Table table =
         validationCatalog.createTable(
             TableIdentifier.of(DATABASE, TABLE_NAME),
@@ -154,6 +155,8 @@ private void createNestedTable() throws IOException {
     File testFile = File.createTempFile("junit", null, temp.toFile());
     DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records);
     table.newAppend().appendFile(dataFile).commit();
+
+    return table;
   }
 
   @BeforeEach
@@ -168,7 +171,7 @@ public void before() {
   @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
@@ -212,32 +215,88 @@ protected Object[] row(Object... values) {
 
   @TestTemplate
   public void testPrimitiveColumns() throws Exception {
-    createPrimitiveTable();
+    Table table = createPrimitiveTable();
     List<Row> result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME);
 
+    // With new releases of Parquet, new features might be added which cause the
+    // size of the column to increase. For example, with Parquet 1.14.x the
+    // uncompressed size has been added to allow for better allocation of memory upfront.
+    // Therefore, we look the sizes up, rather than hardcoding them
+    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
Review comment (Contributor), on the DataFile line above: "it seems that we're assuming only a single file, so we might as well use Iterables.getOnlyElement(table.currentSnapshot().addedDataFiles(table.io()))"
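A minimal sketch of that suggestion, for illustration only; the helper is hypothetical and not part of the PR, and Iterables is assumed to be Iceberg's shaded Guava class (org.apache.iceberg.relocated.com.google.common.collect.Iterables):

// Hypothetical test helper illustrating the review suggestion above.
// getOnlyElement fails fast if the current snapshot added zero or more than one data file,
// making the single-file assumption explicit instead of silently taking the first element.
static DataFile onlyAddedDataFile(Table table) {
  return Iterables.getOnlyElement(table.currentSnapshot().addedDataFiles(table.io()));
}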
+    Map<Integer, Long> columnSizeStats = dataFile.columnSizes();
+
     Row binaryCol =
         Row.of(
-            52L,
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("binaryCol").fieldId()),
             4L,
             2L,
             null,
             Base64.getDecoder().decode("1111"),
             Base64.getDecoder().decode("2222"));
-    Row booleanCol = Row.of(32L, 4L, 0L, null, false, true);
-    Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
-    Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D);
+    Row booleanCol =
+        Row.of(
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("booleanCol").fieldId()),
+            4L,
+            0L,
+            null,
+            false,
+            true);
+    Row decimalCol =
+        Row.of(
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("decimalCol").fieldId()),
+            4L,
+            1L,
+            null,
+            new BigDecimal("1.00"),
+            new BigDecimal("2.00"));
+    Row doubleCol =
+        Row.of(
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("doubleCol").fieldId()),
+            4L,
+            0L,
+            1L,
+            1.0D,
+            2.0D);
     Row fixedCol =
         Row.of(
-            44L,
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("fixedCol").fieldId()),
             4L,
             2L,
             null,
             Base64.getDecoder().decode("1111"),
             Base64.getDecoder().decode("2222"));
-    Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f);
-    Row intCol = Row.of(71L, 4L, 0L, null, 1, 2);
-    Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L);
-    Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2");
+    Row floatCol =
+        Row.of(
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("floatCol").fieldId()),
+            4L,
+            0L,
+            2L,
+            0f,
+            0f);
+    Row intCol =
+        Row.of(
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("intCol").fieldId()),
+            4L,
+            0L,
+            null,
+            1,
+            2);
+    Row longCol =
+        Row.of(
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("longCol").fieldId()),
+            4L,
+            0L,
+            null,
+            1L,
+            2L);
+    Row stringCol =
+        Row.of(
+            columnSizeStats.get(PRIMITIVE_SCHEMA.findField("stringCol").fieldId()),
+            4L,
+            0L,
+            null,
+            "1",
+            "2");
 
     List<Row> expected =
         Lists.newArrayList(
@@ -288,12 +347,18 @@ public void testSelectNestedValues() throws Exception {
   @TestTemplate
   public void testNestedValues() throws Exception {
     createNestedTable();
+    List<Row> result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME);
 
+    // We have to take a slightly different approach, since we don't store
+    // the column sizes for nested fields.
+    long leafDoubleColSize =
+        (long) ((Row) ((Row) result.get(0).getField(0)).getField(0)).getField(0);
+    long leafLongColSize = (long) ((Row) ((Row) result.get(0).getField(0)).getField(1)).getField(0);
+
-    Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D);
-    Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L);
+    Row leafDoubleCol = Row.of(leafDoubleColSize, 3L, 1L, 1L, 0.0D, 0.0D);
+    Row leafLongCol = Row.of(leafLongColSize, 3L, 1L, null, 0L, 1L);
     Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol));
 
-    TestHelpers.assertRows(
-        sql("SELECT readable_metrics FROM %s$files", TABLE_NAME), ImmutableList.of(metrics));
+    TestHelpers.assertRows(result, ImmutableList.of(metrics));
   }
 }
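As context for the in-diff comment about looking column sizes up rather than hardcoding them, the sketch below shows the lookup pattern in isolation. The class and method names are illustrative only; the Iceberg calls (currentSnapshot(), addedDataFiles(FileIO), columnSizes(), and Schema.findField(...).fieldId()) are the ones the test above uses. The expected per-column byte size is read from the written data file's metadata, keyed by field id, so the expectation tracks whatever the current Parquet version writes instead of a value that shifts between Parquet releases.

import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

// Illustrative helper (not part of the PR): resolve the on-disk size recorded for one
// column of the single data file added by the table's current snapshot.
final class ColumnSizeLookup {
  private ColumnSizeLookup() {}

  static Long columnSizeOf(Table table, Schema schema, String columnName) {
    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
    Map<Integer, Long> columnSizes = dataFile.columnSizes();
    return columnSizes.get(schema.findField(columnName).fieldId());
  }
}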