
Commit a6d5f8a

Merge commit '47026a2a3dd41a5c87e44ade58d91a89feba147b' into chunchun/update-df-june-week-2-2

appletreeisyellow committed Jun 24, 2024
2 parents: fceab3c + 47026a2
Showing 145 changed files with 4,866 additions and 3,242 deletions.
20 changes: 10 additions & 10 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion-examples/examples/csv_opener.rs
@@ -48,6 +48,7 @@ async fn main() -> Result<()> {
         b',',
         b'"',
         object_store,
+        Some(b'#'),
     );

     let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);
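Based on this diff, `CsvConfig::new` gains an optional comment byte as its final argument; `None` keeps the old behavior. A minimal sketch of the new call shape (all argument values other than the trailing comment byte are illustrative assumptions, not taken from this commit):

    // Hypothetical values; only the trailing comment argument is new here.
    let config = CsvConfig::new(
        8192,           // batch size (assumed)
        file_schema,    // Arc<Schema> for the file (assumed)
        Some(vec![0]),  // projection (assumed)
        true,           // has_header (assumed)
        b',',           // delimiter
        b'"',           // quote
        object_store,   // Arc<dyn ObjectStore>
        Some(b'#'),     // new: lines starting with '#' are ignored
    );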
44 changes: 38 additions & 6 deletions datafusion-examples/examples/expr_api.rs
@@ -24,6 +24,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use datafusion::common::DFSchema;
 use datafusion::error::Result;
+use datafusion::functions_aggregate::first_last::first_value_udaf;
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
 use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
 use datafusion::prelude::*;
@@ -32,7 +33,7 @@ use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::expr::BinaryExpr;
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_expr::simplify::SimplifyContext;
-use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
+use datafusion_expr::{AggregateExt, ColumnarValue, ExprSchemable, Operator};

 /// This example demonstrates the DataFusion [`Expr`] API.
 ///
@@ -44,11 +45,12 @@ use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
 /// also comes with APIs for evaluation, simplification, and analysis.
 ///
 /// The code in this example shows how to:
-/// 1. Create [`Exprs`] using different APIs: [`main`]
-/// 2. Evaluate [`Exprs`] against data: [`evaluate_demo`]
-/// 3. Simplify expressions: [`simplify_demo`]
-/// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
-/// 5. Get the types of the expressions: [`expression_type_demo`]
+/// 1. Create [`Expr`]s using different APIs: [`main`]
+/// 2. Use the fluent API to easily create complex [`Expr`]s: [`expr_fn_demo`]
+/// 3. Evaluate [`Expr`]s against data: [`evaluate_demo`]
+/// 4. Simplify expressions: [`simplify_demo`]
+/// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`]
+/// 6. Get the types of the expressions: [`expression_type_demo`]
 #[tokio::main]
 async fn main() -> Result<()> {
     // The easiest way to create expressions is to use the
@@ -63,6 +65,9 @@ async fn main() -> Result<()> {
     ));
     assert_eq!(expr, expr2);

+    // See how to build aggregate functions with the expr_fn API
+    expr_fn_demo()?;
+
     // See how to evaluate expressions
     evaluate_demo()?;

@@ -78,6 +83,33 @@ async fn main() -> Result<()> {
     Ok(())
 }

+/// DataFusion's `expr_fn` API makes it easy to create [`Expr`]s for the
+/// full range of expression types such as aggregates and window functions.
+fn expr_fn_demo() -> Result<()> {
+    // Let's say you want to call the "first_value" aggregate function
+    let first_value = first_value_udaf();
+
+    // For example, to create the expression `FIRST_VALUE(price)`.
+    // These expressions can be passed to `DataFrame::aggregate` and other
+    // APIs that take aggregate expressions.
+    let agg = first_value.call(vec![col("price")]);
+    assert_eq!(agg.to_string(), "first_value(price)");
+
+    // You can use the AggregateExt trait to create more complex aggregates
+    // such as `FIRST_VALUE(price) FILTER (WHERE quantity > 100) ORDER BY ts`
+    let agg = first_value
+        .call(vec![col("price")])
+        .order_by(vec![col("ts").sort(false, false)])
+        .filter(col("quantity").gt(lit(100)))
+        .build()?; // build the aggregate
+    assert_eq!(
+        agg.to_string(),
+        "first_value(price) FILTER (WHERE quantity > Int32(100)) ORDER BY [ts DESC NULLS LAST]"
+    );
+
+    Ok(())
+}
+
 /// DataFusion can also evaluate arbitrary expressions on Arrow arrays.
 fn evaluate_demo() -> Result<()> {
     // For example, let's say you have some integers in an array
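For context, the `AggregateExt` builder above produces an ordinary `Expr`, so its output can be handed to any API that accepts aggregate expressions. A minimal usage sketch (the `df` DataFrame and its price/ts columns are assumptions, not part of this commit):

    // Aggregate an assumed DataFrame `df` with FIRST_VALUE(price) ORDER BY ts.
    let agg = first_value_udaf()
        .call(vec![col("price")])
        .order_by(vec![col("ts").sort(true, false)]) // ascending, nulls last
        .build()?;
    let df = df.aggregate(vec![], vec![agg])?;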
20 changes: 8 additions & 12 deletions datafusion-examples/examples/parquet_index.rs
@@ -25,7 +25,7 @@ use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{
-    parquet::{RequestedStatistics, StatisticsConverter},
+    parquet::StatisticsConverter,
     {FileScanConfig, ParquetExec},
 };
 use datafusion::datasource::TableProvider;
@@ -518,21 +518,17 @@ impl ParquetMetadataIndexBuilder {

         // extract the parquet statistics from the file's footer
         let metadata = reader.metadata();
+        let row_groups = metadata.row_groups();

         // Extract the min/max values for each row group from the statistics
-        let row_counts = StatisticsConverter::row_counts(reader.metadata())?;
-        let value_column_mins = StatisticsConverter::try_new(
-            "value",
-            RequestedStatistics::Min,
-            reader.schema(),
-        )?
-        .extract(reader.metadata())?;
-        let value_column_maxes = StatisticsConverter::try_new(
-            "value",
-            RequestedStatistics::Max,
-            reader.schema(),
-        )?
-        .extract(reader.metadata())?;
+        let converter = StatisticsConverter::try_new(
+            "value",
+            reader.schema(),
+            reader.parquet_schema(),
+        )?;
+        let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;
+        let value_column_mins = converter.row_group_mins(row_groups.iter())?;
+        let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;

         // In a real system you would have to handle nulls, which represent
         // unknown statistics. All statistics are known in this example
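The shape of this change: instead of building one `StatisticsConverter` per statistic kind (`RequestedStatistics::Min`, `Max`, and so on) and calling `extract`, a single per-column converter is reused, and each `row_group_*` accessor returns an Arrow array with one entry per row group. A condensed sketch of the new API, assuming the same `reader` setup as the example above:

    // One converter per column, reused for every statistic kind.
    let metadata = reader.metadata();
    let row_groups = metadata.row_groups();
    let converter = StatisticsConverter::try_new(
        "value",                 // column name
        reader.schema(),         // Arrow schema
        reader.parquet_schema(), // Parquet schema descriptor
    )?;
    // Each call returns an array with one element per row group.
    let mins = converter.row_group_mins(row_groups.iter())?;
    let maxes = converter.row_group_maxes(row_groups.iter())?;
    let null_counts = converter.row_group_null_counts(row_groups.iter())?;
    let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;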
1 change: 1 addition & 0 deletions datafusion/common/src/config.rs
@@ -1567,6 +1567,7 @@ config_namespace! {
         pub timestamp_tz_format: Option<String>, default = None
         pub time_format: Option<String>, default = None
         pub null_value: Option<String>, default = None
+        pub comment: Option<u8>, default = None
     }
 }

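Since the new field lands in the CSV options namespace (`CsvOptions` in datafusion-common), it can be set programmatically like its sibling options. A minimal sketch, assuming the struct's generated `Default` impl as with other `config_namespace!` entries:

    use datafusion_common::config::CsvOptions;

    // Only the `comment` field is new in this commit.
    let csv_options = CsvOptions {
        comment: Some(b'#'), // ignore lines beginning with '#'
        ..Default::default()
    };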
35 changes: 9 additions & 26 deletions datafusion/core/benches/parquet_statistic.rs
@@ -24,9 +24,7 @@ use arrow_schema::{
     Field, Schema,
 };
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
-use datafusion::datasource::physical_plan::parquet::{
-    RequestedStatistics, StatisticsConverter,
-};
+use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
 use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
 use parquet::file::properties::WriterProperties;
 use std::sync::Arc;
@@ -159,41 +157,26 @@ fn criterion_benchmark(c: &mut Criterion) {
         let file = file.reopen().unwrap();
         let reader = ArrowReaderBuilder::try_new(file).unwrap();
         let metadata = reader.metadata();
+        let row_groups = metadata.row_groups();

         let mut group =
             c.benchmark_group(format!("Extract statistics for {}", dtype.clone()));
         group.bench_function(
             BenchmarkId::new("extract_statistics", dtype.clone()),
             |b| {
                 b.iter(|| {
-                    let _ = StatisticsConverter::try_new(
-                        "col",
-                        RequestedStatistics::Min,
-                        reader.schema(),
-                    )
-                    .unwrap()
-                    .extract(metadata)
-                    .unwrap();
-
-                    let _ = StatisticsConverter::try_new(
-                        "col",
-                        RequestedStatistics::Max,
-                        reader.schema(),
-                    )
-                    .unwrap()
-                    .extract(reader.metadata())
-                    .unwrap();
-
-                    let _ = StatisticsConverter::try_new(
+                    let converter = StatisticsConverter::try_new(
                         "col",
-                        RequestedStatistics::NullCount,
-                        reader.schema(),
+                        reader.schema(),
+                        reader.parquet_schema(),
                     )
-                    .unwrap()
-                    .extract(reader.metadata())
-                    .unwrap();
-
-                    let _ = StatisticsConverter::row_counts(reader.metadata()).unwrap();
+                    .unwrap();
+                    let _ = converter.row_group_mins(row_groups.iter()).unwrap();
+                    let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
+                    let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
+                    let _ = StatisticsConverter::row_group_row_counts(row_groups.iter())
+                        .unwrap();
                 })
             },
         );
12 changes: 6 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
@@ -50,12 +50,12 @@ use datafusion_common::{
 };
 use datafusion_expr::lit;
 use datafusion_expr::{
-    avg, count, max, min, stddev, utils::COUNT_STAR_EXPANSION,
-    TableProviderFilterPushDown, UNNAMED_TABLE,
+    avg, count, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
+    UNNAMED_TABLE,
 };
 use datafusion_expr::{case, is_null};
-use datafusion_functions_aggregate::expr_fn::median;
 use datafusion_functions_aggregate::expr_fn::sum;
+use datafusion_functions_aggregate::expr_fn::{median, stddev};

 use async_trait::async_trait;
@@ -1820,7 +1820,7 @@ mod tests {

         assert_batches_sorted_eq!(
             ["+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
-             "| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | AVG(aggregate_test_100.c12) | SUM(aggregate_test_100.c12) | COUNT(aggregate_test_100.c12) | COUNT(DISTINCT aggregate_test_100.c12) |",
+             "| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | AVG(aggregate_test_100.c12) | sum(aggregate_test_100.c12) | COUNT(aggregate_test_100.c12) | COUNT(DISTINCT aggregate_test_100.c12) |",
              "+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
              "| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
              "| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
@@ -2395,7 +2395,7 @@ mod tests {
         assert_batches_sorted_eq!(
             [
                 "+----+-----------------------------+",
-                "| c1 | SUM(aggregate_test_100.c12) |",
+                "| c1 | sum(aggregate_test_100.c12) |",
                 "+----+-----------------------------+",
                 "| a | 10.238448667882977 |",
                 "| b | 7.797734760124923 |",
@@ -2411,7 +2411,7 @@ mod tests {
         assert_batches_sorted_eq!(
             [
                 "+----+---------------------+",
-                "| c1 | SUM(test_table.c12) |",
+                "| c1 | sum(test_table.c12) |",
                 "+----+---------------------+",
                 "| a | 10.238448667882977 |",
                 "| b | 7.797734760124923 |",
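The test-expectation churn above is a display-name change: aggregate functions that moved to the datafusion-functions-aggregate crate render lowercase in plans (`sum(...)` rather than `SUM(...)`), and `stddev` now ships from that crate as well. A minimal sketch of the updated imports (the column name is an assumption):

    use datafusion_expr::col;
    use datafusion_functions_aggregate::expr_fn::{median, stddev, sum};

    // All three now come from the functions-aggregate crate.
    let aggs = vec![sum(col("c12")), stddev(col("c12")), median(col("c12"))];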
21 changes: 16 additions & 5 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -147,6 +147,12 @@ impl CsvFormat {
         self.options.has_header
     }

+    /// Lines beginning with this byte are ignored.
+    pub fn with_comment(mut self, comment: Option<u8>) -> Self {
+        self.options.comment = comment;
+        self
+    }
+
     /// The character separating values within a row.
     /// - default to ','
     pub fn with_delimiter(mut self, delimiter: u8) -> Self {
@@ -252,6 +258,7 @@ impl FileFormat for CsvFormat {
             self.options.delimiter,
             self.options.quote,
             self.options.escape,
+            self.options.comment,
             self.options.compression.into(),
         );
         Ok(Arc::new(exec))
@@ -300,7 +307,7 @@ impl CsvFormat {
         pin_mut!(stream);

         while let Some(chunk) = stream.next().await.transpose()? {
-            let format = arrow::csv::reader::Format::default()
+            let mut format = arrow::csv::reader::Format::default()
                 .with_header(
                     first_chunk
                         && self
                 )
                 .with_delimiter(self.options.delimiter);

+            if let Some(comment) = self.options.comment {
+                format = format.with_comment(comment);
+            }
+
             let (Schema { fields, .. }, records_read) =
                 format.infer_schema(chunk.reader(), Some(records_to_read))?;

@@ -919,7 +930,7 @@ mod tests {

         #[rustfmt::skip]
         let expected = ["+--------------+",
-            "| SUM(aggr.c2) |",
+            "| sum(aggr.c2) |",
             "+--------------+",
             "| 285 |",
             "+--------------+"];
@@ -956,7 +967,7 @@ mod tests {

         #[rustfmt::skip]
         let expected = ["+--------------+",
-            "| SUM(aggr.c3) |",
+            "| sum(aggr.c3) |",
             "+--------------+",
             "| 781 |",
             "+--------------+"];
@@ -1122,7 +1133,7 @@ mod tests {

         #[rustfmt::skip]
         let expected = ["+---------------------+",
-            "| SUM(empty.column_1) |",
+            "| sum(empty.column_1) |",
             "+---------------------+",
             "| 10 |",
             "+---------------------+"];
@@ -1161,7 +1172,7 @@ mod tests {

         #[rustfmt::skip]
         let expected = ["+-----------------------+",
-            "| SUM(one_col.column_1) |",
+            "| sum(one_col.column_1) |",
             "+-----------------------+",
             "| 50 |",
             "+-----------------------+"];
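End to end, the new `with_comment` knob flows from `CsvFormat` into both schema inference and the `CsvExec` scan. A sketch of how a caller might opt in (the table name and path are placeholder assumptions):

    use std::sync::Arc;
    use datafusion::datasource::file_format::csv::CsvFormat;
    use datafusion::datasource::listing::ListingOptions;
    use datafusion::prelude::*;

    async fn register_commented_csv(
        ctx: &SessionContext,
    ) -> datafusion::error::Result<()> {
        // Lines beginning with '#' are skipped during schema inference
        // and scanning.
        let format = CsvFormat::default().with_comment(Some(b'#'));
        let options =
            ListingOptions::new(Arc::new(format)).with_file_extension(".csv");
        // "data/" is a placeholder path.
        ctx.register_listing_table("t", "data/", options, None, None)
            .await?;
        Ok(())
    }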
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -470,15 +470,15 @@ mod tests {
         ctx.register_json("json_parallel", table_path, options)
             .await?;

-        let query = "SELECT SUM(a) FROM json_parallel;";
+        let query = "SELECT sum(a) FROM json_parallel;";

         let result = ctx.sql(query).await?.collect().await?;
         let actual_partitions = count_num_partitions(&ctx, query).await?;

         #[rustfmt::skip]
         let expected = [
             "+----------------------+",
-            "| SUM(json_parallel.a) |",
+            "| sum(json_parallel.a) |",
             "+----------------------+",
             "| -7 |",
             "+----------------------+"