From 02b1a4a13212cfd48eac0ec07ac71bfef41a97c7 Mon Sep 17 00:00:00 2001
From: Lordworms
Date: Sat, 10 Aug 2024 19:32:21 -0700
Subject: [PATCH] Bloom filter Join Step I: create benchmark

---
 datafusion/core/benches/bloom_filter_join.rs | 342 +++++++++++++++++++
 datafusion/core/benches/data_utils/mod.rs    | 146 +++++++-
 2 files changed, 487 insertions(+), 1 deletion(-)
 create mode 100644 datafusion/core/benches/bloom_filter_join.rs

diff --git a/datafusion/core/benches/bloom_filter_join.rs b/datafusion/core/benches/bloom_filter_join.rs
new file mode 100644
index 0000000000000..3da4c2c7dbe61
--- /dev/null
+++ b/datafusion/core/benches/bloom_filter_join.rs
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+extern crate arrow;
+extern crate datafusion;
+
+mod data_utils;
+use crate::criterion::Criterion;
+use crate::data_utils::create_matching_csv;
+use crate::data_utils::create_sparse_csv;
+use crate::data_utils::{create_tpch_q17_lineitem_csv, create_tpch_q17_part_csv};
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+use arrow_schema::Schema;
+use data_utils::{
+    create_high_selectivity_csv, create_non_equijoin_csv, create_other_table_csv,
+    create_skewed_data_csv,
+};
+use datafusion::execution::context::SessionContext;
+use datafusion::prelude::CsvReadOptions;
+use parking_lot::Mutex;
+use std::sync::Arc;
+use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+// Run `sql` to completion and black-box the results so the work is not optimized away
+fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
+    let rt = Runtime::new().unwrap();
+    let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
+    criterion::black_box(rt.block_on(df.collect()).unwrap());
+}
+
+fn lineitem_schema() -> Schema {
+    Schema::new(vec![
+        Field::new("l_partkey", DataType::Int64, false),
+        Field::new("l_quantity", DataType::Float64, false),
+        Field::new("l_extendedprice", DataType::Float64, false),
+    ])
+}
+
+fn part_schema() -> Schema {
+    Schema::new(vec![
+        Field::new("p_partkey", DataType::Int64, false),
+        Field::new("p_brand", DataType::Utf8, false),
+        Field::new("p_container", DataType::Utf8, false),
+    ])
+}
+
+fn create_context_with_csv_tpch_17(
+    part_file: &NamedTempFile,
+    lineitem_file: &NamedTempFile,
+) -> Arc<Mutex<SessionContext>> {
+    let rt = Runtime::new().unwrap();
+    let ctx = SessionContext::new();
+
+    let part_schema = part_schema();
+    let lineitem_schema = lineitem_schema();
+
+    let part_options = CsvReadOptions::new().has_header(true).schema(&part_schema);
+
+    let lineitem_options = CsvReadOptions::new()
+        .has_header(true)
+        .schema(&lineitem_schema);
+
+    rt.block_on(async {
+        ctx.register_csv("part", part_file.path().to_str().unwrap(), part_options)
+            .await
+            .unwrap();
+        ctx.register_csv(
+            "lineitem",
+            lineitem_file.path().to_str().unwrap(),
+            lineitem_options,
+        )
+        .await
+        .unwrap();
+    });
+
+    Arc::new(Mutex::new(ctx))
+}
+
+fn create_context_with_high_selectivity(
+    table_file: &NamedTempFile,
+) -> Arc<Mutex<SessionContext>> {
+    let rt = Runtime::new().unwrap();
+    let ctx = SessionContext::new();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("value", DataType::Float64, false),
+    ]));
+
+    let options = CsvReadOptions::new().has_header(true).schema(&schema);
+
+    rt.block_on(async {
+        ctx.register_csv("test", table_file.path().to_str().unwrap(), options)
+            .await
+            .unwrap();
+    });
+
+    Arc::new(Mutex::new(ctx))
+}
+
+fn create_context_with_skewed_data(
+    table_file: &NamedTempFile,
+) -> Arc<Mutex<SessionContext>> {
+    let rt = Runtime::new().unwrap();
+    let ctx = SessionContext::new();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("key", DataType::Int64, false),
+        Field::new("value", DataType::Utf8, false),
+    ]));
+
+    let options = CsvReadOptions::new().has_header(true).schema(&schema);
+
+    rt.block_on(async {
+        ctx.register_csv("skewed_table", table_file.path().to_str().unwrap(), options)
+            .await
+            .unwrap();
+    });
+
+    Arc::new(Mutex::new(ctx))
+}
+
+fn create_context_with_non_equijoin(
+    non_equijoin_file: &NamedTempFile,
+    other_table_file: &NamedTempFile,
+) -> Arc<Mutex<SessionContext>> {
+    let rt = Runtime::new().unwrap();
+    let ctx = SessionContext::new();
+
+    // Define the schema for the non-equijoin table
+    let non_equijoin_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("range_start", DataType::Int64, false),
+        Field::new("range_end", DataType::Int64, false),
+    ]));
+
+    // Define the schema for the other table
+    let other_table_schema = Arc::new(Schema::new(vec![Field::new(
+        "value",
+        DataType::Int64,
+        false,
+    )]));
+
+    // Set the options for reading the non-equijoin CSV file
+    let non_equijoin_options = CsvReadOptions::new()
+        .has_header(true)
+        .schema(&non_equijoin_schema);
+
+    // Set the options for reading the other table CSV file
+    let other_table_options = CsvReadOptions::new()
+        .has_header(true)
+        .schema(&other_table_schema);
+
+    rt.block_on(async {
+        // Register the non-equijoin table with the session context
+        ctx.register_csv(
+            "non_equijoin_table",
+            non_equijoin_file.path().to_str().unwrap(),
+            non_equijoin_options,
+        )
+        .await
+        .unwrap();
+
+        // Register the other table with the session context
+        ctx.register_csv(
+            "other_table",
+            other_table_file.path().to_str().unwrap(),
+            other_table_options,
+        )
+        .await
+        .unwrap();
+    });
+
+    Arc::new(Mutex::new(ctx))
+}
+
+fn create_context_with_sparse_data(
+    sparse_file: &NamedTempFile,
+    matching_file: &NamedTempFile,
+) -> Arc<Mutex<SessionContext>> {
+    let rt = Runtime::new().unwrap();
+    let ctx = SessionContext::new();
+
+    let sparse_schema = Arc::new(Schema::new(vec![
+        Field::new("key", DataType::Int64, false),
+        Field::new("value", DataType::Utf8, false),
+    ]));
+
+    let matching_schema = Arc::new(Schema::new(vec![
+        Field::new("key", DataType::Int64, false),
+        Field::new("description", DataType::Utf8, false),
+    ]));
+
+    let sparse_options = CsvReadOptions::new()
+        .has_header(true)
+        .schema(&sparse_schema);
+    let matching_options = CsvReadOptions::new()
+        .has_header(true)
+        .schema(&matching_schema);
+
+    rt.block_on(async {
+        ctx.register_csv(
+            "sparse_table",
+            sparse_file.path().to_str().unwrap(),
+            sparse_options,
+        )
+        .await
+        .unwrap();
+        ctx.register_csv(
+            "matching_table",
+            matching_file.path().to_str().unwrap(),
+            matching_options,
+        )
+        .await
+        .unwrap();
+    });
+
+    Arc::new(Mutex::new(ctx))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let high_select = create_high_selectivity_csv().unwrap();
+
+    // Cases where a bloom filter join is expected to perform well:
+    // TPC-H Q17
+    let part_file = create_tpch_q17_part_csv().unwrap();
+    let lineitem_file = create_tpch_q17_lineitem_csv().unwrap();
+    let tpch_17_ctx = create_context_with_csv_tpch_17(&part_file, &lineitem_file);
+    c.bench_function("TPCH Q17", |b| {
+        b.iter(|| {
+            query(
+                tpch_17_ctx.clone(),
+                "SELECT
+                    SUM(l_extendedprice) / 7.0 AS avg_yearly
+                FROM
+                    part, lineitem
+                WHERE
+                    p_partkey = l_partkey
+                    AND p_brand = 'Brand#23'
+                    AND p_container = 'MED BOX'
+                    AND l_quantity < (
+                        SELECT 0.2 * AVG(l_quantity)
+                        FROM lineitem
+                        WHERE l_partkey = p_partkey
+                    )",
+            )
+        })
+    });
+
+    let sparse_file = create_sparse_csv().unwrap();
+    let matching_file = create_matching_csv().unwrap();
+    let ctx_sparse = create_context_with_sparse_data(&sparse_file, &matching_file);
+
+    c.bench_function("Sparse Data Join", |b| {
+        b.iter(|| {
+            query(
+                ctx_sparse.clone(),
+                "SELECT s.key, s.value, m.description
+                 FROM sparse_table s
+                 JOIN matching_table m ON s.key = m.key
+                 WHERE s.value = 'MatchingValue'",
+            )
+        })
+    });
+
+    let ctx_high_selectivity = create_context_with_high_selectivity(&high_select);
+
+    // Cases where a bloom filter join is expected to perform poorly:
+    c.bench_function("High Selectivity Join", |b| {
+        b.iter(|| {
+            query(
+                ctx_high_selectivity.clone(),
+                "SELECT test.id, test.value
+                 FROM test
+                 JOIN (
+                     SELECT id
+                     FROM test
+                     WHERE value > 75.0
+                 ) subquery
+                 ON test.id = subquery.id
+                 WHERE test.value > 50.0
+                 ORDER BY test.value DESC
+                 LIMIT 10",
+            )
+        })
+    });
+
+    let skewed_data = create_skewed_data_csv().unwrap();
+    let ctx_skewed = create_context_with_skewed_data(&skewed_data);
+    c.bench_function("Skewed Data Join", |b| {
+        b.iter(|| {
+            query(
+                ctx_skewed.clone(),
+                "SELECT s.key, s.value, COUNT(*)
+                 FROM skewed_table s
+                 JOIN skewed_table t ON s.key = t.key
+                 WHERE s.key = 1
+                 GROUP BY s.key, s.value
+                 HAVING COUNT(*) > 100
+                 ORDER BY s.value",
+            )
+        })
+    });
+
+    let non_equijoin = create_non_equijoin_csv().unwrap();
+    let other_table = create_other_table_csv().unwrap();
+    let ctx_non_equijoin = create_context_with_non_equijoin(&non_equijoin, &other_table);
+    c.bench_function("Non-Equijoin", |b| {
+        b.iter(|| {
+            query(
+                ctx_non_equijoin.clone(),
+                "SELECT n.id, n.range_start, n.range_end, o.value
+                 FROM non_equijoin_table n
+                 JOIN other_table o
+                 ON n.range_start < o.value AND n.range_end > o.value
+                 WHERE o.value BETWEEN 100 AND 1000
+                 ORDER BY n.range_end - n.range_start DESC
+                 LIMIT 50",
+            )
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs
index 9d2864919225a..464f29864d6da 100644
--- a/datafusion/core/benches/data_utils/mod.rs
+++ b/datafusion/core/benches/data_utils/mod.rs
@@ -29,14 +29,17 @@
 use arrow_array::builder::{Int64Builder, StringBuilder};
 use datafusion::datasource::MemTable;
 use datafusion::error::Result;
 use datafusion_common::DataFusionError;
+use rand::distributions::{Distribution, Uniform};
 use rand::rngs::StdRng;
 use rand::seq::SliceRandom;
 use rand::{Rng, SeedableRng};
-use rand_distr::Distribution;
 use rand_distr::{Normal, Pareto};
 use std::fmt::Write;
+use std::io::BufWriter;
 use std::sync::Arc;
+use tempfile::NamedTempFile;
+const BLOOM_FILTER_JOIN_SIZE: usize = 1000000;
 /// create an in-memory table given the partition len, array len, and batch size,
 /// and the result table will be of array_len in total, and then partitioned, and batched.
 #[allow(dead_code)]
@@ -241,3 +244,144 @@ fn test_schema() -> SchemaRef {
         Field::new("timestamp_ms", DataType::Int64, false),
     ]))
 }
+
+// Data generators used by the bloom filter join benchmarks
+
+fn create_csv_tempfile(data: &[Vec<String>], headers: &[&str]) -> Result<NamedTempFile> {
+    use std::io::Write;
+    let mut temp_file = NamedTempFile::new()?;
+    {
+        let mut writer = BufWriter::new(&mut temp_file);
+
+        // write the header row
+        writeln!(writer, "{}", headers.join(","))?;
+
+        // write the data rows
+        for row in data {
+            writeln!(writer, "{}", row.join(","))?;
+        }
+    }
+    Ok(temp_file)
+}
+
+pub fn create_tpch_q17_lineitem_csv() -> Result<NamedTempFile> {
+    let headers = ["l_partkey", "l_quantity", "l_extendedprice"];
+    let data: Vec<Vec<String>> = (0..BLOOM_FILTER_JOIN_SIZE)
+        .map(|v| {
+            vec![
+                v.to_string(),
+                (v % 50).to_string(),
+                (10000.0 + (v % 50) as f64 * 1.1).to_string(),
+            ]
+        })
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
+
+pub fn create_tpch_q17_part_csv() -> Result<NamedTempFile> {
+    let headers = ["p_partkey", "p_brand", "p_container"];
+
+    let brands = ["Brand#12", "Brand#23", "Brand#34", "Brand#45"];
+    let containers = ["SM BOX", "MED BOX", "LG BOX", "SM PKG", "MED PKG", "LG PKG"];
+
+    let mut rng = rand::thread_rng();
+    let brand_dist = Uniform::from(0..brands.len());
+    let container_dist = Uniform::from(0..containers.len());
+
+    let data: Vec<Vec<String>> = (0..BLOOM_FILTER_JOIN_SIZE)
+        .map(|v| {
+            vec![
+                v.to_string(),
+                brands[brand_dist.sample(&mut rng)].to_string(),
+                containers[container_dist.sample(&mut rng)].to_string(),
+            ]
+        })
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
+
+pub fn create_high_selectivity_csv() -> Result<NamedTempFile> {
+    let headers = ["id", "value"];
+    let data: Vec<Vec<String>> = (0..BLOOM_FILTER_JOIN_SIZE)
+        .map(|v| {
+            vec![
+                v.to_string(),
+                if v % 1000 == 0 { "100.0" } else { "1.0" }.to_string(),
+            ]
+        })
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
+
+pub fn create_skewed_data_csv() -> Result<NamedTempFile> {
+    let headers = ["key", "value"];
+    let data: Vec<Vec<String>> = (0..BLOOM_FILTER_JOIN_SIZE)
+        .map(|v| {
+            vec![
+                (v % 10).to_string(), // Many duplicate keys
+                format!("Value {}", v),
+            ]
+        })
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
+
+pub fn create_non_equijoin_csv() -> Result<NamedTempFile> {
+    let headers = ["id", "range_start", "range_end"];
+    let data: Vec<Vec<String>> = (0..BLOOM_FILTER_JOIN_SIZE)
+        .map(|v| {
+            let range_start = if v >= 10 { v - 10 } else { 0 }; // Guard against usize underflow
+            let range_end = v + 10;
+            vec![
+                v.to_string(),
+                range_start.to_string(),
+                range_end.to_string(),
+            ]
+        })
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
+
+pub fn create_other_table_csv() -> Result<NamedTempFile> {
+    let headers = ["value"];
+
+    // Generate data for the other table, where 'value' is compared against 'range_start' and 'range_end'
+    let data: Vec<Vec<String>> = (0..BLOOM_FILTER_JOIN_SIZE)
+        .map(|v| vec![(v * 2).to_string()]) // Arbitrary values for demonstration
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
+
+pub fn create_sparse_csv() -> Result<NamedTempFile> {
+    let headers = ["key", "value"];
+
+    // Generate a sparse dataset with very few matching keys
+    let data: Vec<Vec<String>> = (0..BLOOM_FILTER_JOIN_SIZE)
+        .map(|v| {
+            if v % 1000 == 0 {
+                vec![v.to_string(), "MatchingValue".to_string()] // Only a few keys will match
+            } else {
+                vec![v.to_string(), "NonMatchingValue".to_string()] // Most keys will not match
+            }
+        })
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
+
+pub fn create_matching_csv() -> Result<NamedTempFile> {
+    let headers = ["key", "description"];
+
+    // Generate a dataset with keys that should match the sparse dataset keys
+    let data: Vec<Vec<String>> = (0..(BLOOM_FILTER_JOIN_SIZE / 1000))
+        .map(|v| vec![(v * 1000).to_string(), "This is a matching key".to_string()]) // These will match with sparse data
+        .collect();
+
+    create_csv_tempfile(&data, &headers)
+}
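---

Note: this patch does not touch `datafusion/core/Cargo.toml`, so running the new
bench assumes a Criterion registration along the lines of the existing bench
entries in that file (a sketch, not part of the diff):

    [[bench]]
    name = "bloom_filter_join"
    harness = false

With that entry in place, the suite can be run with
`cargo bench --bench bloom_filter_join`.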