From 9095098c229a01443b1464d5bf1705d78449d717 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Wed, 31 Jan 2024 16:55:48 +0000 Subject: [PATCH 1/8] Update to compile with latest Rust (part 1) This change addresses the low-hanging fruit: 1. Update rust-toolchain to a recent nightly. 2. Remove feature flags for invalid features. 3. Update feature usage (mostly MaybeUnInit::get_{ref,mut} and Vec::drain_filter. --- rust-toolchain | 2 +- src/core/codec/postings/for_util.rs | 16 +++++++------- src/core/codec/postings/terms_hash.rs | 10 ++++----- .../codec/postings/terms_hash_per_field.rs | 12 +++++----- .../term_vectors/term_vector_consumer.rs | 14 ++++++------ src/core/index/merge/merge_scheduler.rs | 2 +- .../index/writer/doc_writer_per_thread.rs | 16 +++++++------- src/core/index/writer/flush_policy.rs | 2 +- src/core/index/writer/index_file_deleter.rs | 4 ++-- src/core/index/writer/index_writer.rs | 4 ++-- src/core/search/query/spans/span_near.rs | 22 +++++++++---------- src/core/util/bits.rs | 3 ++- src/core/util/doc_id_set.rs | 2 +- src/lib.rs | 8 ++----- 14 files changed, 57 insertions(+), 60 deletions(-) diff --git a/rust-toolchain b/rust-toolchain index 7b70b33..207f9ec 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2020-03-12 +nightly-2024-01-30 diff --git a/src/core/codec/postings/for_util.rs b/src/core/codec/postings/for_util.rs index 0c985f5..4a48dc2 100644 --- a/src/core/codec/postings/for_util.rs +++ b/src/core/codec/postings/for_util.rs @@ -132,9 +132,9 @@ impl ForUtilInstance { let format = Format::with_id(format_id); encoded_sizes[bpv] = encoded_size(format, packed_ints_version, bits_per_value); unsafe { - decoders.get_mut()[bpv] = get_decoder(format, packed_ints_version, bits_per_value)?; - encoders.get_mut()[bpv] = get_encoder(format, packed_ints_version, bits_per_value)?; - iterations[bpv] = compute_iterations(&decoders.get_ref()[bpv]); + decoders.assume_init_mut()[bpv] = get_decoder(format, packed_ints_version, bits_per_value)?; + encoders.assume_init_mut()[bpv] = get_encoder(format, packed_ints_version, bits_per_value)?; + iterations[bpv] = compute_iterations(&decoders.assume_init_ref()[bpv]); } } @@ -168,9 +168,9 @@ impl ForUtilInstance { debug_assert!(bits_per_value <= 32); encoded_sizes[bpv - 1] = encoded_size(format, VERSION_CURRENT, bits_per_value); unsafe { - decoders.get_mut()[bpv - 1] = get_decoder(format, VERSION_CURRENT, bits_per_value)?; - encoders.get_mut()[bpv - 1] = get_encoder(format, VERSION_CURRENT, bits_per_value)?; - iterations[bpv - 1] = compute_iterations(&decoders.get_ref()[bpv - 1]); + decoders.assume_init_mut()[bpv - 1] = get_decoder(format, VERSION_CURRENT, bits_per_value)?; + encoders.assume_init_mut()[bpv - 1] = get_encoder(format, VERSION_CURRENT, bits_per_value)?; + iterations[bpv - 1] = compute_iterations(&decoders.assume_init_ref()[bpv - 1]); } output.write_vint(format.get_id() << 5 | (bits_per_value - 1))?; @@ -221,7 +221,7 @@ impl ForUtilInstance { } let encoded_size = self.encoded_sizes[num_bits - 1]; - let decoder = unsafe { &self.decoders.get_ref()[num_bits - 1] }; + let decoder = unsafe { &self.decoders.assume_init_ref()[num_bits - 1] }; if let Some(p) = partial_decoder { let format = match decoder { &BulkOperationEnum::Packed(_) => Format::Packed, @@ -410,7 +410,7 @@ impl ForUtil { assert!(num_bits > 0 && num_bits <= 32); let iters = self.instance.iterations[num_bits - 1]; - let encoder = unsafe { &self.instance.encoders.get_ref()[num_bits - 1] }; + let encoder = unsafe { 
&self.instance.encoders.assume_init_ref()[num_bits - 1] }; assert!(iters * encoder.byte_value_count() as i32 >= BLOCK_SIZE); let encoded_size = self.instance.encoded_sizes[num_bits - 1]; debug_assert!(iters * encoder.byte_block_count() as i32 >= encoded_size); diff --git a/src/core/codec/postings/terms_hash.rs b/src/core/codec/postings/terms_hash.rs index 8177801..1ac1194 100644 --- a/src/core/codec/postings/terms_hash.rs +++ b/src/core/codec/postings/terms_hash.rs @@ -262,7 +262,7 @@ where let mut all_fields = Vec::with_capacity(field_to_flush.len()); for (_, f) in field_to_flush { unsafe { - if !f.base().bytes_hash.get_ref().is_empty() { + if !f.base().bytes_hash.assume_init_ref().is_empty() { // TODO: Hack logic, it's because it's hard to gain param `field_to_flush` as // `HashMap<&str, &mut FreqProxTermsWriterPerField>` // this should be fixed later @@ -472,7 +472,7 @@ where fn new(terms_writer: &FreqProxTermsWriterPerField) -> Self { FreqProxTermsIterator { terms_writer, - num_terms: unsafe { terms_writer.base.bytes_hash.get_ref().len() }, + num_terms: unsafe { terms_writer.base.bytes_hash.assume_init_ref().len() }, ord: -1, scratch: BytesRef::default(), } @@ -487,7 +487,7 @@ where } fn set_bytes(&mut self, term_id: usize) { - let idx = unsafe { self.terms().base.bytes_hash.get_ref().ids[term_id] as usize }; + let idx = unsafe { self.terms().base.bytes_hash.assume_init_ref().ids[term_id] as usize }; let text_start = self.terms().base.postings_array.base.text_starts[idx]; self.scratch = self .terms() @@ -589,7 +589,7 @@ where let mut pos_iter = FreqProxPostingsIterator::new(self.terms()); unsafe { pos_iter - .reset(self.terms().base.bytes_hash.get_ref().ids[self.ord as usize] as usize); + .reset(self.terms().base.bytes_hash.assume_init_ref().ids[self.ord as usize] as usize); } Ok(FreqProxPostingIterEnum::Postings(pos_iter)) } else { @@ -604,7 +604,7 @@ where let mut pos_iter = FreqProxDocsIterator::new(self.terms()); unsafe { pos_iter - .reset(self.terms().base.bytes_hash.get_ref().ids[self.ord as usize] as usize); + .reset(self.terms().base.bytes_hash.assume_init_ref().ids[self.ord as usize] as usize); } Ok(FreqProxPostingIterEnum::Docs(pos_iter)) } diff --git a/src/core/codec/postings/terms_hash_per_field.rs b/src/core/codec/postings/terms_hash_per_field.rs index 81a477e..0f3de09 100644 --- a/src/core/codec/postings/terms_hash_per_field.rs +++ b/src/core/codec/postings/terms_hash_per_field.rs @@ -103,7 +103,7 @@ impl TermsHashPerFieldBase { self.byte_pool = &mut parent.byte_pool; self.term_byte_pool = parent.term_byte_pool; unsafe { - self.bytes_hash.get_mut().pool = parent.term_byte_pool; + self.bytes_hash.assume_init_mut().pool = parent.term_byte_pool; } } @@ -215,7 +215,7 @@ impl TermsHashPerFieldBase { pub fn sort_postings(&mut self) { debug_assert!(self.inited); unsafe { - self.bytes_hash.get_mut().sort(); + self.bytes_hash.assume_init_mut().sort(); } } @@ -240,7 +240,7 @@ pub trait TermsHashPerField: Ord + PartialOrd + Eq + PartialEq { fn reset(&mut self) { unsafe { - self.base_mut().bytes_hash.get_mut().clear(false); + self.base_mut().bytes_hash.assume_init_mut().clear(false); } } @@ -269,7 +269,7 @@ pub trait TermsHashPerField: Ord + PartialOrd + Eq + PartialEq { let term_id = unsafe { self.base_mut() .bytes_hash - .get_mut() + .assume_init_mut() .add_by_pool_offset(text_start) }; self.base_mut().add(term_id); @@ -293,12 +293,12 @@ pub trait TermsHashPerField: Ord + PartialOrd + Eq + PartialEq { // term text into text_start address let bytes_ref = 
BytesRef::new(&token_stream.token().term); - let term_id = unsafe { self.base_mut().bytes_hash.get_mut().add(&bytes_ref) }; + let term_id = unsafe { self.base_mut().bytes_hash.assume_init_mut().add(&bytes_ref) }; if term_id >= 0 { unsafe { self.base_mut() .bytes_hash - .get_ref() + .assume_init_ref() .byte_start(term_id as usize); } } diff --git a/src/core/codec/term_vectors/term_vector_consumer.rs b/src/core/codec/term_vectors/term_vector_consumer.rs index 6316eb1..6bb27ad 100644 --- a/src/core/codec/term_vectors/term_vector_consumer.rs +++ b/src/core/codec/term_vectors/term_vector_consumer.rs @@ -641,13 +641,13 @@ where } self.do_vectors = false; - let num_postings = unsafe { self.base.bytes_hash.get_ref().len() }; + let num_postings = unsafe { self.base.bytes_hash.assume_init_ref().len() }; // This is called once, after inverting all occurrences // of a given field in the doc. At this point we flush // our hash into the DocWriter. unsafe { - self.base.bytes_hash.get_mut().sort(); + self.base.bytes_hash.assume_init_mut().sort(); } match &mut self.term_vectors_writer().0 { TermVectorsConsumerEnum::Raw(r) => { @@ -670,7 +670,7 @@ where } } for j in 0..num_postings { - let term_id = unsafe { self.base.bytes_hash.get_ref().ids[j] as usize }; + let term_id = unsafe { self.base.bytes_hash.assume_init_ref().ids[j] as usize }; let freq = self.base.postings_array.freqs[term_id]; // Get BytesPtr @@ -702,7 +702,7 @@ where fn reset(&mut self) { unsafe { - self.base.bytes_hash.get_mut().clear(false); + self.base.bytes_hash.assume_init_mut().clear(false); } } @@ -777,14 +777,14 @@ where debug_assert_ne!(field.field_type().index_options(), IndexOptions::Null); if first { unsafe { - if !self.base.bytes_hash.get_ref().is_empty() { + if !self.base.bytes_hash.assume_init_ref().is_empty() { // Only necessary if previous doc hit a // non-aborting exception while writing vectors in // this field: self.reset(); } - self.base.bytes_hash.get_mut().reinit(); + self.base.bytes_hash.assume_init_mut().reinit(); } self.has_payloads = false; self.do_vectors = field.field_type().store_term_vectors(); @@ -865,7 +865,7 @@ where /// RAMOutputStream, which is then quickly flushed to /// the real term vectors files in the Directory. 
fn finish(&mut self, _field_state: &FieldInvertState) -> Result<()> { - if self.do_vectors && unsafe { !self.base.bytes_hash.get_ref().is_empty() } { + if self.do_vectors && unsafe { !self.base.bytes_hash.assume_init_ref().is_empty() } { self.term_vectors_writer().add_field_to_flush(self); } Ok(()) diff --git a/src/core/index/merge/merge_scheduler.rs b/src/core/index/merge/merge_scheduler.rs index 86d1ac5..80cf85d 100644 --- a/src/core/index/merge/merge_scheduler.rs +++ b/src/core/index/merge/merge_scheduler.rs @@ -493,7 +493,7 @@ impl MergeThrea let scheduler_mut = unsafe { self.merge_scheduler.inner.scheduler_mut(&l) }; scheduler_mut .merge_tasks - .drain_filter(|t| t.merge.id == one_merge.id); + .extract_if(|t| t.merge.id == one_merge.id); scheduler_mut.update_merge_threads(); // In case we had stalled indexing, we can now wake up // and possibly unstall: diff --git a/src/core/index/writer/doc_writer_per_thread.rs b/src/core/index/writer/doc_writer_per_thread.rs index 1239c37..3820a5b 100644 --- a/src/core/index/writer/doc_writer_per_thread.rs +++ b/src/core/index/writer/doc_writer_per_thread.rs @@ -167,7 +167,7 @@ where let consumer = DocConsumer::new(self, field_infos); self.consumer.write(consumer); unsafe { - self.consumer.get_mut().init(); + self.consumer.assume_init_mut().init(); } self.inited = true; @@ -213,7 +213,7 @@ where // vs non-aborting exceptions): let res = unsafe { self.consumer - .get_mut() + .assume_init_mut() .process_document(&mut self.doc_state, &mut doc) }; self.doc_state.clear(); @@ -273,7 +273,7 @@ where let res = unsafe { self.consumer - .get_mut() + .assume_init_mut() .process_document(&mut self.doc_state, &mut doc) }; if res.is_err() { @@ -388,7 +388,7 @@ where let mut flush_state = SegmentWriteState::new( Arc::clone(&self.directory), self.segment_info.clone(), - unsafe { self.consumer.get_ref().field_infos.finish()? }, + unsafe { self.consumer.assume_init_ref().field_infos.finish()? }, Some(&self.pending_updates), ctx, "".into(), @@ -438,11 +438,11 @@ where // re-init unsafe { - self.consumer.get_mut().reset_doc_writer(doc_writer); - self.consumer.get_mut().init(); + self.consumer.assume_init_mut().reset_doc_writer(doc_writer); + self.consumer.assume_init_mut().init(); } - let sort_map = unsafe { self.consumer.get_mut().flush(&mut flush_state)? }; + let sort_map = unsafe { self.consumer.assume_init_mut().flush(&mut flush_state)? 
}; self.pending_updates.deleted_terms.clear(); self.segment_info .set_files(&self.directory.create_files())?; @@ -596,7 +596,7 @@ where debug!("DWPT: now abort"); unsafe { - if let Err(e) = self.consumer.get_mut().abort() { + if let Err(e) = self.consumer.assume_init_mut().abort() { error!("DefaultIndexChain abort failed by error: '{:?}'", e); } } diff --git a/src/core/index/writer/flush_policy.rs b/src/core/index/writer/flush_policy.rs index a9db6b2..2cf9df0 100644 --- a/src/core/index/writer/flush_policy.rs +++ b/src/core/index/writer/flush_policy.rs @@ -183,7 +183,7 @@ impl FlushPolicy { if (self.index_write_config.flush_on_doc_count() && state.dwpt().num_docs_in_ram >= self.index_write_config.max_buffered_docs()) - || unsafe { state.dwpt().consumer.get_ref().need_flush() } + || unsafe { state.dwpt().consumer.assume_init_ref().need_flush() } { // Flush this state by num docs control.set_flush_pending(state, lg); diff --git a/src/core/index/writer/index_file_deleter.rs b/src/core/index/writer/index_file_deleter.rs index 75971e4..e090673 100644 --- a/src/core/index/writer/index_file_deleter.rs +++ b/src/core/index/writer/index_file_deleter.rs @@ -488,7 +488,7 @@ impl IndexFileDeleter { fn filter_dv_update_files(&self, candidates: &mut Vec<&String>) { let dv_update_files: Vec = candidates - .drain_filter(|f| -> bool { + .extract_if(|f| -> bool { self.fnm_pattern.is_match(f) || self.dv_pattern.is_match(f) }) .map(|f| f.clone()) @@ -502,7 +502,7 @@ impl IndexFileDeleter { .unwrap() .as_secs(); to_deletes = old_dv_update_files - .drain_filter(|(x, _)| -> bool { *x < tm_now }) + .extract_if(|(x, _)| -> bool { *x < tm_now }) .map(|(_, y)| y) .collect(); old_dv_update_files.push((tm_now + 60, dv_update_files)); diff --git a/src/core/index/writer/index_writer.rs b/src/core/index/writer/index_writer.rs index 5182f57..ee1a236 100644 --- a/src/core/index/writer/index_writer.rs +++ b/src/core/index/writer/index_writer.rs @@ -2846,9 +2846,9 @@ where self.segment_infos.remove(info); self.pending_num_docs .fetch_sub(info.info.max_doc as i64, Ordering::AcqRel); - if merge.segments.contains(info) { + if let Some(pos) = merge.segments.iter().position(|x| *x == *info) { self.merging_segments.remove(&info.info.name); - merge.segments.remove_item(info); + merge.segments.remove(pos); } self.reader_pool.drop(info.as_ref())?; } diff --git a/src/core/search/query/spans/span_near.rs b/src/core/search/query/spans/span_near.rs index 2e7af70..4a7a157 100644 --- a/src/core/search/query/spans/span_near.rs +++ b/src/core/search/query/spans/span_near.rs @@ -418,11 +418,11 @@ impl NearSpansUnordered
{ impl ConjunctionSpans for NearSpansUnordered { fn conjunction_span_base(&self) -> &ConjunctionSpanBase { - unsafe { self.conjunction_span.get_ref() } + unsafe { self.conjunction_span.assume_init_ref() } } fn conjunction_span_base_mut(&mut self) -> &mut ConjunctionSpanBase { - unsafe { self.conjunction_span.get_mut() } + unsafe { self.conjunction_span.assume_init_mut() } } fn two_phase_current_doc_matches(&mut self) -> Result {
@@ -431,8 +431,8 @@ impl ConjunctionSpans for NearSpansUnordered { loop { if self.at_match() { unsafe { - self.conjunction_span.get_mut().first_in_current_doc = true; - self.conjunction_span.get_mut().one_exhausted_in_current_doc = false; + self.conjunction_span.assume_init_mut().first_in_current_doc = true; + self.conjunction_span.assume_init_mut().one_exhausted_in_current_doc = false; } return Ok(true); }
@@ -450,8 +450,8 @@ impl ConjunctionSpans for NearSpansUnordered { impl Spans for NearSpansUnordered { fn next_start_position(&mut self) -> Result { unsafe { - if self.conjunction_span.get_ref().first_in_current_doc { - self.conjunction_span.get_mut().first_in_current_doc = false; + if self.conjunction_span.assume_init_ref().first_in_current_doc { + self.conjunction_span.assume_init_mut().first_in_current_doc = false; return Ok(self.min_cell().start_position()); } }
@@ -475,7 +475,7 @@ impl Spans for NearSpansUnordered { == NO_MORE_POSITIONS { unsafe { - self.conjunction_span.get_mut().one_exhausted_in_current_doc = true; + self.conjunction_span.assume_init_mut().one_exhausted_in_current_doc = true; } return Ok(NO_MORE_POSITIONS); }
@@ -487,9 +487,9 @@ impl Spans for NearSpansUnordered { fn start_position(&self) -> i32 { unsafe { - if self.conjunction_span.get_ref().first_in_current_doc { + if self.conjunction_span.assume_init_ref().first_in_current_doc { -1 - } else if self.conjunction_span.get_ref().one_exhausted_in_current_doc { + } else if self.conjunction_span.assume_init_ref().one_exhausted_in_current_doc { NO_MORE_POSITIONS } else { self.min_cell().start_position()
@@ -499,9 +499,9 @@ impl Spans for NearSpansUnordered
{ fn end_position(&self) -> i32 { unsafe { - if self.conjunction_span.get_ref().first_in_current_doc { + if self.conjunction_span.assume_init_ref().first_in_current_doc { -1 - } else if self.conjunction_span.get_ref().one_exhausted_in_current_doc { + } else if self.conjunction_span.assume_init_ref().one_exhausted_in_current_doc { NO_MORE_POSITIONS } else { self.sub_span_cells[self.max_end_position_cell_idx].end_position() diff --git a/src/core/util/bits.rs b/src/core/util/bits.rs index 5d269f0..4f959cd 100644 --- a/src/core/util/bits.rs +++ b/src/core/util/bits.rs @@ -418,7 +418,8 @@ impl Bits for SparseBits { impl BitsMut for SparseBits { fn get(&mut self, index: usize) -> Result { unsafe { - let ctx = &self.ctx as *const _ as *mut _; + //let ctx = &self.ctx as *const _ as *mut _; + let ctx = &mut self.ctx as *mut _; self.get64(&mut *ctx, index as i64) } } diff --git a/src/core/util/doc_id_set.rs b/src/core/util/doc_id_set.rs index 61acce8..95c44aa 100644 --- a/src/core/util/doc_id_set.rs +++ b/src/core/util/doc_id_set.rs @@ -361,7 +361,7 @@ impl DocIterator for NotDocIterator { #[derive(Debug)] pub struct EliasFanoDocIdSet { - ef_encoder: Arc, + ef_encoder:Arc, } impl EliasFanoDocIdSet { diff --git a/src/lib.rs b/src/lib.rs index 8a63258..5db0033 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,16 +16,12 @@ #![cfg_attr(feature = "clippy", plugin(clippy))] #![cfg_attr(not(feature = "clippy"), allow(unknown_lints))] #![feature(exact_size_is_empty)] -#![feature(drain_filter)] +#![feature(extract_if)] #![feature(hashmap_internals)] -#![feature(integer_atomics)] -#![feature(vec_remove_item)] +//#![feature(integer_atomics)] #![feature(specialization)] #![allow(clippy::cast_lossless)] #![feature(fn_traits)] -#![feature(maybe_uninit_ref)] -#![feature(maybe_uninit_extra)] -#![feature(in_band_lifetimes)] #![feature(vec_into_raw_parts)] #![feature(core_intrinsics)] #![feature(stmt_expr_attributes)] From eef794b91cf66536364aec56e3205d67ea1b9215 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Thu, 1 Feb 2024 18:41:11 +0000 Subject: [PATCH 2/8] Fix a bunch of compiler warnings We still have 12 compiler errors, mostly around casting &T to &mut T, but I wanted to clean up some warnings to reduce noise. 
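Note on the feature migrations from patch 1: the sketch below is illustrative, not rucene code, and is written against the pinned nightly-2024-01-30 toolchain (Vec::extract_if was later stabilized with an additional range argument, so the single-argument call shown here is nightly-of-that-era specific). It shows the three mechanical rewrites the patch applies throughout: MaybeUninit::get_ref/get_mut to assume_init_ref/assume_init_mut, Vec::drain_filter to extract_if, and position() + remove() replacing the removed Vec::remove_item.

    #![feature(extract_if)]

    use std::mem::MaybeUninit;

    fn main() {
        // get_ref/get_mut were renamed before stabilization; the calls stay
        // unsafe because the caller still promises the value is initialized.
        let mut slot: MaybeUninit<Vec<i32>> = MaybeUninit::uninit();
        slot.write(vec![1, 2, 3]);
        let len = unsafe { slot.assume_init_ref().len() };
        unsafe { slot.assume_init_mut().push(4) };
        assert_eq!(len, 3);
        unsafe { slot.assume_init_drop() }; // avoid leaking the Vec

        // drain_filter was renamed to extract_if; matching elements are moved
        // out through the returned iterator.
        let mut v = vec![1, 2, 3, 4];
        let evens: Vec<i32> = v.extract_if(|x| *x % 2 == 0).collect();
        assert_eq!(evens, vec![2, 4]);
        assert_eq!(v, vec![1, 3]);

        // Vec::remove_item is gone; position + remove is the replacement the
        // patch uses in index_writer.rs.
        if let Some(pos) = v.iter().position(|x| *x == 3) {
            v.remove(pos);
        }
        assert_eq!(v, vec![1]);
    }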
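One behavioral trap hides in that rename: drain_filter removed the remaining matching elements when its iterator was dropped, while extract_if removes elements only as the iterator is consumed. Patch 1's bare `.extract_if(|t| t.merge.id == one_merge.id);` statement in merge_scheduler.rs therefore stopped removing anything, which is why this patch rewrites it as `retain` with the inverted predicate. The assert!/panic!/debug_assert! changes in this patch drop a redundant format! wrapper, since those macros format their arguments themselves and a non-literal message trips the non_fmt_panics lint. A minimal sketch with plain integers standing in for merge tasks:

    #![feature(extract_if)]

    fn main() {
        // Eagerly drop all tasks for a finished merge: retain keeps the rest.
        let mut merge_tasks = vec![1, 2, 2, 3];
        let finished_merge_id = 2;
        merge_tasks.retain(|t| *t != finished_merge_id);
        assert_eq!(merge_tasks, vec![1, 3]);

        // An unconsumed extract_if iterator removes nothing, unlike the old
        // drain_filter, whose Drop impl drained the remaining matches.
        let mut v = vec![1, 2, 2, 3];
        let _ = v.extract_if(|t| *t == 2); // iterator dropped immediately
        assert_eq!(v, vec![1, 2, 2, 3]);

        // Message formatting belongs to the macro, not a nested format!():
        let bits = 5;
        assert!(bits < 8, "Unknown type flag: {}", bits);
    }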
--- src/core/codec/postings/for_util.rs | 8 ++--- src/core/codec/stored_fields/mod.rs | 1 - .../stored_fields/stored_fields_reader.rs | 2 +- src/core/doc/document.rs | 2 +- src/core/highlight/fragments_builder.rs | 6 ++-- src/core/index/merge/merge_scheduler.rs | 2 +- src/core/index/reader/segment_reader.rs | 2 +- .../index/writer/doc_writer_delete_queue.rs | 4 +-- src/core/index/writer/index_writer.rs | 2 +- .../search/collector/early_terminating.rs | 6 ++-- src/core/search/query/phrase_query.rs | 30 ++++++++----------- src/core/search/scorer/phrase_scorer.rs | 3 -- src/core/store/io/mmap_index_input.rs | 11 +++---- src/core/util/bkd/bkd_reader.rs | 5 ++-- src/core/util/bkd/bkd_writer.rs | 2 +- src/core/util/packed/packed_misc.rs | 26 ++++++++-------- 16 files changed, 48 insertions(+), 64 deletions(-) diff --git a/src/core/codec/postings/for_util.rs b/src/core/codec/postings/for_util.rs index 4a48dc2..866fa4a 100644 --- a/src/core/codec/postings/for_util.rs +++ b/src/core/codec/postings/for_util.rs @@ -72,10 +72,10 @@ pub fn max_data_size() -> usize { let iterations = compute_iterations(&decoder) as usize; max_data_size = max(max_data_size, iterations * decoder.byte_value_count()); } else { - panic!(format!( + panic!( "get_decoder({:?},{:?},{:?}) failed.", format, version, bpv - )); + ); } } let format = Format::PackedSingleBlock; @@ -84,10 +84,10 @@ pub fn max_data_size() -> usize { let iterations = compute_iterations(&decoder) as usize; max_data_size = max(max_data_size, iterations * decoder.byte_value_count()); } else { - panic!(format!( + panic!( "get_decoder({:?},{:?},{:?}) failed.", format, version, bpv - )); + ); } } } diff --git a/src/core/codec/stored_fields/mod.rs b/src/core/codec/stored_fields/mod.rs index 3ee489b..2b3e55b 100644 --- a/src/core/codec/stored_fields/mod.rs +++ b/src/core/codec/stored_fields/mod.rs @@ -30,7 +30,6 @@ pub use self::stored_fields_consumer::*; use core::analysis::TokenStream; use core::codec::field_infos::{FieldInfo, FieldInfos}; use core::codec::segment_infos::SegmentInfo; -use core::codec::stored_fields::CompressingStoredFieldsWriter; use core::codec::Codec; use core::doc::{FieldType, Fieldable, STORE_FIELD_TYPE}; use core::doc::{Status, StoredFieldVisitor}; diff --git a/src/core/codec/stored_fields/stored_fields_reader.rs b/src/core/codec/stored_fields/stored_fields_reader.rs index afcf722..8d530b2 100644 --- a/src/core/codec/stored_fields/stored_fields_reader.rs +++ b/src/core/codec/stored_fields/stored_fields_reader.rs @@ -656,7 +656,7 @@ impl CompressingStoredFieldsReader { Self::read_zdouble(input)?; } _ => { - debug_assert!(false, format!("Unknown type flag: {}", bits)); + debug_assert!(false, "Unknown type flag: {}", bits); } } Ok(()) diff --git a/src/core/doc/document.rs b/src/core/doc/document.rs index 910b0ca..691eacc 100644 --- a/src/core/doc/document.rs +++ b/src/core/doc/document.rs @@ -152,7 +152,7 @@ impl StoredFieldVisitor for DocumentStoredFieldVisitor { )); } Err(e) => { - panic!(format!("string_field failed: {:?}", e)); + panic!("string_field failed: {:?}", e); } } Ok(()) diff --git a/src/core/highlight/fragments_builder.rs b/src/core/highlight/fragments_builder.rs index 28e1fb8..c51def1 100644 --- a/src/core/highlight/fragments_builder.rs +++ b/src/core/highlight/fragments_builder.rs @@ -389,10 +389,8 @@ impl FragmentsBuilder for BaseFragmentsBuilder { assert!( max_num_fragments > 0, - format!( - "maxNumFragments({}) must be positive number.", - max_num_fragments - ) + "maxNumFragments({}) must be positive number.", + 
max_num_fragments ); let values = self.fields(reader, doc_id, field_name)?; diff --git a/src/core/index/merge/merge_scheduler.rs b/src/core/index/merge/merge_scheduler.rs index 80cf85d..3a38519 100644 --- a/src/core/index/merge/merge_scheduler.rs +++ b/src/core/index/merge/merge_scheduler.rs @@ -493,7 +493,7 @@ impl MergeThrea let scheduler_mut = unsafe { self.merge_scheduler.inner.scheduler_mut(&l) }; scheduler_mut .merge_tasks - .extract_if(|t| t.merge.id == one_merge.id); + .retain(|t| t.merge.id != one_merge.id); scheduler_mut.update_merge_threads(); // In case we had stalled indexing, we can now wake up // and possibly unstall: diff --git a/src/core/index/reader/segment_reader.rs b/src/core/index/reader/segment_reader.rs index 69c1763..6261271 100644 --- a/src/core/index/reader/segment_reader.rs +++ b/src/core/index/reader/segment_reader.rs @@ -665,7 +665,7 @@ impl SegmentReader { pub fn check_bounds(&self, doc_id: DocId) { debug_assert!( doc_id >= 0 && doc_id < self.max_docs(), - format!("doc_id={} max_docs={}", doc_id, self.max_docs(),) + "doc_id={} max_docs={}", doc_id, self.max_docs() ); } diff --git a/src/core/index/writer/doc_writer_delete_queue.rs b/src/core/index/writer/doc_writer_delete_queue.rs index a99020f..52120fe 100644 --- a/src/core/index/writer/doc_writer_delete_queue.rs +++ b/src/core/index/writer/doc_writer_delete_queue.rs @@ -360,10 +360,10 @@ impl Drop for DeleteListNode { if Arc::strong_count(&(*next)) <= 1 { Arc::get_mut(&mut *next).unwrap().next = AtomicPtr::default(); - Box::from_raw(next); + drop(Box::from_raw(next)); next = next2; } else { - Box::from_raw(next); + drop(Box::from_raw(next)); break; } } diff --git a/src/core/index/writer/index_writer.rs b/src/core/index/writer/index_writer.rs index ee1a236..76f815f 100644 --- a/src/core/index/writer/index_writer.rs +++ b/src/core/index/writer/index_writer.rs @@ -1458,7 +1458,7 @@ where { let lock = Arc::clone(&self.lock); let l = lock.lock()?; - let _ = self.abort_merges(l)?; + drop(self.abort_merges(l)?); } self.rate_limiters = Arc::new(ThreadLocal::new()); debug!("IW - rollback: done finish merges"); diff --git a/src/core/search/collector/early_terminating.rs b/src/core/search/collector/early_terminating.rs index a5c4790..655cac6 100644 --- a/src/core/search/collector/early_terminating.rs +++ b/src/core/search/collector/early_terminating.rs @@ -31,10 +31,8 @@ impl EarlyTerminatingSortingCollector { pub fn new(num_docs_to_collect_per_reader: usize) -> EarlyTerminatingSortingCollector { assert!( num_docs_to_collect_per_reader > 0, - format!( - "num_docs_to_collect_per_reader must always be > 0, got {}", - num_docs_to_collect_per_reader - ) + "num_docs_to_collect_per_reader must always be > 0, got {}", + num_docs_to_collect_per_reader ); EarlyTerminatingSortingCollector { diff --git a/src/core/search/query/phrase_query.rs b/src/core/search/query/phrase_query.rs index 4a99d06..ddfc0f3 100644 --- a/src/core/search/query/phrase_query.rs +++ b/src/core/search/query/phrase_query.rs @@ -74,7 +74,7 @@ impl PhraseQuery { ctxs.as_ref().map(Vec::len).unwrap_or_else(|| terms.len()), "Must have as many terms as positions" ); - assert!(slop >= 0, format!("Slop must be >= 0, got {}", slop)); + assert!(slop >= 0, "Slop must be >= 0, got {}", slop); if terms.len() < 2 { bail!(ErrorKind::IllegalArgument( "phrase query terms should not be less than 2!".into() @@ -88,16 +88,14 @@ impl PhraseQuery { ); } for pos in &positions { - debug_assert!(*pos >= 0, format!("Positions must be >= 0, got {}", pos)); + debug_assert!(*pos >= 0, 
"Positions must be >= 0, got {}", pos); } for i in 1..positions.len() { debug_assert!( positions[i - 1] <= positions[i], - format!( - "Positions should not go backwards, got {} before {}", - positions[i - 1], - positions[i] - ) + "Positions should not go backwards, got {} before {}", + positions[i - 1], + positions[i] ); } // normalize positions @@ -278,11 +276,10 @@ impl Weight for PhraseWeight { let mut term_iter = if let Some(field_terms) = reader.reader.terms(&self.field)? { debug_assert!( field_terms.has_positions()?, - format!( - "field {} was indexed without position data; cannot run PhraseQuery \ - (phrase={:?})", - self.field, self.terms - ) + "field {} was indexed without position data; cannot run PhraseQuery \ + (phrase={:?})", + self.field, + self.terms ); field_terms.iterator()? } else { @@ -356,11 +353,10 @@ impl Weight for PhraseWeight { let mut term_iter = if let Some(field_terms) = reader.reader.terms(&self.field)? { debug_assert!( field_terms.has_positions()?, - format!( - "field {} was indexed without position data; cannot run PhraseQuery \ - (phrase={:?})", - self.field, self.terms - ) + "field {} was indexed without position data; cannot run PhraseQuery \ + (phrase={:?})", + self.field, + self.terms ); Some(field_terms.iterator()?) } else { diff --git a/src/core/search/scorer/phrase_scorer.rs b/src/core/search/scorer/phrase_scorer.rs index d8f8bbe..68e5c61 100644 --- a/src/core/search/scorer/phrase_scorer.rs +++ b/src/core/search/scorer/phrase_scorer.rs @@ -326,8 +326,6 @@ struct PhrasePositions { pub ord: i32, // unique across all PhrasePositions instances pub postings: Rc>, - // stream of docs & positions - pub next_pp_idx: i32, // used to make list pub rpt_group: i32, // >=0 indicates that this is a repeating PP @@ -347,7 +345,6 @@ impl PhrasePositions { offset, ord, postings, - next_pp_idx: -1, rpt_group: -1, rpt_ind: 0, terms, diff --git a/src/core/store/io/mmap_index_input.rs b/src/core/store/io/mmap_index_input.rs index c06104c..9f1218d 100644 --- a/src/core/store/io/mmap_index_input.rs +++ b/src/core/store/io/mmap_index_input.rs @@ -127,7 +127,6 @@ pub struct MmapIndexInput { source: ReadOnlySource, position: usize, slice: &'static [u8], - description: String, } unsafe impl Send for MmapIndexInput {} @@ -143,7 +142,6 @@ impl From for MmapIndexInput { source, slice, position: 0, - description: String::from(""), } } } @@ -186,7 +184,7 @@ impl MmapIndexInput { } } - fn slice_impl(&self, description: &str, offset: i64, length: i64) -> Result { + fn slice_impl(&self, offset: i64, length: i64) -> Result { let total_len = self.len() as i64; if offset < 0 || length < 0 || offset + length > total_len { bail!(IllegalArgument(format!( @@ -200,7 +198,6 @@ impl MmapIndexInput { slice, source: self.source.clone(), position: 0, - description: description.to_string(), }) } @@ -238,7 +235,7 @@ impl IndexInput for MmapIndexInput { } fn random_access_slice(&self, offset: i64, length: i64) -> Result> { - let boxed = self.slice_impl("RandomAccessSlice", offset, length)?; + let boxed = self.slice_impl(offset, length)?; Ok(Box::new(boxed)) } @@ -250,8 +247,8 @@ impl IndexInput for MmapIndexInput { ptr } - fn slice(&self, description: &str, offset: i64, length: i64) -> Result> { - let boxed = self.slice_impl(description, offset, length)?; + fn slice(&self, _description: &str, offset: i64, length: i64) -> Result> { + let boxed = self.slice_impl(offset, length)?; Ok(Box::new(boxed)) } diff --git a/src/core/util/bkd/bkd_reader.rs b/src/core/util/bkd/bkd_reader.rs index 5ae40b9..493f202 
100644 --- a/src/core/util/bkd/bkd_reader.rs +++ b/src/core/util/bkd/bkd_reader.rs @@ -279,7 +279,7 @@ impl BKDReader { } else { // Non-leaf node: recurse on the split left and right nodes let split_dim = state.index_tree.split_dim() as usize; - debug_assert!(split_dim as i32 >= 0, format!("split_dim={}", split_dim)); + debug_assert!(split_dim as i32 >= 0, "split_dim={}", split_dim); debug_assert!(split_dim < self.num_dims); let split_packed_value_idx = state.index_tree.split_packed_value_index(); @@ -1132,7 +1132,8 @@ impl IndexTree for PackedIndexTree { fn leaf_block_fp(&self) -> i64 { debug_assert!( self.is_leaf_node(), - format!("node_id={} is not a leaf", self.index_tree.node_id) + "node_id={} is not a leaf", + self.index_tree.node_id ); self.leaf_block_fp_stack[self.index_tree.level as usize] } diff --git a/src/core/util/bkd/bkd_writer.rs b/src/core/util/bkd/bkd_writer.rs index cb39fcc..c6f92d0 100644 --- a/src/core/util/bkd/bkd_writer.rs +++ b/src/core/util/bkd/bkd_writer.rs @@ -970,7 +970,7 @@ impl BKDWriter { let mut first_diff_byte_delta: i32; if prefix < self.bytes_per_dim { first_diff_byte_delta = (split_packed_values[address + prefix] as u32 as i32) - - (last_split_values[(split_dim * self.bytes_per_dim + prefix)] as u32 as i32); + - (last_split_values[split_dim * self.bytes_per_dim + prefix] as u32 as i32); if negative_deltas[split_dim] { first_diff_byte_delta *= -1; } diff --git a/src/core/util/packed/packed_misc.rs b/src/core/util/packed/packed_misc.rs index bc31ef5..dcba8be 100644 --- a/src/core/util/packed/packed_misc.rs +++ b/src/core/util/packed/packed_misc.rs @@ -1651,7 +1651,8 @@ impl Packed64SingleBlock { pub fn new(value_count: usize, bits_per_value: usize) -> Packed64SingleBlock { debug_assert!( Self::is_supported(bits_per_value), - format!("Unsupported number of bits per value: {}", bits_per_value) + "Unsupported number of bits per value: {}", + bits_per_value ); let value_per_block = 64 / bits_per_value; let blocks = vec![0i64; Self::required_capacity(value_count, value_per_block)]; @@ -2629,10 +2630,9 @@ impl PackedIntDecoder for BulkOperationPacked { fn decode_long_to_int(&self, blocks: &[i64], values: &mut [i32], iterations: usize) { if self.bits_per_value > 32 { - panic!(format!( - "Cannot decode {} -bits values into an i32 slice", + panic!("Cannot decode {} -bits values into an i32 slice", self.bits_per_value - )); + ); } let mut bits_left = 64; @@ -2815,10 +2815,9 @@ impl PackedIntDecoder for BulkOperationPackedSingleBlock { fn decode_long_to_int(&self, blocks: &[i64], values: &mut [i32], iterations: usize) { if self.bits_per_value > 32 { - panic!(format!( - "Cannot decode {} -bits values into an i32 slice", + panic!("Cannot decode {} -bits values into an i32 slice", self.bits_per_value - )); + ); } let mut values_offset = 0; for b in blocks.iter().take(iterations) { @@ -2828,10 +2827,9 @@ impl PackedIntDecoder for BulkOperationPackedSingleBlock { fn decode_byte_to_int(&self, blocks: &[u8], values: &mut [i32], iterations: usize) { if self.bits_per_value > 32 { - panic!(format!( - "Cannot decode {} -bits values into an i32 slice", + panic!("Cannot decode {} -bits values into an i32 slice", self.bits_per_value - )); + ); } let mut values_offset = 0; for i in 0..iterations { @@ -2986,8 +2984,8 @@ pub struct BlockPackedReaderIterator { block_size: usize, pub values: Vec, // 一下两个字段用于替代原始定义中的 `values_ref` 成员 - values_offset: usize, - values_length: usize, + //values_offset: usize, + //values_length: usize, blocks: Vec, off: usize, pub ord: i64, @@ -3029,8 +3027,8 
@@ impl BlockPackedReaderIterator { value_count, block_size, values, - values_offset: 0, - values_length: 0, + //values_offset: 0, + //values_length: 0, blocks: vec![], off: block_size, ord: 0, From da792684625f5534fc0e6fc97b180446b4ae47bd Mon Sep 17 00:00:00 2001 From: Harsha Vamsi Kalluri Date: Thu, 1 Feb 2024 18:52:20 +0000 Subject: [PATCH 3/8] Adding gitignore Signed-off-by: Harsha Vamsi Kalluri --- .gitignore | 1 + Cargo.lock | 829 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 830 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..09485b8 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,829 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "aho-corasick" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81ce3d38065e618af2d7b77e10c5ad9a069859b4be3c2250f674af3840d9c8a5" +dependencies = [ + "memchr", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bit-set" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9bf6104718e80d7b26a68fdbacff3481cfc05df670821affc7e9cbc1884400c" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b4ff8b16e6076c3e14220b39fbc1fabb6737522281a388998046859400895f" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitflags" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" + +[[package]] +name = "build_const" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ae4235e6dac0694637c763029ecea1a2ec9e4e06ec2729bd21ba4d9c863eb7" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chan" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d14956a3dae065ffaa0d92ece848ab4ced88d32361e7fdfbfd653a5c454a1ed8" +dependencies = [ + "rand 0.3.23", +] + +[[package]] +name = "chan-signal" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b829e3f5432da0cc46577d89fc88c937e78052e6735fb47ce0213b0db120b01" +dependencies = [ + "bit-set", + "chan", + "lazy_static 0.2.11", + "libc", +] + +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags 1.3.2", +] + +[[package]] +name = "crc" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +dependencies = [ + "build_const", +] + +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] +name = "crossbeam" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" +dependencies = [ + "cfg-if 0.1.10", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-deque" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "crossbeam-utils", + "lazy_static 1.4.0", + "maybe-uninit", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-queue" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" +dependencies = [ + "cfg-if 0.1.10", + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] 
+name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static 1.4.0", +] + +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "backtrace", + "version_check", +] + +[[package]] +name = "fasthash" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce56b715df3559085323d0a6724eccfd52994ac5abac9e9ffc6093853163f3bb" +dependencies = [ + "fasthash-sys", + "seahash", + "xoroshiro128", +] + +[[package]] +name = "fasthash-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6de941abfe2e715cdd34009d90546f850597eb69ca628ddfbf616e53dda28f8" +dependencies = [ + "gcc", +] + +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + +[[package]] +name = "flate2" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + +[[package]] +name = "gcc" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] +name = "hermit-abi" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" + +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + +[[package]] +name = "itoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" + +[[package]] +name = "lazy_static" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.153" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" + +[[package]] +name = "linux-raw-sys" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + +[[package]] +name = "memchr" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" + +[[package]] +name = "memmap" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2ffa2c986de11a9df78620c01eeaaf27d94d3ff02bf81bfcca953102dd0c6ff" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + +[[package]] +name = "proc-macro2" +version = "1.0.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +dependencies = [ + "libc", + "rand 0.4.6", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + +[[package]] +name = "rand" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" +dependencies = [ + "cloudabi", + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "winapi", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + +[[package]] +name = "regex" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", + "utf8-ranges", +] + +[[package]] +name = "regex-syntax" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d707a4fa2637f2dca2ef9fd02225ec7661fe01a53623c1e6515b6916511f7a7" +dependencies = [ + "ucd-util", +] + +[[package]] +name = "rucene" +version = "0.1.1" +dependencies = [ + "byteorder", + "bytes", + "chan", + "chan-signal", + "crc", + "crossbeam", + "crunchy", + "either", + "error-chain", + "fasthash", + "flate2", + "lazy_static 1.4.0", + "log", + "memmap", + "num-traits", + "num_cpus", + "rand 0.5.6", + "regex", + "serde", + "serde_derive", + "serde_json", + "smallvec", + "tempfile", + "thread_local", + "unicode_reader", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "rustix" +version = "0.38.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +dependencies = [ + "bitflags 2.4.2", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + +[[package]] +name = "ryu" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "seahash" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58f57ca1d128a43733fd71d583e837b1f22239a37ebea09cde11d8d9a9080f47" + +[[package]] 
+name = "serde" +version = "1.0.196" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.196" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.113" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "smallvec" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" +dependencies = [ + "maybe-uninit", +] + +[[package]] +name = "syn" +version = "2.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tempfile" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +dependencies = [ + "cfg-if 1.0.0", + "fastrand", + "redox_syscall", + "rustix", + "windows-sys", +] + +[[package]] +name = "thread_local" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" +dependencies = [ + "lazy_static 1.4.0", +] + +[[package]] +name = "ucd-util" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abd2fc5d32b590614af8b0a20d837f32eca055edd0bbead59a9cfe80858be003" + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "unicode-segmentation" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" + +[[package]] +name = "unicode_reader" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "001b27e8f5e9da465b3584051a3a3d2ebefee4f8595c49e96cc1deec9667e4cc" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "utf8-ranges" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcfc827f90e53a02eaef5e535ee14266c1d569214c6aa70133a624d8a3164ba" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + +[[package]] +name = "xoroshiro128" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0eeda34baec49c4f1eb2c04d59b761582fd6330010f9330ca696ca1a355dfcd" +dependencies = [ + "rand 0.4.6", +] From 8edf9aab6a44c64ef8f616a0172b7f964fd53915 Mon Sep 17 00:00:00 2001 From: Harsha Vamsi Kalluri Date: Tue, 6 Feb 2024 23:49:15 +0000 Subject: [PATCH 4/8] temp Signed-off-by: Harsha Vamsi Kalluri --- src/core/index/merge/merge_rate_limiter.rs | 47 ++--- src/core/index/merge/merge_scheduler.rs | 194 ++++++++++-------- .../search/collector/early_terminating.rs | 18 +- src/core/search/collector/timeout.rs | 16 +- src/core/util/external/volatile.rs | 12 +- 5 files changed, 155 insertions(+), 132 deletions(-) diff --git a/src/core/index/merge/merge_rate_limiter.rs b/src/core/index/merge/merge_rate_limiter.rs index 4f9c230..9937522 100644 --- a/src/core/index/merge/merge_rate_limiter.rs +++ b/src/core/index/merge/merge_rate_limiter.rs @@ -14,11 +14,12 @@ use core::store::RateLimiter; use core::index::ErrorKind::MergeAborted; +use std::cell::UnsafeCell; use 
error::{ErrorKind, Result};
 use std::f64;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
-use std::sync::{Condvar, Mutex};
+use std::sync::{Condvar, Mutex, RwLock};
 use std::time::{Duration, SystemTime};
 
 use core::util::external::Volatile;
@@ -31,12 +32,12 @@ use core::util::external::Volatile;
 /// much time it spent stopped and paused, and it supports aborting.
 pub struct MergeRateLimiter {
     total_bytes_written: AtomicU64,
-    mb_per_sec: Volatile<f64>,
-    last_time: Volatile<SystemTime>,
-    min_pause_check_bytes: Volatile<u64>,
+    mb_per_sec: Mutex<Volatile<f64>>,
+    last_time: Mutex<Volatile<SystemTime>>,
+    min_pause_check_bytes: Mutex<Volatile<u64>>,
     abort: AtomicBool,
-    total_paused_dur: Volatile<Duration>,
-    total_stopped_dur: Volatile<Duration>,
+    total_paused_dur: Mutex<Volatile<Duration>>,
+    total_stopped_dur: Mutex<Volatile<Duration>>,
     // merge: OneMerge,
     lock: Mutex<()>,
     cond: Condvar,
@@ -59,12 +60,12 @@ impl MergeRateLimiter {
         // Now is a good time to abort the merge:
         self.check_abort()?;
 
-        let mb_per_sec = self.mb_per_sec.read();
+        let mb_per_sec = self.mb_per_sec.lock().unwrap().read();
         let seconds_to_pause = bytes as f64 / 1024.0 / 1024.0 / mb_per_sec;
 
         // Time we should sleep until; this is purely instantaneous
         // rate (just adds seconds onto the last time we had paused to);
         // maybe we should also offer decayed recent history one?
-        let target_time = self.last_time.read()
+        let target_time = self.last_time.lock().unwrap().read()
             + Duration::from_nanos((seconds_to_pause * 1_000_000_000.0) as u64);
 
         if target_time < cur_ns {
@@ -88,7 +89,7 @@
         // CMS can wake us up here if it changes our target rate:
         let _ = self.cond.wait_timeout(l, cur_pause_dur)?;
 
-        let rate = self.mb_per_sec.read();
+        let rate = self.mb_per_sec.lock().unwrap().read();
         if rate == 0.0 {
             Ok(PauseResult::Stopped)
         } else {
@@ -117,12 +118,12 @@ impl Default for MergeRateLimiter {
     fn default() -> Self {
         let limiter = MergeRateLimiter {
             total_bytes_written: AtomicU64::new(0),
-            mb_per_sec: Volatile::new(0.0),
-            last_time: Volatile::new(SystemTime::now()),
-            min_pause_check_bytes: Volatile::new(0),
+            mb_per_sec: Mutex::new(Volatile::new(0.0)),
+            last_time: Mutex::new(Volatile::new(SystemTime::now())),
+            min_pause_check_bytes: Mutex::new(Volatile::new(0)),
             abort: AtomicBool::new(false),
-            total_paused_dur: Volatile::new(Duration::default()),
-            total_stopped_dur: Volatile::new(Duration::default()),
+            total_paused_dur: Mutex::new(Volatile::new(Duration::default())),
+            total_stopped_dur: Mutex::new(Volatile::new(Duration::default())),
             lock: Mutex::new(()),
             cond: Condvar::new(),
         };
@@ -141,7 +142,7 @@ impl RateLimiter for MergeRateLimiter {
             panic!("mb_per_sec must be positive; got: {}", mb_per_sec);
         }
 
-        self.mb_per_sec.write(mb_per_sec);
+        self.mb_per_sec.lock().unwrap().write(mb_per_sec);
 
         // NOTE: java Double.POSITIVE_INFINITY cast to long is long.MAX_VALUE,
         // rust f64::INFINITY cast to u64 is 0.
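// Aside on the NOTE above: the "casts to 0" result it describes is the old,
// unspecified pre-1.45 behaviour. Since Rust 1.45, float-to-int `as` casts
// saturate, so on the nightly-2024 toolchain this series targets the special
// case below is defensive rather than load-bearing. A quick self-contained
// check of the current semantics:
#[test]
fn float_to_int_casts_saturate() {
    assert_eq!(f64::INFINITY as u64, u64::MAX); // saturates, no longer 0
    assert_eq!(f64::NAN as u64, 0);             // NaN clamps to 0
    assert_eq!((-3.0_f64) as u64, 0);           // negatives clamp to 0 for unsigned
}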
let check_value = MIN_PAUSE_CHECK_MSEC as f64 / 1000.0 * mb_per_sec * 1024.0 * 1024.0;
@@ -152,12 +153,12 @@ impl RateLimiter for MergeRateLimiter {
             check_value as u64
         };
         self.min_pause_check_bytes
-            .write(::std::cmp::min(64 * 1024 * 1024, check_bytes));
+            .lock().unwrap().write(::std::cmp::min(64 * 1024 * 1024, check_bytes));
 
         self.cond.notify_one();
     }
 
     fn mb_per_sec(&self) -> f64 {
-        self.mb_per_sec.read()
+        self.mb_per_sec.lock().unwrap().read()
     }
 
     fn pause(&self, bytes: u64) -> Result<Duration> {
@@ -175,7 +176,7 @@
             if result == PauseResult::No {
                 // Set to curNS, not targetNS, to enforce the instant rate, not
                 // the "averaged over all history" rate:
-                self.last_time.write(cur_time);
+                self.last_time.lock().unwrap().write(cur_time);
                 break;
             }
             cur_time = SystemTime::now();
@@ -184,12 +185,12 @@
             // Separately track when merge was stopped vs rate limited:
             if result == PauseResult::Stopped {
-                let stopped_dur = self.total_stopped_dur.read();
-                self.total_stopped_dur.write(stopped_dur + dur);
+                let stopped_dur = self.total_stopped_dur.lock().unwrap().read();
+                self.total_stopped_dur.lock().unwrap().write(stopped_dur + dur);
             } else {
                 debug_assert_eq!(result, PauseResult::Paused);
-                let total_paused_dur = self.total_stopped_dur.read();
+                let total_paused_dur = self.total_paused_dur.lock().unwrap().read();
+                self.total_paused_dur.lock().unwrap().write(total_paused_dur + dur);
-                self.total_paused_dur.write(total_paused_dur + dur);
             }
             paused += dur;
         }
@@ -197,6 +198,6 @@
     }
 
     fn min_pause_check_bytes(&self) -> u64 {
-        self.min_pause_check_bytes.read()
+        self.min_pause_check_bytes.lock().unwrap().read()
     }
 }
diff --git a/src/core/index/merge/merge_scheduler.rs b/src/core/index/merge/merge_scheduler.rs
index 3a38519..9744eb8 100644
--- a/src/core/index/merge/merge_scheduler.rs
+++ b/src/core/index/merge/merge_scheduler.rs
@@ -17,6 +17,8 @@ use core::index::merge::{MergePolicy, MergerTrigger, OneMerge, OneMergeScheduleI
 use core::index::writer::IndexWriter;
 use core::store::directory::Directory;
 use core::store::RateLimiter;
+use std::borrow::BorrowMut;
+use std::cell::UnsafeCell;
 
 use error::{Error, ErrorKind, Result};
 
@@ -139,7 +141,7 @@ impl Ord for MergeTaskInfo {
 /// throttle the incoming threads by pausing until one or more merges complete.
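// Aside: the rate-limiter hunks above serialize every Volatile access through
// a Mutex. For the f64 fields, a lock-free alternative is to store the bit
// pattern in an AtomicU64, which keeps reads on the hot pause() path
// wait-free. A minimal sketch (illustrative type, not part of the patch):
use std::sync::atomic::{AtomicU64, Ordering};

struct AtomicF64(AtomicU64);

impl AtomicF64 {
    fn new(v: f64) -> Self {
        AtomicF64(AtomicU64::new(v.to_bits()))
    }
    fn read(&self) -> f64 {
        // Round-trip through the raw bits; f64 has no native atomic.
        f64::from_bits(self.0.load(Ordering::Acquire))
    }
    fn write(&self, v: f64) {
        self.0.store(v.to_bits(), Ordering::Release)
    }
}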
#[derive(Clone)] pub struct ConcurrentMergeScheduler { - inner: Arc, + inner: Arc>, } impl Default for ConcurrentMergeScheduler { @@ -155,21 +157,23 @@ impl ConcurrentMergeScheduler { panic!("max thread count must not be 0"); } Self { - inner: Arc::new(ConcurrentMergeSchedulerInner::new(max_thread_count)), + inner: Arc::new(Mutex::new(ConcurrentMergeSchedulerInner::new( + max_thread_count), + )), } } } struct ConcurrentMergeSchedulerInner { - lock: Mutex<()>, - cond: Condvar, - merge_tasks: Vec, - max_merge_count: usize, - max_thread_count: usize, - merge_thread_count: usize, - target_mb_per_sec: f64, - do_auto_io_throttle: bool, - force_merge_mb_per_sec: f64, + lock: UnsafeCell>, + cond: UnsafeCell, + merge_tasks: UnsafeCell>, + max_merge_count: UnsafeCell, + max_thread_count: UnsafeCell, + merge_thread_count: UnsafeCell, + target_mb_per_sec: UnsafeCell, + do_auto_io_throttle: UnsafeCell, + force_merge_mb_per_sec: UnsafeCell, } // Floor for IO write rate limit (we will never go any lower than this) @@ -191,23 +195,26 @@ pub const MAX_MERGING_COUNT: usize = 5; impl ConcurrentMergeSchedulerInner { fn new(max_thread_count: usize) -> Self { ConcurrentMergeSchedulerInner { - lock: Mutex::new(()), - cond: Condvar::new(), - merge_tasks: vec![], - max_merge_count: max_thread_count.max(MAX_MERGING_COUNT), - max_thread_count, - merge_thread_count: 0, - target_mb_per_sec: START_MB_PER_SEC, - do_auto_io_throttle: true, - force_merge_mb_per_sec: f64::INFINITY, + lock: UnsafeCell::new(Mutex::new(())), + cond: UnsafeCell::new(Condvar::new()), + merge_tasks: UnsafeCell::new(vec![]), + max_merge_count: UnsafeCell::new(max_thread_count.max(MAX_MERGING_COUNT)), + max_thread_count: UnsafeCell::new(max_thread_count), + merge_thread_count: UnsafeCell::new(0), + target_mb_per_sec: UnsafeCell::new(START_MB_PER_SEC), + do_auto_io_throttle: UnsafeCell::new(true), + force_merge_mb_per_sec: UnsafeCell::new(f64::INFINITY), } } #[allow(clippy::mut_from_ref)] - unsafe fn scheduler_mut(&self, _guard: &MutexGuard<()>) -> &mut ConcurrentMergeSchedulerInner { + unsafe fn scheduler_mut( + &self, + _guard: UnsafeCell<&MutexGuard<()>>, + ) -> &mut ConcurrentMergeSchedulerInner { let scheduler = self as *const ConcurrentMergeSchedulerInner as *mut ConcurrentMergeSchedulerInner; - &mut *scheduler + unsafe { &mut *scheduler } } fn maybe_stall<'a, D, C, MP>( @@ -222,81 +229,93 @@ impl ConcurrentMergeSchedulerInner { { let thread_id = thread::current().id(); let mut guard = guard; - while writer.has_pending_merges() && self.merge_thread_count() >= self.max_merge_count { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - if self.merge_tasks.iter().any(|t| t.thread_id == thread_id) { - // Never stall a merge thread since this blocks the thread from - // finishing and calling updateMergeThreads, and blocking it - // accomplishes nothing anyway (it's not really a segment producer): - return (false, guard); - } + unsafe { + while writer.has_pending_merges() && self.merge_thread_count() >= *self.max_merge_count.get() { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. 
+ // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + if self.merge_tasks.get_mut().iter().any(|t| t.thread_id == thread_id) { + // Never stall a merge thread since this blocks the thread from + // finishing and calling updateMergeThreads, and blocking it + // accomplishes nothing anyway (it's not really a segment producer): + return (false, guard); + } - // Defensively wait for only .25 seconds in case we are missing a .notify/All somewhere: - let (g, _) = self - .cond - .wait_timeout(guard, Duration::from_millis(25)) - .unwrap(); - guard = g; + // Defensively wait for only .25 seconds in case we are missing a .notify/All + // somewhere: + let (g, _) = *self + .cond + .get() + .wait_timeout(guard, Duration::from_millis(25)) + .unwrap(); + guard = g; + } } (true, guard) } fn update_merge_threads(&mut self) { - let mut active_tasks: Vec<_> = self.merge_tasks.iter().collect(); - active_tasks.sort(); - - let tasks_count = active_tasks.len(); - - let mut big_merge_count = 0; - for i in 0..tasks_count { - if active_tasks[tasks_count - 1 - i] - .merge - .estimated_merge_bytes - .read() as f64 - > MIN_BIG_MERGE_MB * 1024.0 * 1024.0 - { - big_merge_count = tasks_count - i; - break; + unsafe { + let mut active_tasks: Vec = self.merge_tasks.get_mut().iter().collect(); + active_tasks.sort(); + + let tasks_count = active_tasks.len(); + + let mut big_merge_count = 0; + for i in 0..tasks_count { + if active_tasks[tasks_count - 1 - i] + .merge + .estimated_merge_bytes + .read() as f64 + > MIN_BIG_MERGE_MB * 1024.0 * 1024.0 + { + big_merge_count = tasks_count - i; + break; + } } - } - for (idx, task) in active_tasks.iter().enumerate() { - // pause the thread if max_thread_count is smaller than the number of merge threads. - let do_pause = idx + self.max_thread_count < big_merge_count; - - let new_mb_per_sec = if do_pause { - 0.0 - } else if task.merge.max_num_segments.get().is_some() { - self.force_merge_mb_per_sec - } else if !self.do_auto_io_throttle - || ((task.merge.estimated_merge_bytes.read() as f64) - < MIN_BIG_MERGE_MB * 1024.0 * 1024.0) - { - f64::INFINITY - } else { - self.target_mb_per_sec - }; + for (idx, task) in active_tasks.iter().enumerate() { + // pause the thread if max_thread_count is smaller than the number of merge threads. 
+ let do_pause = idx + self.max_thread_count < big_merge_count; + unsafe + + let new_mb_per_sec = if do_pause { + 0.0 + } else if task.merge.max_num_segments.get().is_some() { + self.force_merge_mb_per_sec + } else if !self.do_auto_io_throttle.get() + || ((task.merge.estimated_merge_bytes.read() as f64) + < MIN_BIG_MERGE_MB * 1024.0 * 1024.0) + { + f64::INFINITY + } else { + self.target_mb_per_sec.get() + }; - task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec); + task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec); + } } } fn merge_thread_count(&self) -> usize { let current_thread = thread::current().id(); - self.merge_tasks - .iter() - .filter(|t| { - t.thread_id != current_thread && t.thread_alive() && !t.merge.rate_limiter.aborted() - }) - .count() + unsafe { + self.merge_tasks + .get_mut() + .iter() + .filter(|t| { + t.thread_id != current_thread + && t.thread_alive() + && !t.merge.rate_limiter.aborted() + }) + .count() + } } fn update_io_throttle( @@ -394,7 +413,9 @@ impl MergeScheduler for ConcurrentMergeScheduler { MP: MergePolicy, { let mut guard = self.inner.lock.lock().unwrap(); - let scheduler = unsafe { self.inner.scheduler_mut(&guard) }; + let t = + guard.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>; + let scheduler = unsafe { self.inner.scheduler_mut((&guard).into()) }; if trigger == MergerTrigger::Closing { // Disable throttling on close: @@ -489,8 +510,9 @@ impl MergeThrea } Ok(()) => {} } - let l = self.merge_scheduler.inner.lock.lock().unwrap(); - let scheduler_mut = unsafe { self.merge_scheduler.inner.scheduler_mut(&l) }; + let mut l = self.merge_scheduler.inner.lock.lock().unwrap(); + let t = l.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>; + let scheduler_mut = unsafe { self.merge_scheduler.inner.scheduler_mut((&l).into()) }; scheduler_mut .merge_tasks .retain(|t| t.merge.id != one_merge.id); diff --git a/src/core/search/collector/early_terminating.rs b/src/core/search/collector/early_terminating.rs index 655cac6..ea3353a 100644 --- a/src/core/search/collector/early_terminating.rs +++ b/src/core/search/collector/early_terminating.rs @@ -19,10 +19,10 @@ use core::search::scorer::Scorer; use core::util::external::Volatile; use core::util::DocId; use error::{ErrorKind, Result}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub struct EarlyTerminatingSortingCollector { - early_terminated: Arc>, + early_terminated: Arc>>, num_docs_to_collect_per_reader: usize, num_docs_collected_per_reader: usize, } @@ -36,14 +36,14 @@ impl EarlyTerminatingSortingCollector { ); EarlyTerminatingSortingCollector { - early_terminated: Arc::new(Volatile::new(false)), + early_terminated: Arc::new(Mutex::new(Volatile::new(false))), num_docs_to_collect_per_reader, num_docs_collected_per_reader: 0, } } pub fn early_terminated(&self) -> bool { - self.early_terminated.read() + self.early_terminated.lock().unwrap().read() } } @@ -80,7 +80,7 @@ impl Collector for EarlyTerminatingSortingCollector { self.num_docs_collected_per_reader += 1; if self.num_docs_collected_per_reader > self.num_docs_to_collect_per_reader { - self.early_terminated.write(true); + self.early_terminated.lock().unwrap().write(true); bail!(ErrorKind::Collector( collector::ErrorKind::LeafCollectionTerminated, )) @@ -104,7 +104,7 @@ impl Collector for EarlyTerminatingSortingCollector { /// However the total of hit count will be vastly underestimated since not all matching documents /// will have been collected. 
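// Aside: the early_terminated flag threaded through the collectors above is a
// plain boolean shared across threads, so the Arc<Mutex<Volatile<bool>>>
// shape this diff settles on could be expressed with an AtomicBool and no
// locking at all. A sketch (illustrative type, not the patch's API):
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

#[derive(Clone)]
struct TerminationFlag(Arc<AtomicBool>);

impl TerminationFlag {
    fn new() -> Self {
        TerminationFlag(Arc::new(AtomicBool::new(false)))
    }
    fn trip(&self) {
        self.0.store(true, Ordering::Release)
    }
    fn tripped(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}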
pub struct EarlyTerminatingLeafCollector { - early_terminated: Arc>, + early_terminated: Arc>>, num_docs_to_collect: usize, num_docs_collected: usize, } @@ -112,7 +112,7 @@ pub struct EarlyTerminatingLeafCollector { impl EarlyTerminatingLeafCollector { pub fn new( num_docs_to_collect: usize, - early_terminated: Arc>, + early_terminated: Arc>>, ) -> EarlyTerminatingLeafCollector { EarlyTerminatingLeafCollector { early_terminated, @@ -122,7 +122,7 @@ impl EarlyTerminatingLeafCollector { } pub fn early_terminated(&self) -> bool { - self.early_terminated.read() + self.early_terminated.lock().unwrap().read() } } @@ -141,7 +141,7 @@ impl Collector for EarlyTerminatingLeafCollector { self.num_docs_collected += 1; if self.num_docs_collected > self.num_docs_to_collect { - self.early_terminated.write(true); + self.early_terminated.lock().unwrap().write(true); bail!(ErrorKind::Collector( collector::ErrorKind::LeafCollectionTerminated, )) diff --git a/src/core/search/collector/timeout.rs b/src/core/search/collector/timeout.rs index 0a29702..6d53ce8 100644 --- a/src/core/search/collector/timeout.rs +++ b/src/core/search/collector/timeout.rs @@ -19,7 +19,7 @@ use core::search::scorer::Scorer; use core::util::external::Volatile; use core::util::DocId; use error::{ErrorKind, Result}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; /// the `TimeoutCollector` collector is used to timeout search requests that @@ -34,7 +34,7 @@ use std::time::{Duration, SystemTime}; pub struct TimeoutCollector { timeout_duration: Duration, start_time: SystemTime, - timeout: Arc>, + timeout: Arc>>, } impl TimeoutCollector { @@ -42,12 +42,12 @@ impl TimeoutCollector { TimeoutCollector { timeout_duration, start_time, - timeout: Arc::new(Volatile::new(false)), + timeout: Arc::new(Mutex::new(Volatile::new(false))), } } pub fn timeout(&self) -> bool { - self.timeout.read() + self.timeout.lock().unwrap().read() } } @@ -86,7 +86,7 @@ impl Collector for TimeoutCollector { fn collect(&mut self, _doc: DocId, _scorer: &mut S) -> Result<()> { let now = SystemTime::now(); if self.start_time < now && now.duration_since(self.start_time)? >= self.timeout_duration { - self.timeout.write(true); + self.timeout.lock().unwrap().write(true); bail!(ErrorKind::Collector( collector::ErrorKind::CollectionTimeout, )) @@ -98,14 +98,14 @@ impl Collector for TimeoutCollector { pub struct TimeoutLeafCollector { timeout_duration: Duration, start_time: SystemTime, - timeout: Arc>, + timeout: Arc>>, } impl TimeoutLeafCollector { pub fn new( timeout_duration: Duration, start_time: SystemTime, - timeout: Arc>, + timeout: Arc>>, ) -> TimeoutLeafCollector { TimeoutLeafCollector { timeout_duration, @@ -123,7 +123,7 @@ impl Collector for TimeoutLeafCollector { fn collect(&mut self, _doc: i32, _scorer: &mut S) -> Result<()> { let now = SystemTime::now(); if self.start_time < now && now.duration_since(self.start_time)? >= self.timeout_duration { - self.timeout.write(true); + self.timeout.lock().unwrap().write(true); bail!(ErrorKind::Collector( collector::ErrorKind::CollectionTerminated, )) diff --git a/src/core/util/external/volatile.rs b/src/core/util/external/volatile.rs index 0607a49..36fffcb 100644 --- a/src/core/util/external/volatile.rs +++ b/src/core/util/external/volatile.rs @@ -53,7 +53,7 @@ //! and then perform operations on the pointer as usual in a volatile way. This method works as all //! of the volatile wrapper types are the same size as their contained values. 
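// Aside: the volatile.rs hunks that follow are the heart of this change.
// Writing through a pointer cast from a shared reference, as the old
// `write(&self, ..)` did, is undefined behaviour under Rust's aliasing rules
// even when it appears to work; UnsafeCell is the sanctioned way to obtain a
// writable pointer from &self. Condensed contrast:
use std::cell::UnsafeCell;

struct Bad(u64);
struct Good(UnsafeCell<u64>);

impl Bad {
    fn set(&self, v: u64) {
        // UB: mutation through a pointer derived from a shared reference.
        unsafe { *(&self.0 as *const u64 as *mut u64) = v }
    }
}

impl Good {
    fn set(&self, v: u64) {
        // OK: UnsafeCell::get may hand out *mut u64 from &self.
        unsafe { *self.0.get() = v }
    }
}
// One side effect: UnsafeCell makes the wrapper !Sync, which is presumably
// why every Volatile shared through an Arc elsewhere in this series now sits
// behind a Mutex.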
-use std::ptr;
+use std::{cell::UnsafeCell, ptr};
 
 /// A wrapper type around a volatile variable, which allows for volatile reads and writes
 /// to the contained value. The stored type needs to be `Copy`, as volatile reads and writes
@@ -62,7 +62,7 @@ use std::ptr;
 /// The size of this struct is the same as the size of the contained type.
 #[derive(Debug)]
 #[repr(transparent)]
-pub struct Volatile<T: Copy>(T);
+pub struct Volatile<T: Copy>(UnsafeCell<T>);
 
 impl<T: Copy> Volatile<T> {
     /// Construct a new volatile instance wrapping the given value.
@@ -78,7 +78,7 @@ impl<T: Copy> Volatile<T> {
     /// This method never panics.
     #[cfg(not(feature = "const_fn"))]
     pub fn new(value: T) -> Volatile<T> {
-        Volatile(value)
+        Volatile(value.into())
     }
 
     /// Performs a volatile read of the contained value, returning a copy
@@ -89,7 +89,7 @@ impl<T: Copy> Volatile<T> {
     /// This method never panics.
     pub fn read(&self) -> T {
         // UNSAFE: Safe, as we know that our internal value exists.
-        unsafe { ptr::read_volatile(&self.0) }
+        unsafe { ptr::read_volatile(self.0.get() as *const T) }
     }
 
     /// Performs a volatile write, setting the contained value to the given value `value`. Volatile
@@ -104,7 +104,7 @@ impl<T: Copy> Volatile<T> {
     /// else we need to use Atomic instead
     pub fn write(&self, value: T) {
         // UNSAFE: Safe, as we know that our internal value exists.
-        unsafe { ptr::write_volatile(&self.0 as *const T as *mut T, value) };
+        unsafe { ptr::write_volatile(self.0.get() as *const T as *mut T, value) };
     }
 
     /// Performs a volatile read of the contained value, passes a mutable reference to it to the
@@ -124,6 +124,6 @@ impl<T: Copy> Volatile<T> {
 
 impl<T: Copy> Clone for Volatile<T> {
     fn clone(&self) -> Self {
-        Volatile(self.read())
+        Volatile(self.read().into())
     }
 }

From 1838d0c80371d1a90709b0392c5b1a454e028df8 Mon Sep 17 00:00:00 2001
From: Harsha Vamsi Kalluri
Date: Wed, 7 Feb 2024 02:16:11 +0000
Subject: [PATCH 5/8] Fixing merge scheduler

Signed-off-by: Harsha Vamsi Kalluri
---
 src/core/index/merge/merge_scheduler.rs | 309 ++++++++++++------------
 1 file changed, 151 insertions(+), 158 deletions(-)

diff --git a/src/core/index/merge/merge_scheduler.rs b/src/core/index/merge/merge_scheduler.rs
index 9744eb8..c7e520a 100644
--- a/src/core/index/merge/merge_scheduler.rs
+++ b/src/core/index/merge/merge_scheduler.rs
@@ -17,7 +17,6 @@ use core::index::merge::{MergePolicy, MergerTrigger, OneMerge, OneMergeScheduleI
 use core::index::writer::IndexWriter;
 use core::store::directory::Directory;
 use core::store::RateLimiter;
-use std::borrow::BorrowMut;
 use std::cell::UnsafeCell;
 
 use error::{Error, ErrorKind, Result};
 
@@ -158,22 +157,21 @@ impl ConcurrentMergeScheduler {
         }
         Self {
             inner: Arc::new(Mutex::new(ConcurrentMergeSchedulerInner::new(
-                max_thread_count),
-            )),
+                max_thread_count,
+            ))),
         }
     }
 }
 
 struct ConcurrentMergeSchedulerInner {
-    lock: UnsafeCell<Mutex<()>>,
-    cond: UnsafeCell<Condvar>,
-    merge_tasks: UnsafeCell<Vec<MergeTaskInfo>>,
-    max_merge_count: UnsafeCell<usize>,
-    max_thread_count: UnsafeCell<usize>,
-    merge_thread_count: UnsafeCell<usize>,
-    target_mb_per_sec: UnsafeCell<f64>,
-    do_auto_io_throttle: UnsafeCell<bool>,
-    force_merge_mb_per_sec: UnsafeCell<f64>,
+    cond: Condvar,
+    merge_tasks: Vec<MergeTaskInfo>,
+    max_merge_count: usize,
+    max_thread_count: usize,
+    merge_thread_count: usize,
+    target_mb_per_sec: f64,
+    do_auto_io_throttle: bool,
+    force_merge_mb_per_sec: f64,
 }
 
 // Floor for IO write rate limit (we will never go any lower than this)
@@ -195,33 +193,41 @@ pub const MAX_MERGING_COUNT: usize = 5;
 impl ConcurrentMergeSchedulerInner {
     fn new(max_thread_count: usize) -> Self {
         ConcurrentMergeSchedulerInner {
-            lock: UnsafeCell::new(Mutex::new(())),
-            cond: 
UnsafeCell::new(Condvar::new()), - merge_tasks: UnsafeCell::new(vec![]), - max_merge_count: UnsafeCell::new(max_thread_count.max(MAX_MERGING_COUNT)), - max_thread_count: UnsafeCell::new(max_thread_count), - merge_thread_count: UnsafeCell::new(0), - target_mb_per_sec: UnsafeCell::new(START_MB_PER_SEC), - do_auto_io_throttle: UnsafeCell::new(true), - force_merge_mb_per_sec: UnsafeCell::new(f64::INFINITY), + cond: Condvar::new(), + merge_tasks: vec![], + max_merge_count: max_thread_count.max(MAX_MERGING_COUNT), + max_thread_count, + merge_thread_count: 0, + target_mb_per_sec: START_MB_PER_SEC, + do_auto_io_throttle: true, + force_merge_mb_per_sec: f64::INFINITY, } } + unsafe fn get_self( + ptr: &UnsafeCell, + ) -> &mut ConcurrentMergeSchedulerInner { + unsafe { &mut *ptr.get() } + } + #[allow(clippy::mut_from_ref)] unsafe fn scheduler_mut( &self, - _guard: UnsafeCell<&MutexGuard<()>>, + _guard: &MutexGuard<(ConcurrentMergeSchedulerInner)>, ) -> &mut ConcurrentMergeSchedulerInner { - let scheduler = - self as *const ConcurrentMergeSchedulerInner as *mut ConcurrentMergeSchedulerInner; - unsafe { &mut *scheduler } + let t = self as *const ConcurrentMergeSchedulerInner as *mut ConcurrentMergeSchedulerInner + as *const UnsafeCell; + unsafe { + let scheduler = ConcurrentMergeSchedulerInner::get_self(t.as_ref().unwrap()); + &mut *scheduler + } } fn maybe_stall<'a, D, C, MP>( &self, writer: &IndexWriter, - guard: MutexGuard<'a, ()>, - ) -> (bool, MutexGuard<'a, ()>) + guard: MutexGuard<'a, ConcurrentMergeSchedulerInner>, + ) -> (bool, MutexGuard<'a, ConcurrentMergeSchedulerInner>) where D: Directory + Send + Sync + 'static, C: Codec, @@ -229,93 +235,81 @@ impl ConcurrentMergeSchedulerInner { { let thread_id = thread::current().id(); let mut guard = guard; - unsafe { - while writer.has_pending_merges() && self.merge_thread_count() >= *self.max_merge_count.get() { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - if self.merge_tasks.get_mut().iter().any(|t| t.thread_id == thread_id) { - // Never stall a merge thread since this blocks the thread from - // finishing and calling updateMergeThreads, and blocking it - // accomplishes nothing anyway (it's not really a segment producer): - return (false, guard); - } - - // Defensively wait for only .25 seconds in case we are missing a .notify/All - // somewhere: - let (g, _) = *self - .cond - .get() - .wait_timeout(guard, Duration::from_millis(25)) - .unwrap(); - guard = g; + while writer.has_pending_merges() && self.merge_thread_count() >= self.max_merge_count { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. + // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). 
We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + if self.merge_tasks.iter().any(|t| t.thread_id == thread_id) { + // Never stall a merge thread since this blocks the thread from + // finishing and calling updateMergeThreads, and blocking it + // accomplishes nothing anyway (it's not really a segment producer): + return (false, guard); } + + // Defensively wait for only .25 seconds in case we are missing a .notify/All somewhere: + let (g, _) = self + .cond + .wait_timeout(guard, Duration::from_millis(25)) + .unwrap(); + guard = g; } (true, guard) } fn update_merge_threads(&mut self) { - unsafe { - let mut active_tasks: Vec = self.merge_tasks.get_mut().iter().collect(); - active_tasks.sort(); - - let tasks_count = active_tasks.len(); - - let mut big_merge_count = 0; - for i in 0..tasks_count { - if active_tasks[tasks_count - 1 - i] - .merge - .estimated_merge_bytes - .read() as f64 - > MIN_BIG_MERGE_MB * 1024.0 * 1024.0 - { - big_merge_count = tasks_count - i; - break; - } + let mut active_tasks: Vec<_> = self.merge_tasks.iter().collect(); + active_tasks.sort(); + + let tasks_count = active_tasks.len(); + + let mut big_merge_count = 0; + for i in 0..tasks_count { + if active_tasks[tasks_count - 1 - i] + .merge + .estimated_merge_bytes + .read() as f64 + > MIN_BIG_MERGE_MB * 1024.0 * 1024.0 + { + big_merge_count = tasks_count - i; + break; } + } - for (idx, task) in active_tasks.iter().enumerate() { - // pause the thread if max_thread_count is smaller than the number of merge threads. - let do_pause = idx + self.max_thread_count < big_merge_count; - unsafe - - let new_mb_per_sec = if do_pause { - 0.0 - } else if task.merge.max_num_segments.get().is_some() { - self.force_merge_mb_per_sec - } else if !self.do_auto_io_throttle.get() - || ((task.merge.estimated_merge_bytes.read() as f64) - < MIN_BIG_MERGE_MB * 1024.0 * 1024.0) - { - f64::INFINITY - } else { - self.target_mb_per_sec.get() - }; + for (idx, task) in active_tasks.iter().enumerate() { + // pause the thread if max_thread_count is smaller than the number of merge threads. 
+ let do_pause = idx + self.max_thread_count < big_merge_count; + + let new_mb_per_sec = if do_pause { + 0.0 + } else if task.merge.max_num_segments.get().is_some() { + self.force_merge_mb_per_sec + } else if !self.do_auto_io_throttle + || ((task.merge.estimated_merge_bytes.read() as f64) + < MIN_BIG_MERGE_MB * 1024.0 * 1024.0) + { + f64::INFINITY + } else { + self.target_mb_per_sec + }; - task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec); - } + task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec); } } fn merge_thread_count(&self) -> usize { let current_thread = thread::current().id(); - unsafe { - self.merge_tasks - .get_mut() - .iter() - .filter(|t| { - t.thread_id != current_thread - && t.thread_alive() - && !t.merge.rate_limiter.aborted() - }) - .count() - } + self.merge_tasks + .iter() + .filter(|t| { + t.thread_id != current_thread && t.thread_alive() && !t.merge.rate_limiter.aborted() + }) + .count() } fn update_io_throttle( @@ -395,7 +389,6 @@ impl ConcurrentMergeSchedulerInner { } } } - false } } @@ -412,62 +405,63 @@ impl MergeScheduler for ConcurrentMergeScheduler { C: Codec, MP: MergePolicy, { - let mut guard = self.inner.lock.lock().unwrap(); - let t = - guard.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>; - let scheduler = unsafe { self.inner.scheduler_mut((&guard).into()) }; - - if trigger == MergerTrigger::Closing { - // Disable throttling on close: - scheduler.target_mb_per_sec = MAX_MERGE_MB_PER_SEC; - scheduler.update_merge_threads(); - } - - // First, quickly run through the newly proposed merges - // and add any orthogonal merges (ie a merge not - // involving segments already pending to be merged) to - // the queue. If we are way behind on merging, many of - // these newly proposed merges will likely already be - // registered. + unsafe { + let mut guard = self.inner.lock().unwrap(); + let lock = self.inner.lock().unwrap(); + let scheduler = lock.scheduler_mut(&guard); - loop { - let (valid, g) = scheduler.maybe_stall(writer, guard); - guard = g; - if !valid { - break; + if trigger == MergerTrigger::Closing { + // Disable throttling on close: + scheduler.target_mb_per_sec = MAX_MERGE_MB_PER_SEC; + scheduler.update_merge_threads(); } - if let Some(merge) = writer.next_merge() { - scheduler.update_io_throttle(&merge); - - let sentinel = Arc::new(ThreadSentinel); - let live_sentinel = Arc::downgrade(&sentinel); - let merge_thread = MergeThread { - index_writer: writer.clone(), - merge_scheduler: self.clone(), - _live_sentinel: sentinel, - }; - let merge_info = merge.schedule_info(); - let handler = thread::Builder::new() - .name(format!( - "Rucene Merge Thread #{}", - scheduler.merge_thread_count - )) - .spawn(move || { - merge_thread.merge(merge); - }) - .expect("failed to spawn thread"); - scheduler.merge_thread_count += 1; - - let merge_task = MergeTaskInfo { - merge: merge_info, - thread_id: handler.thread().id(), - live_sentinel, - }; - scheduler.merge_tasks.push(merge_task); - scheduler.update_merge_threads(); - } else { - return Ok(()); + // First, quickly run through the newly proposed merges + // and add any orthogonal merges (ie a merge not + // involving segments already pending to be merged) to + // the queue. If we are way behind on merging, many of + // these newly proposed merges will likely already be + // registered. 
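// Aside: std::sync::Mutex is not reentrant, so taking `self.inner.lock()`
// twice on the same thread, as the hunk above does for `guard` and then
// `lock`, is unspecified behaviour that in practice deadlocks or panics on
// the second call. One guard can both witness the lock and source the
// mutable reference. A sketch of that guard-witnessed pattern, with
// illustrative names rather than the real scheduler API:
use std::cell::UnsafeCell;
use std::sync::{Mutex, MutexGuard};

struct Inner {
    merge_thread_count: usize,
}

impl Inner {
    #[allow(clippy::mut_from_ref)]
    unsafe fn inner_mut(&self, _proof: &MutexGuard<'_, Inner>) -> &mut Inner {
        // The same UnsafeCell round-trip the series uses, avoiding a direct
        // &T -> &mut T cast; `_proof` pins the invariant that the caller
        // actually holds the lock.
        let cell = self as *const Inner as *const UnsafeCell<Inner>;
        &mut *(*cell).get()
    }
}

fn bump(inner: &Mutex<Inner>) -> usize {
    let guard = inner.lock().unwrap(); // lock exactly once
    let me = unsafe { guard.inner_mut(&guard) };
    me.merge_thread_count += 1;
    me.merge_thread_count
}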
+ + loop { + let (valid, g) = scheduler.maybe_stall(writer, guard); + guard = g; + if !valid { + break; + } + + if let Some(merge) = writer.next_merge() { + scheduler.update_io_throttle(&merge); + + let sentinel = Arc::new(ThreadSentinel); + let live_sentinel = Arc::downgrade(&sentinel); + let merge_thread = MergeThread { + index_writer: writer.clone(), + merge_scheduler: self.clone(), + _live_sentinel: sentinel, + }; + let merge_info = merge.schedule_info(); + let handler = thread::Builder::new() + .name(format!( + "Rucene Merge Thread #{}", + scheduler.merge_thread_count + )) + .spawn(move || { + merge_thread.merge(merge); + }) + .expect("failed to spawn thread"); + scheduler.merge_thread_count += 1; + + let merge_task = MergeTaskInfo { + merge: merge_info, + thread_id: handler.thread().id(), + live_sentinel, + }; + scheduler.merge_tasks.push(merge_task); + scheduler.update_merge_threads(); + } else { + return Ok(()); + } } } Ok(()) @@ -481,7 +475,7 @@ impl MergeScheduler for ConcurrentMergeScheduler { } fn merging_thread_count(&self) -> Option { - Some(self.inner.merge_thread_count()) + Some(self.inner.lock().unwrap().merge_thread_count()) } } @@ -510,9 +504,8 @@ impl MergeThrea } Ok(()) => {} } - let mut l = self.merge_scheduler.inner.lock.lock().unwrap(); - let t = l.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>; - let scheduler_mut = unsafe { self.merge_scheduler.inner.scheduler_mut((&l).into()) }; + let l = self.merge_scheduler.inner.lock().unwrap(); + let scheduler_mut = unsafe { l.scheduler_mut(&l) }; scheduler_mut .merge_tasks .retain(|t| t.merge.id != one_merge.id); From 1c999ec09bd7bc4ba66a5180ff5cc012fdf4ac82 Mon Sep 17 00:00:00 2001 From: Harsha Vamsi Kalluri Date: Thu, 8 Feb 2024 19:42:52 +0000 Subject: [PATCH 6/8] Fixing flush control Signed-off-by: Harsha Vamsi Kalluri --- src/core/index/writer/flush_control.rs | 15 +++++++++++++-- src/core/index/writer/index_writer.rs | 14 ++++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/core/index/writer/flush_control.rs b/src/core/index/writer/flush_control.rs index 599bae4..5f109b4 100644 --- a/src/core/index/writer/flush_control.rs +++ b/src/core/index/writer/flush_control.rs @@ -19,6 +19,7 @@ use core::index::writer::{ }; use core::util::external::Volatile; use error::Result; +use std::cell::UnsafeCell; use core::store::directory::Directory; use std::collections::{HashMap, VecDeque}; @@ -101,14 +102,24 @@ impl>, + ) -> &mut DocumentsWriterFlushControl { + unsafe { &mut *ptr.get() } + } + #[allow(clippy::mut_from_ref)] unsafe fn flush_control_mut( &self, _l: &MutexGuard, ) -> &mut DocumentsWriterFlushControl { let control = self as *const DocumentsWriterFlushControl - as *mut DocumentsWriterFlushControl; - &mut *control + as *mut DocumentsWriterFlushControl + as *const UnsafeCell>; + unsafe { + let s = DocumentsWriterFlushControl::get_self(control.as_ref().unwrap()); + &mut *s + } } pub fn do_after_document( diff --git a/src/core/index/writer/index_writer.rs b/src/core/index/writer/index_writer.rs index 76f815f..dca2354 100644 --- a/src/core/index/writer/index_writer.rs +++ b/src/core/index/writer/index_writer.rs @@ -43,6 +43,7 @@ use core::util::to_base36; use core::util::{BitsRef, DerefWrapper, DocId, VERSION_LATEST}; use core::index::ErrorKind::MergeAborted; +use std::cell::UnsafeCell; use error::ErrorKind::{AlreadyClosed, IllegalArgument, IllegalState, Index, RuntimeError}; use error::{Error, Result}; @@ -1061,11 +1062,20 @@ where }) } + unsafe fn get_self( + 
ptr: &UnsafeCell>, + ) -> &mut IndexWriterInner { + unsafe { &mut *ptr.get() } + } + #[allow(clippy::mut_from_ref)] unsafe fn writer_mut(&self, _l: &MutexGuard<()>) -> &mut IndexWriterInner { let writer = - self as *const IndexWriterInner as *mut IndexWriterInner; - &mut *writer + self as *const IndexWriterInner as *mut IndexWriterInner as *const UnsafeCell>; + unsafe { + let s = IndexWriterInner::get_self(writer.as_ref().unwrap()); + &mut *s + } } fn get_reader( From d6769df09685ddf95eeceda13a19a0051520a758 Mon Sep 17 00:00:00 2001 From: Harsha Vamsi Kalluri Date: Thu, 8 Feb 2024 20:07:28 +0000 Subject: [PATCH 7/8] Fixing for_util Signed-off-by: Harsha Vamsi Kalluri --- src/core/codec/postings/for_util.rs | 36 +++++++++++-------- src/core/index/writer/bufferd_updates.rs | 12 +++++-- src/core/index/writer/doc_writer.rs | 14 ++++++-- .../index/writer/doc_writer_per_thread.rs | 15 ++++++-- 4 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/core/codec/postings/for_util.rs b/src/core/codec/postings/for_util.rs index 866fa4a..f57d1e7 100644 --- a/src/core/codec/postings/for_util.rs +++ b/src/core/codec/postings/for_util.rs @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::UnsafeCell; use std::cmp::max; use std::sync::{Arc, Once}; @@ -72,10 +73,7 @@ pub fn max_data_size() -> usize { let iterations = compute_iterations(&decoder) as usize; max_data_size = max(max_data_size, iterations * decoder.byte_value_count()); } else { - panic!( - "get_decoder({:?},{:?},{:?}) failed.", - format, version, bpv - ); + panic!("get_decoder({:?},{:?},{:?}) failed.", format, version, bpv); } } let format = Format::PackedSingleBlock; @@ -84,10 +82,7 @@ pub fn max_data_size() -> usize { let iterations = compute_iterations(&decoder) as usize; max_data_size = max(max_data_size, iterations * decoder.byte_value_count()); } else { - panic!( - "get_decoder({:?},{:?},{:?}) failed.", - format, version, bpv - ); + panic!("get_decoder({:?},{:?},{:?}) failed.", format, version, bpv); } } } @@ -132,8 +127,10 @@ impl ForUtilInstance { let format = Format::with_id(format_id); encoded_sizes[bpv] = encoded_size(format, packed_ints_version, bits_per_value); unsafe { - decoders.assume_init_mut()[bpv] = get_decoder(format, packed_ints_version, bits_per_value)?; - encoders.assume_init_mut()[bpv] = get_encoder(format, packed_ints_version, bits_per_value)?; + decoders.assume_init_mut()[bpv] = + get_decoder(format, packed_ints_version, bits_per_value)?; + encoders.assume_init_mut()[bpv] = + get_encoder(format, packed_ints_version, bits_per_value)?; iterations[bpv] = compute_iterations(&decoders.assume_init_ref()[bpv]); } } @@ -168,8 +165,10 @@ impl ForUtilInstance { debug_assert!(bits_per_value <= 32); encoded_sizes[bpv - 1] = encoded_size(format, VERSION_CURRENT, bits_per_value); unsafe { - decoders.assume_init_mut()[bpv - 1] = get_decoder(format, VERSION_CURRENT, bits_per_value)?; - encoders.assume_init_mut()[bpv - 1] = get_encoder(format, VERSION_CURRENT, bits_per_value)?; + decoders.assume_init_mut()[bpv - 1] = + get_decoder(format, VERSION_CURRENT, bits_per_value)?; + encoders.assume_init_mut()[bpv - 1] = + get_encoder(format, VERSION_CURRENT, bits_per_value)?; iterations[bpv - 1] = compute_iterations(&decoders.assume_init_ref()[bpv - 1]); } @@ -334,6 +333,12 @@ impl ForUtil { self.instance.read_block_by_simd(input, decoder) } + unsafe fn get_self( + ptr: &UnsafeCell + ) -> &mut EliasFanoEncoder { + unsafe { &mut *ptr.get() } + } + 
pub fn read_other_encode_block( doc_in: &mut dyn IndexInput, ef_decoder: &mut Option, @@ -346,9 +351,12 @@ impl ForUtil { let upper_bound = doc_in.read_vlong()?; if ef_decoder.is_some() { let encoder = unsafe { - &mut *(ef_decoder.as_mut().unwrap().get_encoder().as_ref() + let s = + ef_decoder.as_mut().unwrap().get_encoder().as_ref() as *const EliasFanoEncoder - as *mut EliasFanoEncoder) + as *mut EliasFanoEncoder as *const UnsafeCell; + let t = ForUtil::get_self(s.as_ref().unwrap()); + &mut *t }; encoder.rebuild_not_with_check(BLOCK_SIZE as i64, upper_bound)?; encoder.deserialize2(doc_in)?; diff --git a/src/core/index/writer/bufferd_updates.rs b/src/core/index/writer/bufferd_updates.rs index 67a2fc6..c240ee6 100644 --- a/src/core/index/writer/bufferd_updates.rs +++ b/src/core/index/writer/bufferd_updates.rs @@ -30,6 +30,7 @@ use core::store::directory::Directory; use core::store::IOContext; use core::util::DocId; +use std::cell::UnsafeCell; use std::cmp::{min, Ordering as CmpOrdering}; use std::collections::{BinaryHeap, HashMap}; use std::fmt; @@ -362,6 +363,12 @@ impl BufferedUpdatesStream { self.num_terms.load(Ordering::Acquire) } + unsafe fn get_self( + ptr: &UnsafeCell>, + ) -> &mut BufferedUpdatesStream { + unsafe { &mut *ptr.get() } + } + pub fn apply_deletes_and_updates( &self, pool: &ReaderPool, @@ -374,8 +381,9 @@ impl BufferedUpdatesStream { { let _l = self.lock.lock().unwrap(); let updates_stream = unsafe { - let stream = self as *const BufferedUpdatesStream as *mut BufferedUpdatesStream; - &mut *stream + let stream = self as *const BufferedUpdatesStream as *mut BufferedUpdatesStream as *const UnsafeCell>; + let s = BufferedUpdatesStream::get_self(stream.as_ref().unwrap()); + &mut *s }; let mut seg_states = Vec::with_capacity(infos.len()); let gen = self.next_gen.load(Ordering::Acquire); diff --git a/src/core/index/writer/doc_writer.rs b/src/core/index/writer/doc_writer.rs index 9dfe250..7591ee6 100644 --- a/src/core/index/writer/doc_writer.rs +++ b/src/core/index/writer/doc_writer.rs @@ -23,6 +23,7 @@ use core::index::writer::{ use core::search::query::Query; use core::store::directory::{Directory, LockValidatingDirectoryWrapper}; use core::util::external::Volatile; +use std::cell::UnsafeCell; use error::{ErrorKind::AlreadyClosed, ErrorKind::IllegalState, Result}; use crossbeam::queue::SegQueue; @@ -181,10 +182,19 @@ where self.index_writer.upgrade().unwrap() } + unsafe fn get_self( + ptr: &UnsafeCell>, + ) -> &mut DocumentsWriter { + unsafe { &mut *ptr.get() } + } + #[allow(clippy::mut_from_ref)] unsafe fn doc_writer_mut(&self, _l: &MutexGuard<()>) -> &mut DocumentsWriter { - let w = self as *const DocumentsWriter as *mut DocumentsWriter; - &mut *w + let w = self as *const DocumentsWriter as *mut DocumentsWriter as *const UnsafeCell>; + unsafe { + let s = DocumentsWriter::get_self(w.as_ref().unwrap()); + &mut *s + } } pub fn set_delete_queue(&self, delete_queue: Arc>) { diff --git a/src/core/index/writer/doc_writer_per_thread.rs b/src/core/index/writer/doc_writer_per_thread.rs index 3820a5b..bd9bb82 100644 --- a/src/core/index/writer/doc_writer_per_thread.rs +++ b/src/core/index/writer/doc_writer_per_thread.rs @@ -29,7 +29,7 @@ use core::{ }, }; -use std::collections::{HashMap, HashSet}; +use std::{cell::UnsafeCell, collections::{HashMap, HashSet}}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockWriteGuard, Weak}; use std::time::SystemTime; @@ -814,13 +814,22 @@ where } } + unsafe fn get_self( + ptr: 
&UnsafeCell>, + ) -> &mut ThreadState { + unsafe { &mut *ptr.get() } + } + #[allow(clippy::mut_from_ref)] pub fn thread_state_mut( &self, _lock: &MutexGuard, ) -> &mut ThreadState { - let state = self as *const ThreadState as *mut ThreadState; - unsafe { &mut *state } + let state = self as *const ThreadState as *mut ThreadState as *const UnsafeCell>; + unsafe { + let s = ThreadState::get_self(state.as_ref().unwrap()); + &mut *s + } } pub fn dwpt(&self) -> &DocumentsWriterPerThread { From 3f7801b350d16c586d441d9dd3e08a2a16ce4fa7 Mon Sep 17 00:00:00 2001 From: Harsha Vamsi Kalluri Date: Fri, 9 Feb 2024 00:20:25 +0000 Subject: [PATCH 8/8] Fix remaining instances of unsafe Signed-off-by: Harsha Vamsi Kalluri --- src/core/codec/field_infos/mod.rs | 15 ++++++++---- src/core/codec/segment_infos/mod.rs | 7 ++++++ src/core/index/writer/index_writer.rs | 34 +++++++++++++++++++++------ src/core/util/doc_id_set.rs | 11 +++++++-- 4 files changed, 54 insertions(+), 13 deletions(-) diff --git a/src/core/codec/field_infos/mod.rs b/src/core/codec/field_infos/mod.rs index a5ac2d8..e896409 100644 --- a/src/core/codec/field_infos/mod.rs +++ b/src/core/codec/field_infos/mod.rs @@ -18,6 +18,7 @@ pub use self::field_infos_format::*; use error::ErrorKind::{IllegalArgument, IllegalState}; use error::Result; +use std::cell::UnsafeCell; use std::cmp::max; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; @@ -108,6 +109,12 @@ impl FieldInfo { Ok(info) } + pub unsafe fn get_self( + ptr: &UnsafeCell, + ) -> &mut FieldInfo { + unsafe { &mut *ptr.get() } + } + pub fn check_consistency(&self) -> Result<()> { if let IndexOptions::Null = self.index_options { if self.has_store_term_vector { @@ -324,10 +331,10 @@ pub struct FieldInvertState { // we must track these across field instances (multi-valued case) pub last_start_offset: i32, pub last_position: i32, - /* pub offset_attribute: OffsetAttribute, - * pub pos_incr_attribute: PositionIncrementAttribute, - * pub payload_attribute: PayloadAttribute, - * term_attribute: TermToBytesRefAttribute, */ + // pub offset_attribute: OffsetAttribute, + // pub pos_incr_attribute: PositionIncrementAttribute, + // pub payload_attribute: PayloadAttribute, + // term_attribute: TermToBytesRefAttribute, } impl FieldInvertState { diff --git a/src/core/codec/segment_infos/mod.rs b/src/core/codec/segment_infos/mod.rs index 2803616..aab2965 100644 --- a/src/core/codec/segment_infos/mod.rs +++ b/src/core/codec/segment_infos/mod.rs @@ -21,6 +21,7 @@ pub use self::segment_infos_format::*; use serde::ser::SerializeStruct; use serde::{Serialize, Serializer}; +use std::cell::UnsafeCell; use std::collections::{HashMap, HashSet}; use std::fmt; use std::hash::{Hash, Hasher}; @@ -424,6 +425,12 @@ impl SegmentCommitInfo { } } + pub unsafe fn get_self( + ptr: &UnsafeCell>, + ) -> &mut SegmentCommitInfo { + unsafe { &mut *ptr.get() } + } + pub fn files(&self) -> HashSet { let mut files = HashSet::new(); // Start from the wrapped info's files: diff --git a/src/core/index/writer/index_writer.rs b/src/core/index/writer/index_writer.rs index dca2354..e16c095 100644 --- a/src/core/index/writer/index_writer.rs +++ b/src/core/index/writer/index_writer.rs @@ -43,9 +43,9 @@ use core::util::to_base36; use core::util::{BitsRef, DerefWrapper, DocId, VERSION_LATEST}; use core::index::ErrorKind::MergeAborted; -use std::cell::UnsafeCell; use error::ErrorKind::{AlreadyClosed, IllegalArgument, IllegalState, Index, RuntimeError}; use error::{Error, Result}; +use std::cell::UnsafeCell; use 
std::collections::{HashMap, HashSet, VecDeque}; use std::mem; @@ -1070,8 +1070,9 @@ where #[allow(clippy::mut_from_ref)] unsafe fn writer_mut(&self, _l: &MutexGuard<()>) -> &mut IndexWriterInner { - let writer = - self as *const IndexWriterInner as *mut IndexWriterInner as *const UnsafeCell>; + let writer = self as *const IndexWriterInner + as *mut IndexWriterInner + as *const UnsafeCell>; unsafe { let s = IndexWriterInner::get_self(writer.as_ref().unwrap()); &mut *s @@ -4445,6 +4446,13 @@ where self.pending_dv_updates.insert(field, v); } } + + unsafe fn get_self( + ptr: &UnsafeCell>, + ) -> &mut ReadersAndUpdatesInner { + unsafe { &mut *ptr.get() } + } + // Writes field updates (new _X_N updates files) to the directory // pub fn write_field_updates(&mut self, _dir: &DW) -> Result<()> { pub fn write_field_updates(&self) -> Result { @@ -4454,8 +4462,12 @@ where } let me = unsafe { - &mut *(self as *const ReadersAndUpdatesInner - as *mut ReadersAndUpdatesInner) + let t = self as *const ReadersAndUpdatesInner + as *mut ReadersAndUpdatesInner + as *const UnsafeCell>; + let s = ReadersAndUpdatesInner::get_self(t.as_ref().unwrap()); + + &mut *s }; assert!(self.reader.is_some()); @@ -4482,7 +4494,10 @@ where let mut new_dv_files = me.handle_doc_values_updates(&mut field_infos, doc_values_format)?; let info_mut_ref = unsafe { - &mut *(info.as_ref() as *const SegmentCommitInfo as *mut SegmentCommitInfo) + let t = info.as_ref() as *const SegmentCommitInfo as *mut SegmentCommitInfo + as *const UnsafeCell>; + let s = SegmentCommitInfo::get_self(t.as_ref().unwrap()); + &mut *s }; // writeFieldInfosGen fnm if !new_dv_files.is_empty() { @@ -4540,7 +4555,12 @@ where // step1 construct segment write state let ctx = IOContext::Flush(FlushInfo::new(info.info.max_doc() as u32)); let field_info = infos.field_info_by_name(field).unwrap(); - let field_info = unsafe { &mut *(field_info as *const FieldInfo as *mut FieldInfo) }; + let field_info = unsafe { + let t = field_info as *const FieldInfo as *mut FieldInfo + as *const UnsafeCell; + let s: &mut FieldInfo = FieldInfo::get_self(t.as_ref().unwrap()); + &mut *s + }; let old_dv_gen = field_info.set_doc_values_gen(dv_gen); let state = SegmentWriteState::new( tracker.clone(), diff --git a/src/core/util/doc_id_set.rs b/src/core/util/doc_id_set.rs index 95c44aa..e8d5f59 100644 --- a/src/core/util/doc_id_set.rs +++ b/src/core/util/doc_id_set.rs @@ -17,6 +17,7 @@ use core::search::{DocIdSet, DocIterator, NO_MORE_DOCS}; use core::util::bit_set::{FixedBitSet, ImmutableBitSet}; use core::util::packed::{EliasFanoDecoder, EliasFanoEncoder, NO_MORE_VALUES}; use core::util::DocId; +use std::cell::UnsafeCell; use error::ErrorKind::*; use std::borrow::Cow; use std::sync::Arc; @@ -361,7 +362,7 @@ impl DocIterator for NotDocIterator { #[derive(Debug)] pub struct EliasFanoDocIdSet { - ef_encoder:Arc, + ef_encoder: Arc, } impl EliasFanoDocIdSet { @@ -379,9 +380,15 @@ impl EliasFanoDocIdSet { EliasFanoEncoder::sufficiently_smaller_than_bit_set(num_values, upper_bound) } + unsafe fn get_self(ptr: &UnsafeCell) -> &mut EliasFanoEncoder { + unsafe { &mut *ptr.get() } + } + pub fn encode_from_disi(&mut self, mut disi: impl DocIterator) -> Result<()> { let encoder = unsafe { - &mut *(self.ef_encoder.as_ref() as *const EliasFanoEncoder as *mut EliasFanoEncoder) + let s = self.ef_encoder.as_ref() as *const EliasFanoEncoder as *mut EliasFanoEncoder as *const UnsafeCell; + let t = EliasFanoDocIdSet::get_self(s.as_ref().unwrap()); + &mut *t }; while self.ef_encoder.num_encoded < 
self.ef_encoder.num_values { let x = disi.next()?;
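// Aside: patches 6 through 8 stamp out near-identical `get_self` shims for
// DocumentsWriterFlushControl, IndexWriterInner, BufferedUpdatesStream,
// DocumentsWriter, ThreadState, FieldInfo, SegmentCommitInfo, and
// EliasFanoEncoder. The pattern could be written once as a generic helper; a
// sketch with a hypothetical name, carrying the same soundness caveats (the
// caller must hold whatever lock protects `value`):
use std::cell::UnsafeCell;

#[allow(clippy::mut_from_ref)]
unsafe fn cell_mut<T>(value: &T) -> &mut T {
    // Reinterpret &T as &UnsafeCell<T> (layout-compatible via
    // #[repr(transparent)]) and hand back the interior pointer mutably.
    let cell = value as *const T as *const UnsafeCell<T>;
    &mut *(*cell).get()
}

// Illustrative call site, mirroring the encode_from_disi hunk above:
// let encoder = unsafe { cell_mut(self.ef_encoder.as_ref()) };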