
Commit

Merge pull request #1 from andygrove/df-parquet-exec
add partial support for multiple parquet files
mbutrovich authored Nov 9, 2024
2 parents fb68558 + 2027755 commit ef9f8f5
Showing 4 changed files with 30 additions and 14 deletions.
15 changes: 11 additions & 4 deletions native/core/src/execution/datafusion/planner.rs
@@ -1012,7 +1012,11 @@ impl PhysicalPlanner {
println!("test_data_filters: {:?}", test_data_filters);

let object_store_url = ObjectStoreUrl::local_filesystem();
- let path = Url::parse(&scan.path).unwrap();
+ let paths: Vec<Url> = scan
+     .path
+     .iter()
+     .map(|path| Url::parse(path).unwrap())
+     .collect();

let object_store = object_store::local::LocalFileSystem::new();
// register the object store with the runtime environment
@@ -1021,11 +1025,14 @@ impl PhysicalPlanner {
.runtime_env()
.register_object_store(&url, Arc::new(object_store));

- let file = PartitionedFile::from_path(path.path().to_string())?;
+ let files: Vec<PartitionedFile> = paths
+     .iter()
+     .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
+     .collect();

let file_scan_config =
-     FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-         .with_file(file)
+     FileScanConfig::new(object_store_url, data_schema_arrow.clone())
+         .with_file_groups(vec![files])
.with_projection(Some(projection_vector));

let mut table_parquet_options = TableParquetOptions::new();
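Note (not part of this commit): the single file group built above means the scan still exposes one output partition, so every Parquet file is read by partition 0. Below is a minimal sketch of how the flat file list could instead be spread over a target partition count; each inner Vec handed to with_file_groups becomes one output partition. The helper name and round-robin strategy are illustrative, not DataFusion API; in planner.rs the generic F would be PartitionedFile.

    fn split_into_file_groups<F>(files: Vec<F>, target_partitions: usize) -> Vec<Vec<F>> {
        // One bucket per requested partition (at least one).
        let n = target_partitions.max(1);
        let mut groups: Vec<Vec<F>> = (0..n).map(|_| Vec::new()).collect();
        for (i, file) in files.into_iter().enumerate() {
            // Round-robin assignment; a fuller version would balance by file
            // size or split at row-group boundaries.
            groups[i % n].push(file);
        }
        groups
    }

With such a helper, .with_file_groups(vec![files]) would become .with_file_groups(split_into_file_groups(files, target_partitions)).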
18 changes: 11 additions & 7 deletions native/core/src/execution/jni_api.rs
@@ -17,8 +17,10 @@

//! Define JNI APIs which can be called from Java/Scala.
+ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
+ use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
@@ -39,8 +41,6 @@ use jni::{
};
use std::{collections::HashMap, sync::Arc, task::Poll};

- use super::{serde, utils::SparkArrowConvert, CometMemoryPool};

use crate::{
errors::{try_unwrap_or_throw, CometError, CometResult},
execution::{
@@ -370,11 +370,15 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
}

let task_ctx = exec_context.session_ctx.task_ctx();
- let stream = exec_context
-     .root_op
-     .as_ref()
-     .unwrap()
-     .execute(0, task_ctx)?;

+ let plan = exec_context.root_op.as_ref().unwrap();
+
+ println!(
+     "Executing partition 0 of {}",
+     plan.output_partitioning().partition_count()
+ );
+
+ let stream = plan.execute(0, task_ctx)?;
exec_context.stream = Some(stream);
} else {
// Pull input batches
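Aside (not from this commit): executePlan still drives only partition 0, while the new println reports how many partitions the plan actually has. Once the scan yields multiple file groups, each partition needs its own stream. A rough sketch under that assumption, using the same DataFusion traits imported above (function name is illustrative, import paths may differ slightly across DataFusion versions, and Comet itself currently stores only a single stream in exec_context.stream):

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::execution::TaskContext;
    use datafusion::physical_plan::{
        ExecutionPlan, ExecutionPlanProperties, SendableRecordBatchStream,
    };

    // Open one stream per output partition instead of hard-coding partition 0.
    fn execute_all_partitions(
        plan: &Arc<dyn ExecutionPlan>,
        task_ctx: &Arc<TaskContext>,
    ) -> Result<Vec<SendableRecordBatchStream>> {
        (0..plan.output_partitioning().partition_count())
            .map(|i| plan.execute(i, Arc::clone(task_ctx)))
            .collect()
    }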
2 changes: 1 addition & 1 deletion native/proto/src/proto/operator.proto
@@ -52,7 +52,7 @@ message Scan {
// is purely for informational purposes when viewing native query plans in
// debug mode.
string source = 2;
- string path = 3;
+ repeated string path = 3;
string required_schema = 4;
string data_schema = 5;
repeated spark.spark_expression.Expr data_filters = 6;
9 changes: 7 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -3188,9 +3188,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

scanBuilder.setRequiredSchema(requiredSchemaParquet.toString)
scanBuilder.setDataSchema(dataSchemaParquet.toString)
- scanBuilder.setPath(cometScan.relation.location.inputFiles(0))

+ // TODO this is not correct yet .. we really need to know how many partitions Spark is expecting
+ // and then split the files / row groups up across that number of partitions
+ cometScan.relation.location.inputFiles.foreach { f =>
+   scanBuilder.addPath(f)
+ }
case _ =>
scanBuilder.setPath("")
// scanBuilder.setPath("")
}

Some(result.setScan(scanBuilder).build())
