Skip to content

Commit

Permalink
Bloom filter Join Step I: create benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Aug 11, 2024
1 parent 2730423 commit 02b1a4a
Show file tree
Hide file tree
Showing 2 changed files with 487 additions and 1 deletion.
342 changes: 342 additions & 0 deletions datafusion/core/benches/bloom_filter_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,342 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;

mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::create_matching_csv;
use crate::data_utils::create_sparse_csv;
use crate::data_utils::{create_tpch_q17_lineitem_csv, create_tpch_q17_part_csv};
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow_schema::Schema;
use data_utils::{
create_high_selectivity_csv, create_non_equijoin_csv, create_other_table_csv,
create_skewed_data_csv,
};
use datafusion::execution::context::SessionContext;
use datafusion::prelude::CsvReadOptions;
use parking_lot::Mutex;
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}

/// Schema for the TPC-H `lineitem` columns referenced by Q17.
fn lineitem_schema() -> Schema {
    let fields = vec![
        Field::new("l_partkey", DataType::Int64, false),
        Field::new("l_quantity", DataType::Float64, false),
        Field::new("l_extendedprice", DataType::Float64, false),
    ];
    Schema::new(fields)
}

/// Schema for the TPC-H `part` columns referenced by Q17.
fn part_schema() -> Schema {
    let fields = vec![
        Field::new("p_partkey", DataType::Int64, false),
        Field::new("p_brand", DataType::Utf8, false),
        Field::new("p_container", DataType::Utf8, false),
    ];
    Schema::new(fields)
}

/// Builds a `SessionContext` with the TPC-H `part` and `lineitem` tables
/// registered from the given temporary CSV files.
fn create_context_with_csv_tpch_17(
    part_file: &NamedTempFile,
    lineitem_file: &NamedTempFile,
) -> Arc<Mutex<SessionContext>> {
    let runtime = Runtime::new().unwrap();
    let ctx = SessionContext::new();

    let part_schema = part_schema();
    let lineitem_schema = lineitem_schema();

    runtime.block_on(async {
        // The generated CSVs carry a header row, hence has_header(true).
        ctx.register_csv(
            "part",
            part_file.path().to_str().unwrap(),
            CsvReadOptions::new().has_header(true).schema(&part_schema),
        )
        .await
        .unwrap();
        ctx.register_csv(
            "lineitem",
            lineitem_file.path().to_str().unwrap(),
            CsvReadOptions::new()
                .has_header(true)
                .schema(&lineitem_schema),
        )
        .await
        .unwrap();
    });

    Arc::new(Mutex::new(ctx))
}

/// Registers the single `test` table (id: Int64, value: Float64) used by the
/// high-selectivity benchmark and returns the prepared context.
fn create_context_with_high_selectivity(
    table_file: &NamedTempFile,
) -> Arc<Mutex<SessionContext>> {
    let runtime = Runtime::new().unwrap();
    let ctx = SessionContext::new();

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ]));

    runtime.block_on(async {
        ctx.register_csv(
            "test",
            table_file.path().to_str().unwrap(),
            CsvReadOptions::new().has_header(true).schema(&schema),
        )
        .await
        .unwrap();
    });

    Arc::new(Mutex::new(ctx))
}

/// Registers the `skewed_table` (key: Int64, value: Utf8) used by the
/// skewed-data join benchmark and returns the prepared context.
fn create_context_with_skewed_data(
    table_file: &NamedTempFile,
) -> Arc<Mutex<SessionContext>> {
    let runtime = Runtime::new().unwrap();
    let ctx = SessionContext::new();

    let schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("value", DataType::Utf8, false),
    ]));

    runtime.block_on(async {
        ctx.register_csv(
            "skewed_table",
            table_file.path().to_str().unwrap(),
            CsvReadOptions::new().has_header(true).schema(&schema),
        )
        .await
        .unwrap();
    });

    Arc::new(Mutex::new(ctx))
}

/// Registers the interval table and the probe-value table needed by the
/// non-equijoin benchmark and returns the prepared context.
fn create_context_with_non_equijoin(
    non_equijoin_file: &NamedTempFile,
    other_table_file: &NamedTempFile,
) -> Arc<Mutex<SessionContext>> {
    let runtime = Runtime::new().unwrap();
    let ctx = SessionContext::new();

    // `non_equijoin_table`: [range_start, range_end] intervals keyed by id.
    let range_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("range_start", DataType::Int64, false),
        Field::new("range_end", DataType::Int64, false),
    ]));

    // `other_table`: a single column of values probed against the intervals.
    let value_schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));

    runtime.block_on(async {
        ctx.register_csv(
            "non_equijoin_table",
            non_equijoin_file.path().to_str().unwrap(),
            CsvReadOptions::new().has_header(true).schema(&range_schema),
        )
        .await
        .unwrap();
        ctx.register_csv(
            "other_table",
            other_table_file.path().to_str().unwrap(),
            CsvReadOptions::new().has_header(true).schema(&value_schema),
        )
        .await
        .unwrap();
    });

    Arc::new(Mutex::new(ctx))
}

/// Registers the `sparse_table` and `matching_table` pair used by the
/// sparse-data join benchmark and returns the prepared context.
fn create_context_with_sparse_data(
    sparse_file: &NamedTempFile,
    matching_file: &NamedTempFile,
) -> Arc<Mutex<SessionContext>> {
    let runtime = Runtime::new().unwrap();
    let ctx = SessionContext::new();

    let sparse_schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("value", DataType::Utf8, false),
    ]));
    let matching_schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int64, false),
        Field::new("description", DataType::Utf8, false),
    ]));

    runtime.block_on(async {
        ctx.register_csv(
            "sparse_table",
            sparse_file.path().to_str().unwrap(),
            CsvReadOptions::new().has_header(true).schema(&sparse_schema),
        )
        .await
        .unwrap();
        ctx.register_csv(
            "matching_table",
            matching_file.path().to_str().unwrap(),
            CsvReadOptions::new()
                .has_header(true)
                .schema(&matching_schema),
        )
        .await
        .unwrap();
    });

    Arc::new(Mutex::new(ctx))
}

/// Criterion entry point: benchmarks a set of join queries chosen to probe
/// when a bloom-filter join should pay off (selective build side, sparse
/// matches) and when it should not (high selectivity, skew, non-equijoins).
fn criterion_benchmark(c: &mut Criterion) {
    let high_select = create_high_selectivity_csv().unwrap();

    // Cases where a bloom filter join is expected to perform well.
    // TPC-H Q17: the `part` predicate is very selective, so filtering
    // `lineitem` probes early should eliminate most rows.
    let part_file = create_tpch_q17_part_csv().unwrap();
    let lineitem_file = create_tpch_q17_lineitem_csv().unwrap();
    let tpch_17_ctx = create_context_with_csv_tpch_17(&part_file, &lineitem_file);
    c.bench_function("TPCH Q17", |b| {
        b.iter(|| {
            query(
                tpch_17_ctx.clone(),
                "SELECT
                SUM(l_extendedprice) / 7.0 AS avg_yearly
            FROM
                part, lineitem
            WHERE
                p_partkey = l_partkey
                AND p_brand = 'Brand#23'
                AND p_container = 'MED BOX'
                AND l_quantity < (
                    SELECT 0.2 * AVG(l_quantity)
                    FROM lineitem
                    WHERE l_partkey = p_partkey
                )",
            )
        })
    });

    // Sparse join: few probe-side keys find a match on the build side.
    let sparse_file = create_sparse_csv().unwrap();
    let matching_file = create_matching_csv().unwrap();
    let ctx_sparse = create_context_with_sparse_data(&sparse_file, &matching_file);

    c.bench_function("Sparse Data Join", |b| {
        b.iter(|| {
            query(
                ctx_sparse.clone(),
                "SELECT s.key, s.value, m.description
             FROM sparse_table s
             JOIN matching_table m ON s.key = m.key
             WHERE s.value = 'MatchingValue'",
            )
        })
    });

    let ctx_high_selectivity = create_context_with_high_selectivity(&high_select);

    // Cases where a bloom filter join is expected to perform poorly.
    // High selectivity: most probe rows survive, so the filter removes little.
    c.bench_function("High Selectivity Join", |b| {
        b.iter(|| {
            query(
                ctx_high_selectivity.clone(),
                "SELECT test.id, test.value
             FROM test
             JOIN (
                 SELECT id
                 FROM test
                 WHERE value > 75.0
             ) subquery
             ON test.id = subquery.id
             WHERE test.value > 50.0
             ORDER BY test.value DESC
             LIMIT 10",
            )
        })
    });

    // Skewed self-join: one hot key dominates, limiting the filter's benefit.
    let skewed_data = create_skewed_data_csv().unwrap();
    let ctx_skewed = create_context_with_skewed_data(&skewed_data);
    c.bench_function("Skewed Data Join", |b| {
        b.iter(|| {
            query(
                ctx_skewed.clone(),
                "SELECT s.key, s.value, COUNT(*)
             FROM skewed_table s
             JOIN skewed_table t ON s.key = t.key
             WHERE s.key = 1
             GROUP BY s.key, s.value
             HAVING COUNT(*) > 100
             ORDER BY s.value",
            )
        })
    });

    // Range (non-equi) join: no equality key, so a bloom filter cannot apply.
    let non_equijoin = create_non_equijoin_csv().unwrap();
    let other_table = create_other_table_csv().unwrap();
    let ctx_non_equijoin = create_context_with_non_equijoin(&non_equijoin, &other_table);
    c.bench_function("Non-Equijoin", |b| {
        b.iter(|| {
            query(
                ctx_non_equijoin.clone(),
                "SELECT n.id, n.range_start, n.range_end, o.value
             FROM non_equijoin_table n
             JOIN other_table o
             ON n.range_start < o.value AND n.range_end > o.value
             WHERE o.value BETWEEN 100 AND 1000
             ORDER BY n.range_end - n.range_start DESC
             LIMIT 50",
            )
        })
    });
}

// Wire the benchmark into criterion's generated harness entry point.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
Loading

0 comments on commit 02b1a4a

Please sign in to comment.