perf: revert back to hashmap (#1692)
Revert to the standard HashMap for better performance for now.

---------

Co-authored-by: Will Jones <[email protected]>
chebbyChefNEQ and wjones127 authored Dec 8, 2023
1 parent d7447fe commit 79b9233
Showing 16 changed files with 52 additions and 66 deletions.
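For context, `nohash_hasher::IntMap` is a type alias for a `HashMap` with an identity (no-op) hasher, so this revert is essentially a mechanical type swap at every call site. A minimal sketch of the two forms, assuming the nohash-hasher 0.2 API:

```rust
use std::collections::HashMap;

use nohash_hasher::IntMap;

fn main() {
    // Before this commit: an identity-hashed map keyed by row id.
    // (IntMap<K, V> is an alias for HashMap<K, V, BuildNoHashHasher<K>>.)
    let mut old_style: IntMap<u64, Option<u64>> = IntMap::default();
    old_style.insert(42, Some(2000));

    // After this commit: the standard map with the default SipHash-based hasher.
    let mut new_style: HashMap<u64, Option<u64>> = HashMap::new();
    new_style.insert(42, Some(2000));

    assert_eq!(old_style[&42], new_style[&42]);
}
```

The commit message only states that the default hasher performed better in practice here; the point of the sketch is that the two types are drop-in compatible for this usage, which is why the diff below is almost entirely mechanical.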
1 change: 0 additions & 1 deletion rust/Cargo.toml
@@ -85,7 +85,6 @@ lazy_static = "1"
 log = "0.4"
 mock_instant = { version = "0.3.1", features = ["sync"] }
 moka = "0.11"
-nohash-hasher = { version = "0.2.0" }
 num_cpus = "1.0"
 num-traits = "0.2"
 object_store = { version = "0.7.1", features = ["aws", "gcp", "azure"] }
1 change: 0 additions & 1 deletion rust/lance-index/Cargo.toml
@@ -31,7 +31,6 @@ lance-core.workspace = true
 lance-datafusion.workspace = true
 lance-linalg.workspace = true
 log.workspace = true
-nohash-hasher.workspace = true
 num_cpus.workspace = true
 num-traits.workspace = true
 object_store.workspace = true
4 changes: 2 additions & 2 deletions rust/lance-index/src/scalar.rs
@@ -14,6 +14,7 @@
 
 //! Scalar indices for metadata search & filtering
+use std::collections::HashMap;
 use std::{any::Any, ops::Bound, sync::Arc};
 
 use arrow_array::{RecordBatch, UInt64Array};
@@ -23,7 +24,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_common::scalar::ScalarValue;
 
 use lance_core::Result;
-use nohash_hasher::IntMap;
 
 use crate::Index;
 
@@ -154,7 +154,7 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index {
     /// Remap the row ids, creating a new remapped version of this index in `dest_store`
     async fn remap(
         &self,
-        mapping: &IntMap<u64, Option<u64>>,
+        mapping: &HashMap<u64, Option<u64>>,
         dest_store: &dyn IndexStore,
     ) -> Result<()>;
 
7 changes: 3 additions & 4 deletions rust/lance-index/src/scalar/btree.rs
@@ -15,7 +15,7 @@
 use std::{
     any::Any,
     cmp::Ordering,
-    collections::{BTreeMap, BinaryHeap},
+    collections::{BTreeMap, BinaryHeap, HashMap},
     fmt::{Debug, Display},
     ops::Bound,
     sync::Arc,
@@ -44,7 +44,6 @@ use lance_datafusion::{
     chunker::chunk_concat_stream,
     exec::{execute_plan, OneShotExec},
 };
-use nohash_hasher::IntMap;
 use roaring::RoaringBitmap;
 use serde::{Serialize, Serializer};
 use snafu::{location, Location};
@@ -876,7 +875,7 @@ impl ScalarIndex for BTreeIndex {
 
     async fn remap(
         &self,
-        mapping: &IntMap<u64, Option<u64>>,
+        mapping: &HashMap<u64, Option<u64>>,
         dest_store: &dyn IndexStore,
     ) -> Result<()> {
         // Remap and write the pages
@@ -984,7 +983,7 @@ pub trait BTreeSubIndex: Debug + Send + Sync {
     async fn remap_subindex(
         &self,
         serialized: RecordBatch,
-        mapping: &IntMap<u64, Option<u64>>,
+        mapping: &HashMap<u64, Option<u64>>,
     ) -> Result<RecordBatch>;
 }
 
12 changes: 6 additions & 6 deletions rust/lance-index/src/scalar/flat.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::{any::Any, ops::Bound, sync::Arc};
 
 use arrow_array::{
@@ -23,7 +24,6 @@ use async_trait::async_trait;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_physical_expr::expressions::{in_list, lit, Column};
 use lance_core::{format::RowAddress, Result};
-use nohash_hasher::IntMap;
 use roaring::RoaringBitmap;
 use serde::Serialize;
 
@@ -52,7 +52,7 @@ impl FlatIndex {
     }
 }
 
-fn remap_batch(batch: RecordBatch, mapping: &IntMap<u64, Option<u64>>) -> Result<RecordBatch> {
+fn remap_batch(batch: RecordBatch, mapping: &HashMap<u64, Option<u64>>) -> Result<RecordBatch> {
     let row_ids = batch.column(1).as_primitive::<UInt64Type>();
     let val_idx_and_new_id = row_ids
         .values()
@@ -123,7 +123,7 @@ impl BTreeSubIndex for FlatIndexMetadata {
     async fn remap_subindex(
         &self,
         serialized: RecordBatch,
-        mapping: &IntMap<u64, Option<u64>>,
+        mapping: &HashMap<u64, Option<u64>>,
     ) -> Result<RecordBatch> {
         remap_batch(serialized, mapping)
     }
@@ -254,7 +254,7 @@ impl ScalarIndex for FlatIndex {
     // Same as above, this is dead code at the moment but should work
     async fn remap(
         &self,
-        mapping: &IntMap<u64, Option<u64>>,
+        mapping: &HashMap<u64, Option<u64>>,
         dest_store: &dyn IndexStore,
     ) -> Result<()> {
         let remapped = remap_batch((*self.data).clone(), mapping)?;
@@ -362,7 +362,7 @@ mod tests {
         // 0 -> 2000
         // 3 -> delete
         // Keep remaining as is
-        let mapping = IntMap::<u64, Option<u64>>::from_iter(vec![(0, Some(2000)), (3, None)]);
+        let mapping = HashMap::<u64, Option<u64>>::from_iter(vec![(0, Some(2000)), (3, None)]);
         let metadata = FlatIndexMetadata::new(DataType::Int32);
         let remapped = metadata
             .remap_subindex((*index.data).clone(), &mapping)
@@ -388,7 +388,7 @@
     #[tokio::test]
     async fn test_remap_to_nothing() {
         let index = example_index();
-        let mapping = IntMap::<u64, Option<u64>>::from_iter(vec![
+        let mapping = HashMap::<u64, Option<u64>>::from_iter(vec![
             (5, None),
             (0, None),
             (3, None),
16 changes: 8 additions & 8 deletions rust/lance-index/src/vector/ivf.rs
@@ -14,6 +14,7 @@
 
 //! IVF - Inverted File Index
+use std::collections::HashMap;
 use std::ops::Range;
 use std::sync::Arc;
 
@@ -38,7 +39,6 @@ use lance_linalg::{
     MatrixView,
 };
 use log::{debug, info};
-use nohash_hasher::IntMap;
 use snafu::{location, Location};
 use tracing::{instrument, Instrument};
 
@@ -58,7 +58,7 @@ fn new_ivf_impl<T: ArrowFloatType + Dot + Cosine + L2 + 'static>(
     metric_type: MetricType,
     transforms: Vec<Arc<dyn Transformer>>,
     range: Option<Range<u32>>,
-    precomputed_partitions: Option<IntMap<u64, u32>>,
+    precomputed_partitions: Option<HashMap<u64, u32>>,
 ) -> Arc<dyn Ivf> {
     let mat = MatrixView::<T>::new(Arc::new(centroids.clone()), dimension);
     Arc::new(IvfImpl::<T>::new(
@@ -85,7 +85,7 @@ pub fn new_ivf(
     metric_type: MetricType,
     transforms: Vec<Arc<dyn Transformer>>,
     range: Option<Range<u32>>,
-    precomputed_partitions: Option<IntMap<u64, u32>>,
+    precomputed_partitions: Option<HashMap<u64, u32>>,
 ) -> Result<Arc<dyn Ivf>> {
     match centroids.data_type() {
         DataType::Float16 => Ok(new_ivf_impl::<Float16Type>(
@@ -129,7 +129,7 @@ fn new_ivf_with_pq_impl<T: ArrowFloatType + Dot + Cosine + L2 + 'static>(
     vector_column: &str,
     pq: Arc<dyn ProductQuantizer>,
     range: Option<Range<u32>>,
-    precomputed_partitions: Option<IntMap<u64, u32>>,
+    precomputed_partitions: Option<HashMap<u64, u32>>,
 ) -> Arc<dyn Ivf> {
     let mat = MatrixView::<T>::new(Arc::new(centroids.clone()), dimension);
     Arc::new(IvfImpl::<T>::new_with_pq(
@@ -149,7 +149,7 @@ pub fn new_ivf_with_pq(
     vector_column: &str,
     pq: Arc<dyn ProductQuantizer>,
     range: Option<Range<u32>>,
-    precomputed_partitions: Option<IntMap<u64, u32>>,
+    precomputed_partitions: Option<HashMap<u64, u32>>,
 ) -> Result<Arc<dyn Ivf>> {
     match centroids.data_type() {
         DataType::Float16 => Ok(new_ivf_with_pq_impl::<Float16Type>(
@@ -255,7 +255,7 @@ pub struct IvfImpl<T: ArrowFloatType + Dot + L2 + Cosine> {
     /// Only covers a range of partitions.
     partition_range: Option<Range<u32>>,
 
-    precomputed_partitions: Option<IntMap<u64, u32>>,
+    precomputed_partitions: Option<HashMap<u64, u32>>,
 }
 
 impl<T: ArrowFloatType + Dot + L2 + Cosine + 'static> IvfImpl<T> {
@@ -264,7 +264,7 @@ impl<T: ArrowFloatType + Dot + L2 + Cosine + 'static> IvfImpl<T> {
         metric_type: MetricType,
         transforms: Vec<Arc<dyn Transformer>>,
         range: Option<Range<u32>>,
-        precomputed_partitions: Option<IntMap<u64, u32>>,
+        precomputed_partitions: Option<HashMap<u64, u32>>,
     ) -> Self {
         Self {
             centroids,
@@ -281,7 +281,7 @@ impl<T: ArrowFloatType + Dot + L2 + Cosine + 'static> IvfImpl<T> {
         vector_column: &str,
         pq: Arc<dyn ProductQuantizer>,
         range: Option<Range<u32>>,
-        precomputed_partitions: Option<IntMap<u64, u32>>,
+        precomputed_partitions: Option<HashMap<u64, u32>>,
     ) -> Self {
         Self {
             centroids: centroids.clone(),
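In the IVF signatures above, `precomputed_partitions` maps a row id to a partition that was already assigned offline, letting the writer skip the nearest-centroid search for those rows. A hedged sketch of that lookup pattern, with hypothetical names (`partition_for`, `nearest_centroid`) that are not part of the Lance API:

```rust
use std::collections::HashMap;

// Hypothetical helper: prefer a precomputed assignment, fall back to search.
fn partition_for(
    row_id: u64,
    precomputed: Option<&HashMap<u64, u32>>,
    nearest_centroid: impl Fn(u64) -> u32,
) -> u32 {
    precomputed
        .and_then(|map| map.get(&row_id).copied())
        .unwrap_or_else(|| nearest_centroid(row_id))
}

fn main() {
    let mut assignments: HashMap<u64, u32> = HashMap::new();
    assignments.insert(7, 3); // row 7 was already assigned to partition 3

    // Row 7 hits the precomputed map; row 8 falls back to the (stubbed) search.
    assert_eq!(partition_for(7, Some(&assignments), |_| 0), 3);
    assert_eq!(partition_for(8, Some(&assignments), |_| 0), 0);
}
```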
1 change: 0 additions & 1 deletion rust/lance/Cargo.toml
@@ -57,7 +57,6 @@ futures.workspace = true
 uuid.workspace = true
 shellexpand.workspace = true
 arrow = { version = "47.0.0", features = ["prettyprint"] }
-nohash-hasher.workspace = true
 num_cpus.workspace = true
 # TODO: use datafusion sub-modules to reduce build size?
 datafusion.workspace = true
7 changes: 3 additions & 4 deletions rust/lance/src/dataset/index.rs
@@ -12,15 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use lance_core::{
     format::{Fragment, Index},
     Error, Result,
 };
-use nohash_hasher::IntMap;
 use serde::{Deserialize, Serialize};
 use snafu::{location, Location};
 
@@ -51,7 +50,7 @@ impl DatasetIndexRemapper {
     async fn remap_index(
         &self,
         index: &Index,
-        mapping: &IntMap<u64, Option<u64>>,
+        mapping: &HashMap<u64, Option<u64>>,
     ) -> Result<RemappedIndex> {
         let new_uuid = remap_index(&self.dataset, &index.uuid, mapping).await?;
         Ok(RemappedIndex::new(index.uuid, new_uuid))
@@ -62,7 +61,7 @@
 impl IndexRemapper for DatasetIndexRemapper {
     async fn remap_indices(
         &self,
-        mapping: IntMap<u64, Option<u64>>,
+        mapping: HashMap<u64, Option<u64>>,
         affected_fragment_ids: &[u64],
     ) -> Result<Vec<RemappedIndex>> {
         let affected_frag_ids = HashSet::<u64>::from_iter(affected_fragment_ids.iter().copied());
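The `HashSet` built on the last line above is used to decide which indices overlap the rewritten fragments. A minimal sketch of that membership test, with a hypothetical `needs_remap` helper that is not part of the Lance API:

```rust
use std::collections::HashSet;

// Hypothetical sketch: an index needs remapping when any fragment it covers
// was rewritten.
fn needs_remap(index_fragments: &[u64], affected: &HashSet<u64>) -> bool {
    index_fragments.iter().any(|f| affected.contains(f))
}

fn main() {
    let affected: HashSet<u64> = [2, 5].into_iter().collect();
    assert!(needs_remap(&[1, 2], &affected));
    assert!(!needs_remap(&[3, 4], &affected));
}
```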
28 changes: 13 additions & 15 deletions rust/lance/src/dataset/optimize.rs
@@ -99,7 +99,6 @@ use std::sync::{Arc, RwLock};
 use async_trait::async_trait;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::{StreamExt, TryStreamExt};
-use nohash_hasher::{BuildNoHashHasher, IntMap};
 use roaring::{RoaringBitmap, RoaringTreemap};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
@@ -135,7 +134,7 @@ impl RemappedIndex {
 pub trait IndexRemapper: Send + Sync {
     async fn remap_indices(
         &self,
-        index_map: IntMap<u64, Option<u64>>,
+        index_map: HashMap<u64, Option<u64>>,
         affected_fragment_ids: &[u64],
     ) -> Result<Vec<RemappedIndex>>;
 }
@@ -194,7 +193,7 @@ pub struct IgnoreRemap {}
 impl IndexRemapper for IgnoreRemap {
     async fn remap_indices(
         &self,
-        _: IntMap<u64, Option<u64>>,
+        _: HashMap<u64, Option<u64>>,
         _: &[u64],
     ) -> Result<Vec<RemappedIndex>> {
         Ok(Vec::new())
@@ -613,7 +612,7 @@ pub struct RewriteResult {
     pub read_version: u64,
     /// The original fragments being replaced
     pub original_fragments: Vec<Fragment>,
-    pub row_id_map: IntMap<u64, Option<u64>>,
+    pub row_id_map: HashMap<u64, Option<u64>>,
 }
 
 /// Iterator that yields row_ids that are in the given fragments but not in
@@ -692,7 +691,7 @@ fn transpose_row_ids(
     row_ids: RoaringTreemap,
     old_fragments: &Vec<Fragment>,
     new_fragments: &[Fragment],
-) -> IntMap<u64, Option<u64>> {
+) -> HashMap<u64, Option<u64>> {
     let new_ids = new_fragments.iter().flat_map(|frag| {
         (0..frag.physical_rows.unwrap() as u32).map(|offset| {
             Some(u64::from(RowAddress::new_from_parts(
@@ -716,8 +715,7 @@
     // We expect row ids to be unique, so we should already not get many collisions.
     // The default hasher is designed to be resistant to DoS attacks, which is
     // more than we need for this use case.
-    let mut mapping: IntMap<u64, Option<u64>> =
-        HashMap::with_capacity_and_hasher(expected_size, BuildNoHashHasher::default());
+    let mut mapping: HashMap<u64, Option<u64>> = HashMap::with_capacity(expected_size);
     mapping.extend(row_ids.iter().zip(new_ids));
     MissingIds::new(row_ids.into_iter(), old_fragments).for_each(|id| {
         mapping.insert(id, None);
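The map built in this hunk pairs every old row address with either its new address or `None` for rows that no longer exist. A simplified, standalone sketch of that transpose, with hypothetical helper names and the fragment-id/offset encoding reduced to a bit shift:

```rust
use std::collections::HashMap;

// Hypothetical sketch of the transpose: surviving rows keep their order, so
// zipping old addresses against new ones lines them up; rows with no new
// home map to None (deleted).
fn transpose(old: &[u64], new: &[u64], deleted: &[u64]) -> HashMap<u64, Option<u64>> {
    let mut mapping = HashMap::with_capacity(old.len() + deleted.len());
    mapping.extend(old.iter().copied().zip(new.iter().copied().map(Some)));
    for id in deleted {
        mapping.insert(*id, None);
    }
    mapping
}

fn main() {
    // A row address packs (fragment_id, offset) into a u64, roughly as below.
    let addr = |frag: u64, off: u64| (frag << 32) | off;

    // Fragments 1 and 2 compact into fragment 3; row (1, 1) was deleted earlier.
    let map = transpose(
        &[addr(1, 0), addr(2, 0)],
        &[addr(3, 0), addr(3, 1)],
        &[addr(1, 1)],
    );
    assert_eq!(map[&addr(1, 0)], Some(addr(3, 0)));
    assert_eq!(map[&addr(2, 0)], Some(addr(3, 1)));
    assert_eq!(map[&addr(1, 1)], None);
}
```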
@@ -770,7 +768,7 @@ async fn rewrite_files(
             new_fragments: Vec::new(),
             read_version: dataset.manifest.version,
             original_fragments: task.fragments,
-            row_id_map: HashMap::with_hasher(BuildNoHashHasher::default()),
+            row_id_map: HashMap::new(),
         });
     }
 
@@ -814,7 +812,7 @@
 
     reserve_fragment_ids(&dataset, &mut new_fragments).await?;
 
-    let row_id_map: IntMap<u64, Option<u64>> =
+    let row_id_map: HashMap<u64, Option<u64>> =
         transpose_row_ids(row_ids, &fragments, &new_fragments);
 
     metrics.files_removed = task
@@ -856,7 +854,7 @@ pub async fn commit_compaction(
     let mut rewrite_groups = Vec::with_capacity(completed_tasks.len());
     let mut metrics = CompactionMetrics::default();
 
-    let mut row_id_map: IntMap<u64, Option<u64>> = IntMap::default();
+    let mut row_id_map: HashMap<u64, Option<u64>> = HashMap::default();
 
     for task in completed_tasks {
         metrics += task.metrics;
@@ -956,7 +954,7 @@ mod tests {
     impl IndexRemapper for IgnoreRemap {
         async fn remap_indices(
             &self,
-            _: IntMap<u64, Option<u64>>,
+            _: HashMap<u64, Option<u64>>,
             _: &[u64],
         ) -> Result<Vec<RemappedIndex>> {
             Ok(Vec::new())
@@ -971,7 +969,7 @@
 
     #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
     struct MockIndexRemapperExpectation {
-        expected: IntMap<u64, Option<u64>>,
+        expected: HashMap<u64, Option<u64>>,
         answer: Vec<RemappedIndex>,
     }
 
@@ -981,7 +979,7 @@
     }
 
     impl MockIndexRemapper {
-        fn stringify_map(map: &IntMap<u64, Option<u64>>) -> String {
+        fn stringify_map(map: &HashMap<u64, Option<u64>>) -> String {
             let mut sorted_keys = map.keys().collect::<Vec<_>>();
             sorted_keys.sort();
             let mut first_keys = sorted_keys
@@ -1017,7 +1015,7 @@ mod tests {
     impl IndexRemapper for MockIndexRemapper {
         async fn remap_indices(
             &self,
-            index_map: IntMap<u64, Option<u64>>,
+            index_map: HashMap<u64, Option<u64>>,
             _: &[u64],
         ) -> Result<Vec<RemappedIndex>> {
             for expectation in &self.expectations {
@@ -1188,7 +1186,7 @@ mod tests {
         ranges: &[Vec<(Range<u64>, bool)>],
         starting_new_frag_idx: u32,
    ) -> MockIndexRemapper {
-        let mut expected_remap: IntMap<u64, Option<u64>> = IntMap::default();
+        let mut expected_remap: HashMap<u64, Option<u64>> = HashMap::default();
         expected_remap.reserve(ranges.iter().map(|r| r.len()).sum());
         for (new_frag_offset, new_frag_ranges) in ranges.iter().enumerate() {
             let new_frag_idx = starting_new_frag_idx + new_frag_offset as u32;
(The remaining 7 changed files are not shown.)
