From 60f21c37c54eafc51fe84a6e00660e5e516cd7fe Mon Sep 17 00:00:00 2001 From: tongjianlin Date: Mon, 7 Sep 2020 11:54:10 +0800 Subject: [PATCH] fix memleak by threadlocal clear --- .../codec/doc_values/doc_values_format.rs | 53 ++++++++----------- src/core/codec/field_infos/mod.rs | 10 ++-- .../postings/blocktree/blocktree_reader.rs | 10 ++-- .../postings/per_field_postings_format.rs | 31 ++++------- src/core/index/reader/segment_reader.rs | 18 +++++-- 5 files changed, 54 insertions(+), 68 deletions(-) diff --git a/src/core/codec/doc_values/doc_values_format.rs b/src/core/codec/doc_values/doc_values_format.rs index acdfea7..417c254 100644 --- a/src/core/codec/doc_values/doc_values_format.rs +++ b/src/core/codec/doc_values/doc_values_format.rs @@ -17,8 +17,8 @@ use core::codec::doc_values::{ SortedSetDocValues, }; use core::codec::doc_values::{ - BinaryDocValuesProvider, DocValuesProducer, DocValuesProducerRef, NumericDocValuesProvider, - SortedDocValuesProvider, SortedNumericDocValuesProvider, SortedSetDocValuesProvider, + BinaryDocValuesProvider, DocValuesProducer, NumericDocValuesProvider, SortedDocValuesProvider, + SortedNumericDocValuesProvider, SortedSetDocValuesProvider, }; use core::codec::field_infos::FieldInfo; use core::codec::segment_infos::{SegmentReadState, SegmentWriteState}; @@ -30,7 +30,6 @@ use core::util::{BitsMut, BytesRef, Numeric, ReusableIterator}; use error::ErrorKind::{IllegalArgument, IllegalState}; use error::Result; -use std::collections::hash_map::Entry as HashMapEntry; use std::collections::{BTreeMap, HashMap}; use std::mem; use std::sync::Arc; @@ -310,7 +309,7 @@ pub fn doc_values_format_for_name(format: &str) -> Result { } pub struct DocValuesFieldsReader { - fields: BTreeMap, + fields: BTreeMap>, } impl DocValuesFieldsReader { @@ -321,40 +320,34 @@ impl DocValuesFieldsReader { C: Codec, { let mut fields = BTreeMap::new(); - let mut formats = HashMap::new(); + let mut formats: HashMap> = HashMap::new(); for (name, info) in &state.field_infos.by_name { if info.doc_values_type == DocValuesType::Null { continue; } let attrs = info.attributes.read().unwrap(); if let Some(format) = attrs.get(PER_FIELD_VALUE_FORMAT_KEY) { - let suffix = attrs.get(PER_FIELD_VALUE_SUFFIX_KEY); - match suffix { - None => bail!(IllegalState(format!( - "Missing attribute {} for field: {}", - PER_FIELD_VALUE_SUFFIX_KEY, name - ))), - Some(suffix) => { + if let Some(suffix) = attrs.get(PER_FIELD_VALUE_SUFFIX_KEY) { + let segment_suffix = + get_full_segment_suffix(&state.segment_suffix, get_suffix(format, suffix)); + + if !formats.contains_key(&segment_suffix) { let dv_format = doc_values_format_for_name(format)?; - let segment_suffix = get_full_segment_suffix( - &state.segment_suffix, - get_suffix(format, suffix), - ); - - match formats.entry(segment_suffix.clone()) { - HashMapEntry::Occupied(occupied) => { - fields.insert(name.to_string(), Arc::clone(occupied.get())); - } - HashMapEntry::Vacant(vacant) => { - let segment_read_state = - SegmentReadState::with_suffix(state, &segment_suffix); - let dv_producer = dv_format.fields_producer(&segment_read_state)?; - let dv_producer = Arc::from(dv_producer); - vacant.insert(Arc::clone(&dv_producer)); - fields.insert(name.to_string(), dv_producer); - } - } + let segment_read_state = + SegmentReadState::with_suffix(state, &segment_suffix); + let dv_producer = dv_format.fields_producer(&segment_read_state)?; + let dv_producer = Arc::from(dv_producer); + formats.insert(segment_suffix.clone(), Arc::clone(&dv_producer)); + } + + if let Some(dv_producer) = formats.get(&segment_suffix) { + fields.insert(name.clone(), Arc::clone(dv_producer)); } + } else { + bail!(IllegalState(format!( + "Missing attribute {} for field: {}", + PER_FIELD_VALUE_SUFFIX_KEY, name + ))); } } } diff --git a/src/core/codec/field_infos/mod.rs b/src/core/codec/field_infos/mod.rs index 188e0f1..a5ac2d8 100644 --- a/src/core/codec/field_infos/mod.rs +++ b/src/core/codec/field_infos/mod.rs @@ -307,8 +307,6 @@ impl fmt::Display for FieldInfo { } } -pub type FieldInfoRef = Arc; - /// This class tracks the number and position / offset parameters of terms /// being added to the index. The information collected in this class is /// also used to calculate the normalization factor for a field. @@ -384,8 +382,8 @@ pub struct FieldInfos { pub has_doc_values: bool, pub has_point_values: bool, - pub by_number: BTreeMap, - pub by_name: HashMap, + pub by_number: BTreeMap>, + pub by_name: HashMap>, } impl Serialize for FieldInfos { @@ -434,8 +432,8 @@ impl FieldInfos { let mut has_doc_values = false; let mut has_point_values = false; - let mut by_number: BTreeMap = BTreeMap::new(); - let mut by_name: HashMap = HashMap::new(); + let mut by_number: BTreeMap> = BTreeMap::new(); + let mut by_name: HashMap> = HashMap::new(); let mut max_number = 0; let mut infos = infos; diff --git a/src/core/codec/postings/blocktree/blocktree_reader.rs b/src/core/codec/postings/blocktree/blocktree_reader.rs index 3beda40..3ed1e6e 100644 --- a/src/core/codec/postings/blocktree/blocktree_reader.rs +++ b/src/core/codec/postings/blocktree/blocktree_reader.rs @@ -21,7 +21,7 @@ use std::ops::DerefMut; use std::string::ToString; use std::sync::Arc; -use core::codec::field_infos::{FieldInfo, FieldInfoRef}; +use core::codec::field_infos::FieldInfo; use core::codec::postings::blocktree::{BlockTermState, SegmentTermsIterFrame, MAX_LONGS_SIZE}; use core::codec::postings::{ FieldsProducer, Lucene50PostingIterator, Lucene50PostingsReader, Lucene50PostingsReaderRef, @@ -389,7 +389,7 @@ type FSTRef = Arc>; /// BlockTree's implementation of `Terms` pub struct FieldReader { num_terms: i64, - field_info: FieldInfoRef, + field_info: Arc, sum_total_term_freq: i64, sum_doc_freq: i64, doc_count: i32, @@ -411,7 +411,7 @@ impl FieldReader { #[allow(clippy::too_many_arguments)] pub fn new( parent: BlockTreeTermsReader, - field_info: FieldInfoRef, + field_info: Arc, num_terms: i64, root_code: Vec, sum_total_term_freq: i64, @@ -913,7 +913,7 @@ impl SegmentTermIterator { field_reader: &FieldReader, terms_in: IndexInputRef, postings_reader: Lucene50PostingsReaderRef, - field_info: FieldInfoRef, + field_info: Arc, ) -> Self { let iter = Box::new(SegmentTermIteratorInner::new( field_reader, @@ -1028,7 +1028,7 @@ impl SegmentTermIteratorInner { field_reader: &FieldReader, terms_in: IndexInputRef, postings_reader: Lucene50PostingsReaderRef, - field_info: FieldInfoRef, + field_info: Arc, ) -> Self { let mut arcs = vec![]; if let Some(ref index) = field_reader.index { diff --git a/src/core/codec/postings/per_field_postings_format.rs b/src/core/codec/postings/per_field_postings_format.rs index ea4b732..8d56971 100644 --- a/src/core/codec/postings/per_field_postings_format.rs +++ b/src/core/codec/postings/per_field_postings_format.rs @@ -87,7 +87,6 @@ impl PostingsFormat for PerFieldPostingsFormat { } pub struct PerFieldFieldsReader { - _formats: HashMap>, fields: BTreeMap>, segment: String, } @@ -97,35 +96,27 @@ impl PerFieldFieldsReader { state: &SegmentReadState<'_, D, DW, C>, ) -> Result { let mut fields = BTreeMap::new(); - let mut _formats = HashMap::new(); + let mut formats = HashMap::new(); for (name, info) in &state.field_infos.by_name { if let IndexOptions::Null = info.index_options { continue; } - if let Some(format) = info - .attributes - .read() - .unwrap() - .get(PER_FIELD_POSTING_FORMAT_KEY) - { - if let Some(suffix) = info - .attributes - .read() - .unwrap() - .get(PER_FIELD_POSTING_SUFFIX_KEY) - { + + let attrs = info.attributes.read().unwrap(); + if let Some(format) = attrs.get(PER_FIELD_POSTING_FORMAT_KEY) { + if let Some(suffix) = attrs.get(PER_FIELD_POSTING_SUFFIX_KEY) { let segment_suffix = get_suffix(&format, suffix); - if !_formats.contains_key(&segment_suffix) { + if !formats.contains_key(&segment_suffix) { let postings_format = postings_format_for_name(format)?; let state = SegmentReadState::with_suffix(state, &segment_suffix); - _formats.insert( + formats.insert( segment_suffix.clone(), Arc::new(postings_format.fields_producer(&state)?), ); } - if let Some(field_producer) = _formats.get(&segment_suffix) { + if let Some(field_producer) = formats.get(&segment_suffix) { fields.insert(name.clone(), field_producer.clone()); } } else { @@ -138,11 +129,7 @@ impl PerFieldFieldsReader { } } let segment = state.segment_info.name.clone(); - Ok(PerFieldFieldsReader { - _formats, - fields, - segment, - }) + Ok(PerFieldFieldsReader { fields, segment }) } fn terms_impl(&self, field: &str) -> Result> { diff --git a/src/core/index/reader/segment_reader.rs b/src/core/index/reader/segment_reader.rs index 42689af..b009648 100644 --- a/src/core/index/reader/segment_reader.rs +++ b/src/core/index/reader/segment_reader.rs @@ -489,6 +489,15 @@ pub struct SegmentReader { unsafe impl Sync for SegmentReader {} +impl Drop for SegmentReader { + fn drop(&mut self) { + self.doc_values_producer_preload.write().unwrap().clear(); + self.doc_values_local_preload.write().unwrap().clear(); + self.doc_values_producer.clear(); + self.doc_values_local.clear(); + } +} + /// IndexReader implementation over a single segment. /// Instances pointing to the same segment (but with different deletes, etc) /// may share the same core data. @@ -502,11 +511,7 @@ impl SegmentReader { is_nrt: bool, field_infos: Arc, ) -> SegmentReader { - let doc_values_local = CachedThreadLocal::new(); - doc_values_local.get_or(|| Box::new(RefCell::new(HashMap::new()))); - - let max_preload_num = 5 * num_cpus::get_physical(); - + let max_preload_num = num_cpus::get_physical(); let mut doc_values_producer_preload: Vec> = Vec::with_capacity(max_preload_num); for _ in 0..max_preload_num { @@ -583,6 +588,9 @@ impl SegmentReader { doc_values_producer.get_or(|| Box::new(dv_producer)); } + let doc_values_local = CachedThreadLocal::new(); + doc_values_local.get_or(|| Box::new(RefCell::new(HashMap::new()))); + SegmentReader { si, live_docs,