From 176b2572cb51d811449d7bda339e31af6aee9dfa Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 1 Jun 2024 16:04:27 +0200 Subject: [PATCH 1/8] refactor: extract estimate_memory_size --- datafusion/common/src/utils/memory.rs | 86 +++++++++++++++++++++++++++ datafusion/common/src/utils/mod.rs | 1 + 2 files changed, 87 insertions(+) create mode 100644 datafusion/common/src/utils/memory.rs diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs new file mode 100644 index 000000000000..ec206ac93997 --- /dev/null +++ b/datafusion/common/src/utils/memory.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides a function to estimate the memory size of a HashTable prior to alloaction + +use crate::{DataFusionError, Result}; + +/// Estimates the memory size required for a hash table prior to allocation. +/// +/// # Parameters +/// - `num_elements`: The number of elements expected in the hash table. +/// - `fixed_size`: A fixed overhead size associated with the collection +/// (e.g., HashSet or HashTable). +/// - `T`: The type of elements stored in the hash table. +/// +/// # Details +/// This function calculates the estimated memory size by considering: +/// - An overestimation of buckets to keep approximately 1/8 of them empty. +/// - The total memory size is computed as: +/// - The size of each entry (`T`) multiplied by the estimated number of +/// buckets. +/// - One byte overhead for each bucket. +/// - The fixed size overhead of the collection. +/// +/// # Panics +/// - Returns an error if the multiplication of `num_elements` by 8 overflows +/// `usize`. +pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result { + // For the majority of cases hashbrown overestimates the bucket quantity + // to keep ~1/8 of them empty. We take this factor into account by + // multiplying the number of elements with a fixed ratio of 8/7 (~1.14). + // This formula leads to overallocation for small tables (< 8 elements) + // but should be fine overall. + let estimated_buckets = (num_elements.checked_mul(8).ok_or_else(|| { + DataFusionError::Execution( + "usize overflow while estimating number of buckets".to_string(), + ) + })? / 7) + .next_power_of_two(); + + // + size of entry * number of buckets + // + 1 byte for each bucket + // + fixed size of collection (HashSet/HashTable) + Ok(std::mem::size_of::() * estimated_buckets + estimated_buckets + fixed_size) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::estimate_memory_size; + + #[test] + fn test_estimate_memory() { + let num_elements = 8; + // size: 48 + let fixed_size = std::mem::size_of::>(); + // size: 128 = 16 * 4 + 16 + 48 + let estimated = estimate_memory_size::(num_elements, fixed_size).unwrap(); + + assert_eq!(estimated, 128); + } + + #[test] + fn test_estimate_memory_overflow() { + let num_elements = usize::MAX; + let fixed_size = std::mem::size_of::>(); + let estimated = estimate_memory_size::(num_elements, fixed_size); + + assert!(estimated.is_err()); + } +} diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 402ec95b33b3..ae444c2cb285 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -17,6 +17,7 @@ //! This module provides the bisect function, which implements binary search. +pub mod memory; pub mod proxy; use crate::error::{_internal_datafusion_err, _internal_err}; From b555adc948aa499ba933113e1dda70b113753e59 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 1 Jun 2024 16:55:20 +0200 Subject: [PATCH 2/8] refactor: cap at usize::MAX --- datafusion/common/src/utils/memory.rs | 40 ++++++++++++++------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index ec206ac93997..b405bb3b30d6 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -17,8 +17,6 @@ //! This module provides a function to estimate the memory size of a HashTable prior to alloaction -use crate::{DataFusionError, Result}; - /// Estimates the memory size required for a hash table prior to allocation. /// /// # Parameters @@ -35,27 +33,31 @@ use crate::{DataFusionError, Result}; /// buckets. /// - One byte overhead for each bucket. /// - The fixed size overhead of the collection. +/// - Returns `usize::MAX` if an overflow occurs. /// -/// # Panics -/// - Returns an error if the multiplication of `num_elements` by 8 overflows -/// `usize`. -pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result { +pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> usize { // For the majority of cases hashbrown overestimates the bucket quantity // to keep ~1/8 of them empty. We take this factor into account by // multiplying the number of elements with a fixed ratio of 8/7 (~1.14). // This formula leads to overallocation for small tables (< 8 elements) // but should be fine overall. - let estimated_buckets = (num_elements.checked_mul(8).ok_or_else(|| { - DataFusionError::Execution( - "usize overflow while estimating number of buckets".to_string(), - ) - })? / 7) - .next_power_of_two(); - - // + size of entry * number of buckets - // + 1 byte for each bucket - // + fixed size of collection (HashSet/HashTable) - Ok(std::mem::size_of::() * estimated_buckets + estimated_buckets + fixed_size) + num_elements + .checked_mul(8) + .and_then(|estimate| estimate.checked_div(7)) + .map(|buckets| buckets.next_power_of_two()) + .and_then(|estimated_buckets| { + // + size of entry * number of buckets + // + 1 byte for each bucket + // + fixed size of collection (HashSet/HashTable) + std::mem::size_of::() + .checked_mul(estimated_buckets) + .and_then(|size_of_entries| { + estimated_buckets + .checked_add(fixed_size) + .and_then(|overhead| size_of_entries.checked_add(overhead)) + }) + }) + .unwrap_or(usize::MAX) } #[cfg(test)] @@ -70,7 +72,7 @@ mod tests { // size: 48 let fixed_size = std::mem::size_of::>(); // size: 128 = 16 * 4 + 16 + 48 - let estimated = estimate_memory_size::(num_elements, fixed_size).unwrap(); + let estimated = estimate_memory_size::(num_elements, fixed_size); assert_eq!(estimated, 128); } @@ -81,6 +83,6 @@ mod tests { let fixed_size = std::mem::size_of::>(); let estimated = estimate_memory_size::(num_elements, fixed_size); - assert!(estimated.is_err()); + assert_eq!(estimated, usize::MAX); } } From a719511a1af3e9a3483cbc3af02bbd6488e06767 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 1 Jun 2024 17:01:17 +0200 Subject: [PATCH 3/8] refactor: use estimate_memory_size --- .../src/aggregate/count_distinct/native.rs | 35 ++++++------------- .../physical-plan/src/joins/hash_join.rs | 25 ++++--------- 2 files changed, 18 insertions(+), 42 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs index 95d8662e0f6e..08f67faa77a1 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs @@ -33,6 +33,7 @@ use arrow_schema::DataType; use datafusion_common::cast::{as_list_array, as_primitive_array}; use datafusion_common::utils::array_into_list_array; +use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::ScalarValue; use datafusion_expr::Accumulator; @@ -115,18 +116,11 @@ where } fn size(&self) -> usize { - let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) - / 7) - .next_power_of_two(); - - // Size of accumulator - // + size of entry * number of buckets - // + 1 byte for each bucket - // + fixed size of HashSet - std::mem::size_of_val(self) - + std::mem::size_of::() * estimated_buckets - + estimated_buckets - + std::mem::size_of_val(&self.values) + let num_elements = self.values.len(); + let fixed_size = + std::mem::size_of_val(self) + std::mem::size_of_val(&self.values); + + estimate_memory_size::(num_elements, fixed_size) } } @@ -202,17 +196,10 @@ where } fn size(&self) -> usize { - let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) - / 7) - .next_power_of_two(); - - // Size of accumulator - // + size of entry * number of buckets - // + 1 byte for each bucket - // + fixed size of HashSet - std::mem::size_of_val(self) - + std::mem::size_of::() * estimated_buckets - + estimated_buckets - + std::mem::size_of_val(&self.values) + let num_elements = self.values.len(); + let fixed_size = + std::mem::size_of_val(self) + std::mem::size_of_val(&self.values); + + estimate_memory_size::(num_elements, fixed_size) } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e669517be400..7aba7b859981 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -18,7 +18,6 @@ //! [`HashJoinExec`] Partitioned Hash Join Operator use std::fmt; -use std::mem::size_of; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; @@ -59,6 +58,7 @@ use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use arrow_array::cast::downcast_array; use arrow_schema::ArrowError; +use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError, JoinSide, JoinType, Result, @@ -875,23 +875,12 @@ async fn collect_left_input( // Estimation of memory size, required for hashtable, prior to allocation. // Final result can be verified using `RawTable.allocation_info()` - // - // For majority of cases hashbrown overestimates buckets qty to keep ~1/8 of them empty. - // This formula leads to overallocation for small tables (< 8 elements) but fine overall. - let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| { - DataFusionError::Execution( - "usize overflow while estimating number of hasmap buckets".to_string(), - ) - })? / 7) - .next_power_of_two(); - // 16 bytes per `(u64, u64)` - // + 1 byte for each bucket - // + fixed size of JoinHashMap (RawTable + Vec) - let estimated_hastable_size = - 16 * estimated_buckets + estimated_buckets + size_of::(); - - reservation.try_grow(estimated_hastable_size)?; - metrics.build_mem_used.add(estimated_hastable_size); + let fixed_size = std::mem::size_of::(); + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size); + + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); let mut hashmap = JoinHashMap::with_capacity(num_rows); let mut hashes_buffer = Vec::new(); From 93de4acc8b760a6bafc9fd71287a8299e4fe6310 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sat, 1 Jun 2024 20:13:00 +0200 Subject: [PATCH 4/8] chore: add examples --- datafusion/common/src/utils/memory.rs | 35 +++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index b405bb3b30d6..d15dab5e900b 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -35,6 +35,41 @@ /// - The fixed size overhead of the collection. /// - Returns `usize::MAX` if an overflow occurs. /// +/// # Examples +/// --- +/// +/// ## From within a struct +/// +/// ```rust +/// use datafusion_common::utils::memory::estimate_memory_size; +/// +/// struct MyStruct { +/// values: Vec, +/// other_data: usize, +/// } +/// +/// impl MyStruct { +/// fn size(&self) -> usize { +/// let num_elements = self.values.len(); +/// let fixed_size = std::mem::size_of_val(self) + +/// std::mem::size_of_val(&self.values); +/// +/// estimate_memory_size::(num_elements, fixed_size) +/// } +/// } +/// ``` +/// --- +/// ## With a simple collection +/// +/// ```rust +/// use datafusion_common::utils::memory::estimate_memory_size; +/// use std::collections::HashMap; +/// +/// let num_rows = 100; +/// let fixed_size = std::mem::size_of::>(); +/// let estimated_hashtable_size = +/// estimate_memory_size::<(u64, u64)>(num_rows,fixed_size); +/// ``` pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> usize { // For the majority of cases hashbrown overestimates the bucket quantity // to keep ~1/8 of them empty. We take this factor into account by From 97906ec2a22d63efb56516ac8e63ddad0dcacf64 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 2 Jun 2024 07:15:45 +0200 Subject: [PATCH 5/8] refactor: return Result; add testcase --- datafusion/common/src/utils/memory.rs | 36 ++++++++++++------- .../src/aggregate/count_distinct/native.rs | 4 +-- .../physical-plan/src/joins/hash_join.rs | 2 +- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index d15dab5e900b..b5d658088fda 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -17,6 +17,8 @@ //! This module provides a function to estimate the memory size of a HashTable prior to alloaction +use crate::{DataFusionError, Result}; + /// Estimates the memory size required for a hash table prior to allocation. /// /// # Parameters @@ -70,7 +72,7 @@ /// let estimated_hashtable_size = /// estimate_memory_size::<(u64, u64)>(num_rows,fixed_size); /// ``` -pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> usize { +pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result { // For the majority of cases hashbrown overestimates the bucket quantity // to keep ~1/8 of them empty. We take this factor into account by // multiplying the number of elements with a fixed ratio of 8/7 (~1.14). @@ -85,14 +87,15 @@ pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> usize // + 1 byte for each bucket // + fixed size of collection (HashSet/HashTable) std::mem::size_of::() - .checked_mul(estimated_buckets) - .and_then(|size_of_entries| { - estimated_buckets - .checked_add(fixed_size) - .and_then(|overhead| size_of_entries.checked_add(overhead)) - }) + .checked_mul(estimated_buckets)? + .checked_add(estimated_buckets)? + .checked_add(fixed_size) + }) + .ok_or_else(|| { + DataFusionError::Execution( + "usize overflow while estimating the number of buckets".to_string(), + ) }) - .unwrap_or(usize::MAX) } #[cfg(test)] @@ -103,13 +106,20 @@ mod tests { #[test] fn test_estimate_memory() { - let num_elements = 8; - // size: 48 + // size (bytes): 48 let fixed_size = std::mem::size_of::>(); - // size: 128 = 16 * 4 + 16 + 48 - let estimated = estimate_memory_size::(num_elements, fixed_size); + // estimated buckets: 16 = (8 * 8 / 7).next_power_of_two() + let num_elements = 8; + // size (bytes): 128 = 16 * 4 + 16 + 48 + let estimated = estimate_memory_size::(num_elements, fixed_size).unwrap(); assert_eq!(estimated, 128); + + // estimated buckets: 64 = (40 * 8 / 7).next_power_of_two() + let num_elements = 40; + // size (bytes): 368 = 64 * 4 + 64 + 48 + let estimated = estimate_memory_size::(num_elements, fixed_size).unwrap(); + assert_eq!(estimated, 368); } #[test] @@ -118,6 +128,6 @@ mod tests { let fixed_size = std::mem::size_of::>(); let estimated = estimate_memory_size::(num_elements, fixed_size); - assert_eq!(estimated, usize::MAX); + assert!(estimated.is_err()); } } diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs index 08f67faa77a1..0e7483d4a1cd 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs @@ -120,7 +120,7 @@ where let fixed_size = std::mem::size_of_val(self) + std::mem::size_of_val(&self.values); - estimate_memory_size::(num_elements, fixed_size) + estimate_memory_size::(num_elements, fixed_size).unwrap() } } @@ -200,6 +200,6 @@ where let fixed_size = std::mem::size_of_val(self) + std::mem::size_of_val(&self.values); - estimate_memory_size::(num_elements, fixed_size) + estimate_memory_size::(num_elements, fixed_size).unwrap() } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 7aba7b859981..784584f03f0f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -877,7 +877,7 @@ async fn collect_left_input( // Final result can be verified using `RawTable.allocation_info()` let fixed_size = std::mem::size_of::(); let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size); + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?; reservation.try_grow(estimated_hashtable_size)?; metrics.build_mem_used.add(estimated_hashtable_size); From 75b58925482248b0b1c3dcfb0e794f5cb17a73a5 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 2 Jun 2024 07:32:38 +0200 Subject: [PATCH 6/8] fix: docs --- datafusion/common/src/utils/memory.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index b5d658088fda..f1fbfc332490 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -35,7 +35,7 @@ use crate::{DataFusionError, Result}; /// buckets. /// - One byte overhead for each bucket. /// - The fixed size overhead of the collection. -/// - Returns `usize::MAX` if an overflow occurs. +/// - If the estimation overflows, we return a [`DataFusionError`] /// /// # Examples /// --- @@ -43,7 +43,8 @@ use crate::{DataFusionError, Result}; /// ## From within a struct /// /// ```rust -/// use datafusion_common::utils::memory::estimate_memory_size; +/// # use datafusion_common::utils::memory::estimate_memory_size; +/// # use datafusion_common::Result; /// /// struct MyStruct { /// values: Vec, @@ -51,7 +52,7 @@ use crate::{DataFusionError, Result}; /// } /// /// impl MyStruct { -/// fn size(&self) -> usize { +/// fn size(&self) -> Result { /// let num_elements = self.values.len(); /// let fixed_size = std::mem::size_of_val(self) + /// std::mem::size_of_val(&self.values); @@ -64,13 +65,14 @@ use crate::{DataFusionError, Result}; /// ## With a simple collection /// /// ```rust -/// use datafusion_common::utils::memory::estimate_memory_size; -/// use std::collections::HashMap; +/// # use datafusion_common::utils::memory::estimate_memory_size; +/// # use std::collections::HashMap; /// /// let num_rows = 100; /// let fixed_size = std::mem::size_of::>(); /// let estimated_hashtable_size = -/// estimate_memory_size::<(u64, u64)>(num_rows,fixed_size); +/// estimate_memory_size::<(u64, u64)>(num_rows,fixed_size) +/// .expect("Size estimation failed"); /// ``` pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result { // For the majority of cases hashbrown overestimates the bucket quantity From 33d24d129a482a05f6ba2d8a56e2cdd80845579b Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 2 Jun 2024 08:09:13 +0200 Subject: [PATCH 7/8] fix: remove unneccessary checked_div --- datafusion/common/src/utils/memory.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index f1fbfc332490..37116697a4eb 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -82,8 +82,7 @@ pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result // but should be fine overall. num_elements .checked_mul(8) - .and_then(|estimate| estimate.checked_div(7)) - .map(|buckets| buckets.next_power_of_two()) + .and_then(|estimate| Some((estimate / 7).next_power_of_two())) .and_then(|estimated_buckets| { // + size of entry * number of buckets // + 1 byte for each bucket From eebf4cf47c1585dfceb94568cacceb6a12ee9ba7 Mon Sep 17 00:00:00 2001 From: marvinlanhenke Date: Sun, 2 Jun 2024 08:12:38 +0200 Subject: [PATCH 8/8] fix: remove additional and_then --- datafusion/common/src/utils/memory.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index 37116697a4eb..17668cf93d99 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -82,8 +82,8 @@ pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result // but should be fine overall. num_elements .checked_mul(8) - .and_then(|estimate| Some((estimate / 7).next_power_of_two())) - .and_then(|estimated_buckets| { + .and_then(|overestimate| { + let estimated_buckets = (overestimate / 7).next_power_of_two(); // + size of entry * number of buckets // + 1 byte for each bucket // + fixed size of collection (HashSet/HashTable)