PARQUET-3031: Support to transfer input stream when building ParquetFileReader #3030
Conversation
On many file systems, a seek backwards to read the data after reading the footer results in slower reads because the fs switches from a sequential read to a random read (which typically turns off pre-fetching and other optimizations enabled in sequential reads).
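To make that access pattern concrete, here is a minimal sketch of the footer-then-data read order on a Hadoop `FileSystem`. It only illustrates the backwards seek described above; the class and method names are invented for the example and nothing in it is part of this PR.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FooterThenDataAccessPattern {
  /** Footer-first read order: tail of the file, then a backwards seek to the data. */
  static void readFooterThenData(FileSystem fs, Path file) throws IOException {
    long len = fs.getFileStatus(file).getLen();
    try (FSDataInputStream in = fs.open(file)) {
      // Parquet keeps a 4-byte footer length plus the 4-byte "PAR1" magic at the tail.
      byte[] tail = new byte[8];
      in.seek(len - 8);
      in.readFully(tail);

      // ... decode the footer length and read the footer metadata here ...

      // Backwards seek to the first row group: on many file systems this demotes the
      // stream from a sequential to a random read policy and disables read-ahead,
      // which is the slowdown described above.
      in.seek(0);
    }
  }
}
```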
Thanks parthchandra for the comments. For our company's internally managed Spark, we reuse the input stream for the Parquet file. Before this change, a Spark task would open the file multiple times to read the footer and the data, which costs time when the HDFS NameNode is under high pressure. After the change, it opens the Parquet file only once.
This is testing from 3 years ago on Spark 2.3. And after the community Spark patch [[SPARK-42388][SQL] Avoid parquet footer reads twice in vectorized reader](https://github.com/apache/spark/pull/39950), the solution might cut HDFS RPC requests by half.
It looks reasonable to me, and users can choose whichever option fits them best.
+1. This can significantly reduce the number of NameNode RPCs to improve performance.
Benchmark:
select count(*) from tbl where id > -999;
| No. of RPCs with vanilla Parquet and Spark | No. of RPCs with this PR plus the Spark patch |
|---|---|
| 179997 | 60007 |
Spark side code:
```diff
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index fee80e97bd2..8e0c68cc71a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -28,6 +28,10 @@
import java.util.Map;
import java.util.Set;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Option;
import com.google.common.annotations.VisibleForTesting;
@@ -60,6 +64,11 @@
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.AccumulatorV2;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
* This class handles computing row groups, filtering on them, setting up the column readers,
@@ -69,6 +78,8 @@
* this way, albeit at a higher cost to implement. This base class is reusable.
*/
public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Void, T> {
+ private static final Logger LOG = LoggerFactory.getLogger(SpecificParquetRecordReaderBase.class);
+
protected Path file;
protected MessageType fileSchema;
protected MessageType requestedSchema;
@@ -87,6 +98,8 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
protected ParquetRowGroupReader reader;
protected OpenedParquetFileInfo openedFileInfo;
+ // indicates whether the input split can be skipped
+ protected boolean canSkip = false;
public SpecificParquetRecordReaderBase(OpenedParquetFileInfo openedFileInfo) {
this.openedFileInfo = openedFileInfo;
@@ -106,16 +119,42 @@ public void initialize(
FileSplit split = (FileSplit) inputSplit;
this.file = split.getPath();
ParquetFileReader fileReader;
- if (fileFooter.isDefined()) {
- fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
+ ParquetMetadata footer;
+ List<BlockMetaData> blocks;
+
+ if (openedFileInfo != null && openedFileInfo.getFileFooter() != null) {
+ footer = openedFileInfo.getFileFooter();
} else {
- ParquetReadOptions options = HadoopReadOptions
- .builder(configuration, file)
- .withRange(split.getStart(), split.getStart() + split.getLength())
- .build();
+ LOG.info("Parquet read footer...");
+ footer = readFooter(configuration, file, range(split.getStart(),
+ split.getStart() + split.getLength()));
+ }
+
+ MessageType fileSchema = footer.getFileMetaData().getSchema();
+ FilterCompat.Filter filter = getFilter(configuration);
+ blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+
+ // can skip this file, so the following initiate actions are not required
+ if (blocks.isEmpty()) {
+ canSkip = true;
+ return;
+ }
+
+ Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
+ ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
+ ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+ taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
+ this.requestedSchema = readContext.getRequestedSchema();
+
+ if (openedFileInfo != null && openedFileInfo.getOpenedStream() != null) {
fileReader = new ParquetFileReader(
- HadoopInputFile.fromPath(file, configuration), options);
+ configuration, footer.getFileMetaData(), openedFileInfo.getOpenedStream(),
+ openedFileInfo.getInputFile(), blocks, requestedSchema.getColumns());
+ } else {
+ fileReader = new ParquetFileReader(
+ configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
}
+
this.reader = new ParquetRowGroupReaderImpl(fileReader);
this.fileSchema = fileReader.getFileMetaData().getSchema();
try {
@@ -124,11 +163,7 @@ public void initialize(
// Swallow any exception, if we cannot parse the version we will revert to a sequential read
// if the column is a delta byte array encoding (due to PARQUET-246).
}
- Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
- ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
- ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
- taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
- this.requestedSchema = readContext.getRequestedSchema();
+
fileReader.setRequestedSchema(requestedSchema);
String sparkRequestedSchemaString =
configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 14995510489..0ae08d5490d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -197,6 +197,9 @@ public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext);
+ if (canSkip) {
+ return;
+ }
initializeInternal();
}
@@ -207,6 +210,9 @@ public void initialize(
Option<ParquetMetadata> fileFooter)
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext, fileFooter);
+ if (canSkip) {
+ return;
+ }
initializeInternal();
}
@@ -218,6 +224,9 @@ public void initialize(
public void initialize(String path, List<String> columns) throws IOException,
UnsupportedOperationException {
super.initialize(path, columns);
+ if (canSkip) {
+ return;
+ }
initializeInternal();
}
@@ -229,6 +238,9 @@ public void initialize(
ParquetRowGroupReader rowGroupReader,
int totalRowCount) throws IOException {
super.initialize(fileSchema, requestedSchema, rowGroupReader, totalRowCount);
+ if (canSkip) {
+ return;
+ }
initializeInternal();
}
@@ -243,6 +255,9 @@ public void close() throws IOException {
@Override
public boolean nextKeyValue() throws IOException {
+ if (canSkip) {
+ return false;
+ }
resultBatch();
if (returnColumnarBatch) return nextBatch();
@@ -318,6 +333,9 @@ private void initBatch() {
}
public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
+ if (canSkip) {
+ return;
+ }
initBatch(MEMORY_MODE, partitionColumns, partitionValues);
}
@@ -342,6 +360,9 @@ public void enableReturningBatches() {
* Advances to the next batch of rows. Returns false if there are no more.
*/
public boolean nextBatch() throws IOException {
+ if (canSkip) {
+ return false;
+ }
for (ParquetColumnVector vector : columnVectors) {
vector.reset();
}
```
@wgtmac @gszadovszky Could we merge this PR?
Thank you all.
### What changes were proposed in this pull request?
Bumps to the latest version of Parquet. For the full list of changes, please check the pre-release: https://github.com/apache/parquet-java/releases/tag/apache-parquet-1.15.0 Including some interesting patches for Spark, such as apache/parquet-java#3030

### Why are the changes needed?
To bring the latest features and bug fixes for Apache Spark 4.0.0.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48970 from Fokko/fd-parquet-1-15-0.

Authored-by: Fokko <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Rationale for this change
Support passing the Parquet file input stream when building the ParquetFileReader, so that we can reuse an existing input stream and reduce open-file RPCs.
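A rough sketch of the usage this enables, modeled on the constructor call in the Spark diff above: the caller opens the file once, keeps the `SeekableInputStream`, and hands it to `ParquetFileReader` instead of letting the reader open the file again. The parameter order below is taken from that diff and should be treated as an assumption rather than the authoritative signature added by this PR; the helper method name is invented for illustration.

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

public class ReuseOpenedStream {
  /**
   * Builds a ParquetFileReader on top of a stream that is already open, so no extra
   * open-file RPC is sent to the NameNode. Parameter order mirrors the constructor
   * call in the Spark diff above; treat it as an assumption, not the verified API.
   */
  static ParquetFileReader openWithExistingStream(
      Configuration conf,
      FileMetaData fileMetaData,             // from the footer that was read once up front
      SeekableInputStream alreadyOpenStream, // the stream that was used to read that footer
      InputFile inputFile,
      List<BlockMetaData> filteredBlocks,    // row groups remaining after filtering
      List<ColumnDescriptor> projectedColumns) throws IOException {
    return new ParquetFileReader(
        conf, fileMetaData, alreadyOpenStream, inputFile, filteredBlocks, projectedColumns);
  }
}
```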
What changes are included in this PR?
As the title says: new constructors that accept an already-opened input stream.
Are these changes tested?
Existing UTs; this change only adds new constructors.
Are there any user-facing changes?
No breaking changes.
Closes #3031