Skip to content

Commit

Permalink
Merge branch 'apache:main' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead authored Nov 29, 2023
2 parents ef0eee1 + 06bbe12 commit a8157fb
Show file tree
Hide file tree
Showing 66 changed files with 3,758 additions and 3,297 deletions.
2 changes: 1 addition & 1 deletion benchmarks/src/parquet_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::AccessLogOpt;
use crate::{BenchmarkRun, CommonOpt};
use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::logical_expr::utils::disjunction;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
Expand Down
6 changes: 6 additions & 0 deletions datafusion/common/src/file_options/csv_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
)
})?)
},
"quote" | "escape" => {
// https://github.com/apache/arrow-rs/issues/5146
// These two attributes are only available when reading csv files.
// To avoid error
builder
},
_ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for CSV format!")))
}
}
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ use crate::datasource::{
physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig},
TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown},
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};

Expand All @@ -56,7 +55,6 @@ use datafusion_common::{
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};
Expand Down
16 changes: 12 additions & 4 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,20 @@ impl TableProviderFactory for ListingTableFactory {
let file_extension = get_extension(cmd.location.as_str());

let file_format: Arc<dyn FileFormat> = match file_type {
FileType::CSV => Arc::new(
CsvFormat::default()
FileType::CSV => {
let mut statement_options = StatementOptions::from(&cmd.options);
let mut csv_format = CsvFormat::default()
.with_has_header(cmd.has_header)
.with_delimiter(cmd.delimiter as u8)
.with_file_compression_type(file_compression_type),
),
.with_file_compression_type(file_compression_type);
if let Some(quote) = statement_options.take_str_option("quote") {
csv_format = csv_format.with_quote(quote.as_bytes()[0])
}
if let Some(escape) = statement_options.take_str_option("escape") {
csv_format = csv_format.with_escape(Some(escape.as_bytes()[0]))
}
Arc::new(csv_format)
}
#[cfg(feature = "parquet")]
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::AVRO => Arc::new(AvroFormat),
Expand Down
19 changes: 17 additions & 2 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,24 @@ impl DisplayAs for FileScanConfig {
write!(f, ", infinite_source=true")?;
}

if let Some(ordering) = orderings.first() {
if let Some(ordering) = orderings.get(0) {
if !ordering.is_empty() {
write!(f, ", output_ordering={}", OutputOrderingDisplay(ordering))?;
let start = if orderings.len() == 1 {
", output_ordering="
} else {
", output_orderings=["
};
write!(f, "{}", start)?;
for (idx, ordering) in
orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
{
match idx {
0 => write!(f, "{}", OutputOrderingDisplay(ordering))?,
_ => write!(f, ", {}", OutputOrderingDisplay(ordering))?,
}
}
let end = if orderings.len() == 1 { "" } else { "]" };
write!(f, "{}", end)?;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ mod metrics;
pub mod page_filter;
mod row_filter;
mod row_groups;
mod statistics;

pub use metrics::ParquetFileMetrics;

Expand Down Expand Up @@ -506,6 +507,7 @@ impl FileOpener for ParquetOpener {
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
predicate,
Expand Down Expand Up @@ -718,28 +720,6 @@ pub async fn plan_to_parquet(
Ok(())
}

// Copied (with modifications) from arrow-rs:
// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
/// Sign-extends a big-endian byte slice into a fixed 16-byte array.
///
/// The input occupies the least significant (trailing) bytes of the result;
/// the leading bytes are filled with `0xFF` when the value is negative
/// (high bit of the first input byte set) and `0x00` otherwise.
///
/// # Panics
/// Panics if `b` is empty or longer than 16 bytes.
fn sign_extend_be(b: &[u8]) -> [u8; 16] {
    // Message now matches the condition: up to 16 bytes are accepted.
    assert!(b.len() <= 16, "Array too large, expected at most 16 bytes");
    // Guard explicitly instead of panicking with an opaque index error at b[0].
    assert!(!b.is_empty(), "Array is empty, expected at least 1 byte");
    let is_negative = (b[0] & 0x80) != 0;
    let mut result = if is_negative { [0xFFu8; 16] } else { [0u8; 16] };
    // Place the input in the low-order positions; high-order bytes keep the fill.
    result[16 - b.len()..].copy_from_slice(b);
    result
}

/// Converts a big-endian byte slice (1..=16 bytes) to an `i128`.
///
/// The bytes come from a Parquet file, whose format mandates big-endian
/// encoding for fixed-length decimal values; see
/// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
    i128::from_be_bytes(sign_extend_be(b))
}

// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
pub(crate) fn parquet_to_arrow_decimal_type(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ use parquet::{
};
use std::sync::Arc;

use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;
Expand Down
Loading

0 comments on commit a8157fb

Please sign in to comment.