diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index f63817f9c..587ddec4d 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -92,43 +92,51 @@ impl ArrowReader {
         let file_io = self.file_io.clone();
 
         Ok(try_stream! {
-            while let Some(Ok(task)) = tasks.next().await {
-                // Collect Parquet column indices from field ids
-                let mut collector = CollectFieldIdVisitor {
-                    field_ids: HashSet::default(),
-                };
-                if let Some(predicates) = task.predicate() {
-                    visit(&mut collector, predicates)?;
-                }
-
-                let parquet_file = file_io
-                    .new_input(task.data_file_path())?;
-                let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-                let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                    .await?;
-
-                let parquet_schema = batch_stream_builder.parquet_schema();
-                let arrow_schema = batch_stream_builder.schema();
-                let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
-                batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
-
-                let parquet_schema = batch_stream_builder.parquet_schema();
-                let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
-
-                if let Some(row_filter) = row_filter {
-                    batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                }
-
-                if let Some(batch_size) = self.batch_size {
-                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                }
-
-                let mut batch_stream = batch_stream_builder.build()?;
-
-                while let Some(batch) = batch_stream.next().await {
-                    yield batch?;
+            while let Some(task_result) = tasks.next().await {
+                match task_result {
+                    Ok(task) => {
+                        // Collect Parquet column indices from field ids
+                        let mut collector = CollectFieldIdVisitor {
+                            field_ids: HashSet::default(),
+                        };
+                        if let Some(predicates) = task.predicate() {
+                            visit(&mut collector, predicates)?;
+                        }
+
+                        let parquet_file = file_io
+                            .new_input(task.data_file_path())?;
+
+                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
+                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
+                            .await?;
+
+                        let parquet_schema = batch_stream_builder.parquet_schema();
+                        let arrow_schema = batch_stream_builder.schema();
+                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
+                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+
+                        let parquet_schema = batch_stream_builder.parquet_schema();
+                        let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
+
+                        if let Some(row_filter) = row_filter {
+                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+                        }
+
+                        if let Some(batch_size) = self.batch_size {
+                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                        }
+
+                        let mut batch_stream = batch_stream_builder.build()?;
+
+                        while let Some(batch) = batch_stream.next().await {
+                            yield batch?;
+                        }
+                    }
+                    Err(e) => {
+                        Err(e)?
+                    }
                 }
             }
         }
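
Context note (not part of the patch): the old `while let Some(Ok(task))` loop silently ends at the first `Err` item from the task stream, so the error never reaches the consumer; the rewritten loop matches each item and surfaces the error via `Err(e)?` inside `try_stream!`. A minimal standalone sketch of that difference, using toy data and assuming only the `futures` and `tokio` crates (none of the iceberg types):

    // Standalone sketch: contrasts the two loop shapes on a plain futures stream.
    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // Old shape: `while let Some(Ok(v))` stops at the first Err item and drops
        // the error on the floor, so the caller only ever sees a short stream.
        let mut items = stream::iter(vec![Ok(1), Err("boom".to_string()), Ok(3)]);
        let mut seen = Vec::new();
        while let Some(Ok(v)) = items.next().await {
            seen.push(v);
        }
        assert_eq!(seen, vec![1]); // loop ended silently; "boom" was never reported

        // New shape: match each item and propagate the error instead of swallowing it.
        // Inside `try_stream!` the patch does the equivalent with `Err(e)?`, which
        // terminates the yielded record-batch stream with that error.
        let mut items = stream::iter(vec![Ok(1), Err("boom".to_string()), Ok(3)]);
        while let Some(item) = items.next().await {
            match item {
                Ok(v) => println!("got {v}"),
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }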