diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs
new file mode 100644
index 000000000000..17668cf93d99
--- /dev/null
+++ b/datafusion/common/src/utils/memory.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides a function to estimate the memory size of a HashTable prior to allocation
+
+use crate::{DataFusionError, Result};
+
+/// Estimates the memory size required for a hash table prior to allocation.
+///
+/// # Parameters
+/// - `num_elements`: The number of elements expected in the hash table.
+/// - `fixed_size`: A fixed overhead size associated with the collection
+///   (e.g., HashSet or HashTable).
+/// - `T`: The type of elements stored in the hash table.
+///
+/// # Details
+/// This function calculates the estimated memory size by considering:
+/// - An overestimation of buckets to keep approximately 1/8 of them empty.
+/// - The total memory size is computed as:
+///   - The size of each entry (`T`) multiplied by the estimated number of
+///     buckets.
+///   - One byte overhead for each bucket.
+///   - The fixed size overhead of the collection.
+/// - If the estimation overflows, we return a [`DataFusionError`]
+///
+/// # Examples
+/// ---
+///
+/// ## From within a struct
+///
+/// ```rust
+/// # use datafusion_common::utils::memory::estimate_memory_size;
+/// # use datafusion_common::Result;
+///
+/// struct MyStruct<T> {
+///     values: Vec<T>,
+///     other_data: usize,
+/// }
+///
+/// impl<T> MyStruct<T> {
+///     fn size(&self) -> Result<usize> {
+///         let num_elements = self.values.len();
+///         let fixed_size = std::mem::size_of_val(self) +
+///             std::mem::size_of_val(&self.values);
+///
+///         estimate_memory_size::<T>(num_elements, fixed_size)
+///     }
+/// }
+/// ```
+/// ---
+/// ## With a simple collection
+///
+/// ```rust
+/// # use datafusion_common::utils::memory::estimate_memory_size;
+/// # use std::collections::HashMap;
+///
+/// let num_rows = 100;
+/// let fixed_size = std::mem::size_of::<HashMap<u64, u64>>();
+/// let estimated_hashtable_size =
+///     estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)
+///         .expect("Size estimation failed");
+/// ```
+pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result<usize> {
+    // For the majority of cases hashbrown overestimates the bucket quantity
+    // to keep ~1/8 of them empty. We take this factor into account by
+    // multiplying the number of elements with a fixed ratio of 8/7 (~1.14).
+    // This formula leads to overallocation for small tables (< 8 elements)
+    // but should be fine overall.
+    num_elements
+        .checked_mul(8)
+        .and_then(|overestimate| {
+            let estimated_buckets = (overestimate / 7).next_power_of_two();
+            // + size of entry * number of buckets
+            // + 1 byte for each bucket
+            // + fixed size of collection (HashSet/HashTable)
+            std::mem::size_of::<T>()
+                .checked_mul(estimated_buckets)?
+                .checked_add(estimated_buckets)?
+                .checked_add(fixed_size)
+        })
+        .ok_or_else(|| {
+            DataFusionError::Execution(
+                "usize overflow while estimating the number of buckets".to_string(),
+            )
+        })
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use super::estimate_memory_size;
+
+    #[test]
+    fn test_estimate_memory() {
+        // size (bytes): 48
+        let fixed_size = std::mem::size_of::<HashSet<u32>>();
+
+        // estimated buckets: 16 = (8 * 8 / 7).next_power_of_two()
+        let num_elements = 8;
+        // size (bytes): 128 = 16 * 4 + 16 + 48
+        let estimated = estimate_memory_size::<u32>(num_elements, fixed_size).unwrap();
+        assert_eq!(estimated, 128);
+
+        // estimated buckets: 64 = (40 * 8 / 7).next_power_of_two()
+        let num_elements = 40;
+        // size (bytes): 368 = 64 * 4 + 64 + 48
+        let estimated = estimate_memory_size::<u32>(num_elements, fixed_size).unwrap();
+        assert_eq!(estimated, 368);
+    }
+
+    #[test]
+    fn test_estimate_memory_overflow() {
+        let num_elements = usize::MAX;
+        let fixed_size = std::mem::size_of::<HashSet<u32>>();
+        let estimated = estimate_memory_size::<u32>(num_elements, fixed_size);
+
+        assert!(estimated.is_err());
+    }
+}
diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs
index 402ec95b33b3..ae444c2cb285 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -17,6 +17,7 @@
 
 //! This module provides the bisect function, which implements binary search.
 
+pub mod memory;
 pub mod proxy;
 
 use crate::error::{_internal_datafusion_err, _internal_err};
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
index 95d8662e0f6e..0e7483d4a1cd 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
@@ -33,6 +33,7 @@ use arrow_schema::DataType;
 
 use datafusion_common::cast::{as_list_array, as_primitive_array};
 use datafusion_common::utils::array_into_list_array;
+use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::ScalarValue;
 use datafusion_expr::Accumulator;
 
@@ -115,18 +116,11 @@ where
     }
 
     fn size(&self) -> usize {
-        let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
-            / 7)
-        .next_power_of_two();
-
-        // Size of accumulator
-        // + size of entry * number of buckets
-        // + 1 byte for each bucket
-        // + fixed size of HashSet
-        std::mem::size_of_val(self)
-            + std::mem::size_of::<T::Native>() * estimated_buckets
-            + estimated_buckets
-            + std::mem::size_of_val(&self.values)
+        let num_elements = self.values.len();
+        let fixed_size =
+            std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
+
+        estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
     }
 }
 
@@ -202,17 +196,10 @@ where
     }
 
     fn size(&self) -> usize {
-        let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
-            / 7)
-        .next_power_of_two();
-
-        // Size of accumulator
-        // + size of entry * number of buckets
-        // + 1 byte for each bucket
-        // + fixed size of HashSet
-        std::mem::size_of_val(self)
-            + std::mem::size_of::<T::Native>() * estimated_buckets
-            + estimated_buckets
-            + std::mem::size_of_val(&self.values)
+        let num_elements = self.values.len();
+        let fixed_size =
+            std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
+
+        estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
     }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index e669517be400..784584f03f0f 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -18,7 +18,6 @@
 //! [`HashJoinExec`] Partitioned Hash Join Operator
 
 use std::fmt;
-use std::mem::size_of;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::Poll;
@@ -59,6 +58,7 @@ use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
 use arrow_array::cast::downcast_array;
 use arrow_schema::ArrowError;
+use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
     internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError,
     JoinSide, JoinType, Result,
@@ -875,23 +875,12 @@ async fn collect_left_input(
 
     // Estimation of memory size, required for hashtable, prior to allocation.
     // Final result can be verified using `RawTable.allocation_info()`
-    //
-    // For majority of cases hashbrown overestimates buckets qty to keep ~1/8 of them empty.
-    // This formula leads to overallocation for small tables (< 8 elements) but fine overall.
-    let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
-        DataFusionError::Execution(
-            "usize overflow while estimating number of hasmap buckets".to_string(),
-        )
-    })? / 7)
-    .next_power_of_two();
-    // 16 bytes per `(u64, u64)`
-    // + 1 byte for each bucket
-    // + fixed size of JoinHashMap (RawTable + Vec)
-    let estimated_hastable_size =
-        16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();
-
-    reservation.try_grow(estimated_hastable_size)?;
-    metrics.build_mem_used.add(estimated_hastable_size);
+    let fixed_size = std::mem::size_of::<JoinHashMap>();
+    let estimated_hashtable_size =
+        estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;
+
+    reservation.try_grow(estimated_hashtable_size)?;
+    metrics.build_mem_used.add(estimated_hashtable_size);
 
     let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
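For reviewers who want to sanity-check the arithmetic without building DataFusion, here is a minimal standalone sketch of the formula this patch centralizes. The function body is transcribed from the new `memory.rs` above; the only liberties taken are swapping `DataFusionError` for a plain `String` error so the snippet compiles on its own, and the asserted byte counts assume a typical 64-bit target (matching the unit tests in the diff).

```rust
use std::collections::HashSet;

/// Transcribed from the new `datafusion/common/src/utils/memory.rs` above,
/// with the error type simplified to `String` for a dependency-free build.
fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result<usize, String> {
    num_elements
        .checked_mul(8)
        .and_then(|overestimate| {
            // hashbrown keeps ~1/8 of buckets empty, hence the 8/7 ratio,
            // rounded up to the next power of two.
            let estimated_buckets = (overestimate / 7).next_power_of_two();
            // entry size * buckets + 1 control byte per bucket + fixed overhead
            std::mem::size_of::<T>()
                .checked_mul(estimated_buckets)?
                .checked_add(estimated_buckets)?
                .checked_add(fixed_size)
        })
        .ok_or_else(|| "usize overflow while estimating the number of buckets".to_string())
}

fn main() {
    // HashSet<u32> has a 48-byte fixed footprint on a 64-bit target, so
    // 8 elements  -> 16 buckets -> 16 * 4 + 16 + 48 = 128 bytes, and
    // 40 elements -> 64 buckets -> 64 * 4 + 64 + 48 = 368 bytes.
    let fixed_size = std::mem::size_of::<HashSet<u32>>();
    assert_eq!(estimate_memory_size::<u32>(8, fixed_size), Ok(128));
    assert_eq!(estimate_memory_size::<u32>(40, fixed_size), Ok(368));

    // usize::MAX * 8 overflows, which surfaces as an error instead of a panic.
    assert!(estimate_memory_size::<u32>(usize::MAX, fixed_size).is_err());
}
```

Centralizing the checked arithmetic is what lets the hash join propagate overflow as a `DataFusionError` (via `?`) while the count-distinct accumulators keep their infallible `size()` signature by calling `.unwrap()`.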