diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index ab8963b9c300..5870ccd3a801 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -53,6 +53,7 @@ arrow-ipc = { version = "27.0.0", path = "../arrow-ipc", optional = true }
 arrow-json = { version = "27.0.0", path = "../arrow-json", optional = true }
 arrow-schema = { version = "27.0.0", path = "../arrow-schema" }
 arrow-select = { version = "27.0.0", path = "../arrow-select" }
+parquet = { version = "27.0.0", path = "../parquet", features = ["arrow", "async"] }
 rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
 num = { version = "0.4", default-features = false, features = ["std"] }
 half = { version = "2.1", default-features = false, features = ["num-traits"] }
@@ -65,6 +66,8 @@ comfy-table = { version = "6.0", optional = true, default-features = false }
 pyo3 = { version = "0.17", default-features = false, optional = true }
 multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false, optional = true }
+tokio = { version = "1.22.0", features = ["full"] }
+futures = "0.3.25"
 
 [package.metadata.docs.rs]
 features = ["prettyprint", "ipc_compression", "dyn_cmp_dict", "ffi", "pyarrow"]
@@ -120,6 +123,16 @@ name = "read_csv_infer_schema"
 required-features = ["prettyprint", "csv"]
 path = "./examples/read_csv_infer_schema.rs"
 
+[[example]]
+name = "read_parquet"
+required-features = ["prettyprint"]
+path = "./examples/read_parquet.rs"
+
+[[example]]
+name = "async_read_parquet"
+required-features = ["prettyprint"]
+path = "./examples/async_read_parquet.rs"
+
 [[bench]]
 name = "aggregate_kernels"
 harness = false
diff --git a/arrow/examples/async_read_parquet.rs b/arrow/examples/async_read_parquet.rs
new file mode 100644
index 000000000000..06620aa7ac0b
--- /dev/null
+++ b/arrow/examples/async_read_parquet.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate arrow;
+
+use arrow::util::pretty::print_batches;
+use futures::TryStreamExt;
+use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::errors::Result;
+use std::time::SystemTime;
+use tokio::fs::File;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Open the parquet test file that will be read.
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{}/alltypes_plain.parquet", testdata);
+    let file = File::open(path).await.unwrap();
+
+    // Create an async parquet reader builder with batch_size.
+    // batch_size is the maximum number of rows decoded from pages into each RecordBatch; it defaults to 1024.
+    let mut builder = ParquetRecordBatchStreamBuilder::new(file)
+        .await
+        .unwrap()
+        .with_batch_size(8192);
+    let file_metadata = builder.metadata().file_metadata();
+
+    // Set a projection mask to read only root columns 1 and 2.
+    let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2]);
+    builder = builder.with_projection(mask);
+
+    // Highlight: set a `RowFilter` to push filter predicates down into the reader and skip IO and decode.
+    // For more advanced usage, see https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs.
+    let predicates: Vec<Box<dyn ArrowPredicate>> = Vec::new();
+    let row_filter = RowFilter::new(predicates);
+    builder = builder.with_row_filter(row_filter);
+
+    // Build an async parquet reader of record batches.
+    let stream = builder.build().unwrap();
+
+    let start = SystemTime::now();
+
+    let result = stream.try_collect::<Vec<_>>().await?;
+
+    println!("took: {} ms", start.elapsed().unwrap().as_millis());
+
+    print_batches(&result).unwrap();
+
+    Ok(())
+}
diff --git a/arrow/examples/read_parquet.rs b/arrow/examples/read_parquet.rs
new file mode 100644
index 000000000000..e754d41f60aa
--- /dev/null
+++ b/arrow/examples/read_parquet.rs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::pretty::print_batches;
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
+use parquet::errors::Result;
+use std::fs::File;
+
+fn main() -> Result<()> {
+    // Open the parquet test file that will be read.
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{}/alltypes_plain.parquet", testdata);
+    let file = File::open(path).unwrap();
+
+    // Create a sync parquet reader with batch_size.
+    // batch_size is the maximum number of rows decoded from pages into each RecordBatch; it defaults to 1024.
+    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
+        .with_batch_size(8192)
+        .build()?;
+
+    let mut batches = Vec::new();
+
+    for batch in parquet_reader {
+        batches.push(batch?);
+    }
+
+    print_batches(&batches).unwrap();
+    Ok(())
+}
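Note on the empty `RowFilter` above: async_read_parquet.rs registers `RowFilter::new(Vec::new())`, which filters nothing. Below is a minimal sketch of how a non-empty filter could be built with `ArrowPredicateFn` from `parquet::arrow::arrow_reader`; the chosen column (leaf 0, the Int32 `id` column of alltypes_plain.parquet) and the `> 1` threshold are illustrative assumptions, not part of this PR. The snippet would replace the `let predicates ...` lines in the async example, reusing its `builder` and `file_metadata` bindings.

use arrow::array::Int32Array;
use arrow::compute::gt_scalar;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;

// Evaluate the predicate against leaf column 0 only ("id", an Int32 column
// in alltypes_plain.parquet); other columns are not decoded for this check.
let predicate_mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
// Keep rows where id > 1 (illustrative threshold); rows failing the predicate
// skip IO and decode of the remaining projected columns.
let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
    let id = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("leaf column 0 of alltypes_plain.parquet is Int32");
    gt_scalar(id, 1)
});
builder = builder.with_row_filter(RowFilter::new(vec![Box::new(predicate)]));

Predicates in a `RowFilter` are applied in order, and each one sees only the columns selected by its own `ProjectionMask`, so cheap, selective predicates over few columns should come first.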