Add read parquet examples
xudong963 committed Nov 23, 2022
1 parent ed1d74b commit 368920b
Showing 3 changed files with 121 additions and 0 deletions.
13 changes: 13 additions & 0 deletions arrow/Cargo.toml
@@ -53,6 +53,7 @@ arrow-ipc = { version = "27.0.0", path = "../arrow-ipc", optional = true }
arrow-json = { version = "27.0.0", path = "../arrow-json", optional = true }
arrow-schema = { version = "27.0.0", path = "../arrow-schema" }
arrow-select = { version = "27.0.0", path = "../arrow-select" }
parquet = { version = "27.0.0", path = "../parquet", features = ["arrow", "async"]}
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
num = { version = "0.4", default-features = false, features = ["std"] }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
@@ -65,6 +66,8 @@ comfy-table = { version = "6.0", optional = true, default-features = false }
pyo3 = { version = "0.17", default-features = false, optional = true }
multiversion = { version = "0.6.1", default-features = false }
bitflags = { version = "1.2.1", default-features = false, optional = true }
tokio = { version = "1.22.0", features = ["full"] }
futures = "0.3.25"

[package.metadata.docs.rs]
features = ["prettyprint", "ipc_compression", "dyn_cmp_dict", "ffi", "pyarrow"]
@@ -120,6 +123,16 @@ name = "read_csv_infer_schema"
required-features = ["prettyprint", "csv"]
path = "./examples/read_csv_infer_schema.rs"

[[example]]
name = "read_parquet"
required-features = ["prettyprint"]
path = "./examples/read_parquet.rs"

[[example]]
name = "async_read_parquet"
required-features = ["prettyprint"]
path = "./examples/async_read_parquet.rs"

[[bench]]
name = "aggregate_kernels"
harness = false
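With the two [[example]] entries above, each example should be runnable from the arrow crate with, for example, `cargo run --example read_parquet --features prettyprint` (and likewise for async_read_parquet). Note that arrow::util::test_util::parquet_test_data(), used by both examples, resolves the test data directory from the PARQUET_TEST_DATA environment variable (falling back to the parquet-testing submodule), so that data must be available.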
65 changes: 65 additions & 0 deletions arrow/examples/async_read_parquet.rs
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

extern crate arrow;

use arrow::util::pretty::print_batches;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::errors::Result;
use std::time::SystemTime;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<()> {
    // Locate the parquet test file that will be read.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = File::open(path).await.unwrap();

    // Create an async parquet reader builder with a batch_size.
    // batch_size is the maximum number of rows to buffer per record batch when
    // decoding from pages; it defaults to 1024.
    let mut builder = ParquetRecordBatchStreamBuilder::new(file)
        .await
        .unwrap()
        .with_batch_size(8192);
    let file_metadata = builder.metadata().file_metadata();

    // Set the projection mask to read only root columns 1 and 2.
    let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2]);
    builder = builder.with_projection(mask);

    // Highlight: set a `RowFilter` to push filter predicates down into the scan,
    // skipping IO and decoding for rows that fail the predicates.
    // For more complete usage, see
    // https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs.
    let predicates: Vec<Box<dyn ArrowPredicate>> = Vec::new();
    let row_filter = RowFilter::new(predicates);
    builder = builder.with_row_filter(row_filter);

    // Build an async parquet reader.
    let stream = builder.build().unwrap();

    let start = SystemTime::now();

    let result = stream.try_collect::<Vec<_>>().await?;

    println!("took: {} ms", start.elapsed().unwrap().as_millis());

    print_batches(&result).unwrap();

    Ok(())
}
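The example above installs an empty RowFilter. For illustration only (not part of this commit), here is a minimal sketch of a non-empty filter built with ArrowPredicateFn, assuming root column 0 of alltypes_plain.parquet (`id`) is an Int32 column; it would replace the `let predicates ...` line and keep only rows where id > 1:

use arrow::array::Int32Array;
use parquet::arrow::arrow_reader::ArrowPredicateFn;

// Hypothetical sketch: evaluate the predicate against root column 0 (`id`) only.
let predicate_mask = ProjectionMask::roots(file_metadata.schema_descr(), [0]);
let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
    // The batch handed to the predicate contains only the columns selected
    // by predicate_mask, so column 0 here is `id`.
    let id = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    // Keep rows where id > 1.
    arrow::compute::gt_scalar(id, 1)
});
let predicates: Vec<Box<dyn ArrowPredicate>> = vec![Box::new(predicate)];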
43 changes: 43 additions & 0 deletions arrow/examples/read_parquet.rs
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::util::pretty::print_batches;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::errors::Result;
use std::fs::File;

fn main() -> Result<()> {
    // Locate the parquet test file that will be read.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = File::open(path).unwrap();

    // Create a sync parquet reader with a batch_size.
    // batch_size is the maximum number of rows to buffer per record batch when
    // decoding from pages; it defaults to 1024.
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(8192)
        .build()?;

    let mut batches = Vec::new();

    for batch in parquet_reader {
        batches.push(batch?);
    }

    print_batches(&batches).unwrap();
    Ok(())
}
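As a hypothetical follow-up sketch (not part of this commit), the sync builder can also narrow the read before build(); this assumes the builder's parquet_schema(), with_projection(), and with_row_groups() methods, projecting root columns 0 and 1 and reading only the first row group:

use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ProjectionMask};
use std::fs::File;

let file = File::open(path).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
// Select root columns 0 and 1 from the parquet schema.
let mask = ProjectionMask::roots(builder.parquet_schema(), [0, 1]);
let parquet_reader = builder
    .with_projection(mask)
    // Read only the first row group.
    .with_row_groups(vec![0])
    .build()?;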
