From e0d2f57fc9ea6693a25cb7bd7ebe561f918a3ba8 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Wed, 9 Oct 2024 18:10:30 +0800
Subject: [PATCH] fix(arrow): Use new ParquetMetaDataReader instead

Signed-off-by: Xuanwo
---
 crates/iceberg/src/arrow/reader.rs | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index ed422be99..f6680e312 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -31,11 +31,11 @@ use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{try_join, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
-use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
+use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
@@ -1078,12 +1078,14 @@ impl AsyncFileReader for ArrowFileReader {
     }
 
     fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        Box::pin(async move {
-            let file_size = self.meta.size;
-            let mut loader = MetadataLoader::load(self, file_size as usize, None).await?;
-            loader.load_page_index(false, false).await?;
-            Ok(Arc::new(loader.finish()))
-        })
+        async move {
+            let reader = ParquetMetaDataReader::new();
+            let size = self.meta.size as usize;
+            let meta = reader.load_and_finish(self, size).await?;
+
+            Ok(Arc::new(meta))
+        }
+        .boxed()
     }
 }
 