Commit 60f21c3

fix memleak by threadlocal clear

jtong11 committed Oct 9, 2020
1 parent 23b074e commit 60f21c3

Showing 5 changed files with 54 additions and 68 deletions.
53 changes: 23 additions & 30 deletions src/core/codec/doc_values/doc_values_format.rs
@@ -17,8 +17,8 @@ use core::codec::doc_values::{
     SortedSetDocValues,
 };
 use core::codec::doc_values::{
-    BinaryDocValuesProvider, DocValuesProducer, DocValuesProducerRef, NumericDocValuesProvider,
-    SortedDocValuesProvider, SortedNumericDocValuesProvider, SortedSetDocValuesProvider,
+    BinaryDocValuesProvider, DocValuesProducer, NumericDocValuesProvider, SortedDocValuesProvider,
+    SortedNumericDocValuesProvider, SortedSetDocValuesProvider,
 };
 use core::codec::field_infos::FieldInfo;
 use core::codec::segment_infos::{SegmentReadState, SegmentWriteState};
@@ -30,7 +30,6 @@ use core::util::{BitsMut, BytesRef, Numeric, ReusableIterator};

 use error::ErrorKind::{IllegalArgument, IllegalState};
 use error::Result;
-use std::collections::hash_map::Entry as HashMapEntry;
 use std::collections::{BTreeMap, HashMap};
 use std::mem;
 use std::sync::Arc;
@@ -310,7 +309,7 @@ pub fn doc_values_format_for_name(format: &str) -> Result<DocValuesFormatEnum> {
 }

 pub struct DocValuesFieldsReader {
-    fields: BTreeMap<String, DocValuesProducerRef>,
+    fields: BTreeMap<String, Arc<dyn DocValuesProducer>>,
 }

 impl DocValuesFieldsReader {
@@ -321,40 +320,34 @@ impl DocValuesFieldsReader {
         C: Codec,
     {
         let mut fields = BTreeMap::new();
-        let mut formats = HashMap::new();
+        let mut formats: HashMap<String, Arc<dyn DocValuesProducer>> = HashMap::new();
         for (name, info) in &state.field_infos.by_name {
             if info.doc_values_type == DocValuesType::Null {
                 continue;
             }
             let attrs = info.attributes.read().unwrap();
             if let Some(format) = attrs.get(PER_FIELD_VALUE_FORMAT_KEY) {
-                let suffix = attrs.get(PER_FIELD_VALUE_SUFFIX_KEY);
-                match suffix {
-                    None => bail!(IllegalState(format!(
-                        "Missing attribute {} for field: {}",
-                        PER_FIELD_VALUE_SUFFIX_KEY, name
-                    ))),
-                    Some(suffix) => {
+                if let Some(suffix) = attrs.get(PER_FIELD_VALUE_SUFFIX_KEY) {
+                    let segment_suffix =
+                        get_full_segment_suffix(&state.segment_suffix, get_suffix(format, suffix));
+
+                    if !formats.contains_key(&segment_suffix) {
                         let dv_format = doc_values_format_for_name(format)?;
-                        let segment_suffix = get_full_segment_suffix(
-                            &state.segment_suffix,
-                            get_suffix(format, suffix),
-                        );
-
-                        match formats.entry(segment_suffix.clone()) {
-                            HashMapEntry::Occupied(occupied) => {
-                                fields.insert(name.to_string(), Arc::clone(occupied.get()));
-                            }
-                            HashMapEntry::Vacant(vacant) => {
-                                let segment_read_state =
-                                    SegmentReadState::with_suffix(state, &segment_suffix);
-                                let dv_producer = dv_format.fields_producer(&segment_read_state)?;
-                                let dv_producer = Arc::from(dv_producer);
-                                vacant.insert(Arc::clone(&dv_producer));
-                                fields.insert(name.to_string(), dv_producer);
-                            }
-                        }
+                        let segment_read_state =
+                            SegmentReadState::with_suffix(state, &segment_suffix);
+                        let dv_producer = dv_format.fields_producer(&segment_read_state)?;
+                        let dv_producer = Arc::from(dv_producer);
+                        formats.insert(segment_suffix.clone(), Arc::clone(&dv_producer));
                     }
+
+                    if let Some(dv_producer) = formats.get(&segment_suffix) {
+                        fields.insert(name.clone(), Arc::clone(dv_producer));
+                    }
+                } else {
+                    bail!(IllegalState(format!(
+                        "Missing attribute {} for field: {}",
+                        PER_FIELD_VALUE_SUFFIX_KEY, name
+                    )));
+                }
             }
         }
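
Note on the DocValuesFieldsReader change above: the HashMapEntry bookkeeping is traded for a plain contains_key/insert/get sequence, so the first field that uses a given segment suffix builds the DocValuesProducer and every later field reuses the cached Arc. A minimal sketch of that caching pattern in isolation, with a hypothetical Producer type standing in for dyn DocValuesProducer and an illustrative suffix string:

use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for `dyn DocValuesProducer`.
struct Producer(String);

fn producer_for(cache: &mut HashMap<String, Arc<Producer>>, suffix: &str) -> Arc<Producer> {
    // The first field to use a segment suffix builds the producer...
    if !cache.contains_key(suffix) {
        cache.insert(suffix.to_string(), Arc::new(Producer(suffix.to_string())));
    }
    // ...and every later field reuses the cached Arc.
    Arc::clone(&cache[suffix])
}

fn main() {
    let mut cache: HashMap<String, Arc<Producer>> = HashMap::new();
    let a = producer_for(&mut cache, "Lucene54_0");
    let b = producer_for(&mut cache, "Lucene54_0");
    assert!(Arc::ptr_eq(&a, &b)); // one shared producer per segment suffix
    println!("cached producer for suffix {}", a.0);
}
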
10 changes: 4 additions & 6 deletions src/core/codec/field_infos/mod.rs
@@ -307,8 +307,6 @@ impl fmt::Display for FieldInfo {
     }
 }

-pub type FieldInfoRef = Arc<FieldInfo>;
-
 /// This class tracks the number and position / offset parameters of terms
 /// being added to the index. The information collected in this class is
 /// also used to calculate the normalization factor for a field.
@@ -384,8 +382,8 @@ pub struct FieldInfos {
     pub has_doc_values: bool,
     pub has_point_values: bool,

-    pub by_number: BTreeMap<u32, FieldInfoRef>,
-    pub by_name: HashMap<String, FieldInfoRef>,
+    pub by_number: BTreeMap<u32, Arc<FieldInfo>>,
+    pub by_name: HashMap<String, Arc<FieldInfo>>,
 }

 impl Serialize for FieldInfos {
@@ -434,8 +432,8 @@ impl FieldInfos {
         let mut has_doc_values = false;
         let mut has_point_values = false;

-        let mut by_number: BTreeMap<u32, FieldInfoRef> = BTreeMap::new();
-        let mut by_name: HashMap<String, FieldInfoRef> = HashMap::new();
+        let mut by_number: BTreeMap<u32, Arc<FieldInfo>> = BTreeMap::new();
+        let mut by_name: HashMap<String, Arc<FieldInfo>> = HashMap::new();
         let mut max_number = 0;

         let mut infos = infos;
10 changes: 5 additions & 5 deletions src/core/codec/postings/blocktree/blocktree_reader.rs
@@ -21,7 +21,7 @@ use std::ops::DerefMut;
 use std::string::ToString;
 use std::sync::Arc;

-use core::codec::field_infos::{FieldInfo, FieldInfoRef};
+use core::codec::field_infos::FieldInfo;
 use core::codec::postings::blocktree::{BlockTermState, SegmentTermsIterFrame, MAX_LONGS_SIZE};
 use core::codec::postings::{
     FieldsProducer, Lucene50PostingIterator, Lucene50PostingsReader, Lucene50PostingsReaderRef,
@@ -389,7 +389,7 @@ type FSTRef = Arc<FST<ByteSequenceOutputFactory>>;
 /// BlockTree's implementation of `Terms`
 pub struct FieldReader {
     num_terms: i64,
-    field_info: FieldInfoRef,
+    field_info: Arc<FieldInfo>,
     sum_total_term_freq: i64,
     sum_doc_freq: i64,
     doc_count: i32,
@@ -411,7 +411,7 @@ impl FieldReader {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         parent: BlockTreeTermsReader,
-        field_info: FieldInfoRef,
+        field_info: Arc<FieldInfo>,
         num_terms: i64,
         root_code: Vec<u8>,
         sum_total_term_freq: i64,
@@ -913,7 +913,7 @@ impl SegmentTermIterator {
         field_reader: &FieldReader,
         terms_in: IndexInputRef,
         postings_reader: Lucene50PostingsReaderRef,
-        field_info: FieldInfoRef,
+        field_info: Arc<FieldInfo>,
     ) -> Self {
         let iter = Box::new(SegmentTermIteratorInner::new(
             field_reader,
@@ -1028,7 +1028,7 @@ impl SegmentTermIteratorInner {
         field_reader: &FieldReader,
         terms_in: IndexInputRef,
         postings_reader: Lucene50PostingsReaderRef,
-        field_info: FieldInfoRef,
+        field_info: Arc<FieldInfo>,
     ) -> Self {
         let mut arcs = vec![];
         if let Some(ref index) = field_reader.index {
31 changes: 9 additions & 22 deletions src/core/codec/postings/per_field_postings_format.rs
@@ -87,7 +87,6 @@ impl PostingsFormat for PerFieldPostingsFormat {
 }

 pub struct PerFieldFieldsReader {
-    _formats: HashMap<String, Arc<FieldsProducerEnum>>,
     fields: BTreeMap<String, Arc<FieldsProducerEnum>>,
     segment: String,
 }
@@ -97,35 +96,27 @@ impl PerFieldFieldsReader {
         state: &SegmentReadState<'_, D, DW, C>,
     ) -> Result<PerFieldFieldsReader> {
         let mut fields = BTreeMap::new();
-        let mut _formats = HashMap::new();
+        let mut formats = HashMap::new();
         for (name, info) in &state.field_infos.by_name {
             if let IndexOptions::Null = info.index_options {
                 continue;
             }
-            if let Some(format) = info
-                .attributes
-                .read()
-                .unwrap()
-                .get(PER_FIELD_POSTING_FORMAT_KEY)
-            {
-                if let Some(suffix) = info
-                    .attributes
-                    .read()
-                    .unwrap()
-                    .get(PER_FIELD_POSTING_SUFFIX_KEY)
-                {
+
+            let attrs = info.attributes.read().unwrap();
+            if let Some(format) = attrs.get(PER_FIELD_POSTING_FORMAT_KEY) {
+                if let Some(suffix) = attrs.get(PER_FIELD_POSTING_SUFFIX_KEY) {
                     let segment_suffix = get_suffix(&format, suffix);

-                    if !_formats.contains_key(&segment_suffix) {
+                    if !formats.contains_key(&segment_suffix) {
                         let postings_format = postings_format_for_name(format)?;
                         let state = SegmentReadState::with_suffix(state, &segment_suffix);
-                        _formats.insert(
+                        formats.insert(
                             segment_suffix.clone(),
                             Arc::new(postings_format.fields_producer(&state)?),
                         );
                     }

-                    if let Some(field_producer) = _formats.get(&segment_suffix) {
+                    if let Some(field_producer) = formats.get(&segment_suffix) {
                         fields.insert(name.clone(), field_producer.clone());
                     }
                 } else {
@@ -138,11 +129,7 @@
                 }
             }
         }
         let segment = state.segment_info.name.clone();
-        Ok(PerFieldFieldsReader {
-            _formats,
-            fields,
-            segment,
-        })
+        Ok(PerFieldFieldsReader { fields, segment })
     }

     fn terms_impl(&self, field: &str) -> Result<Option<FieldReaderRef>> {
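
Note on the PerFieldFieldsReader change above: besides dropping the unused _formats struct field (the reader only needs the per-field map once construction is done), the rewrite takes the attributes read lock once and performs both key lookups under the same guard, instead of calling info.attributes.read().unwrap() separately for each key. A minimal sketch of that locking pattern, using std::sync::RwLock and illustrative key strings:

use std::collections::HashMap;
use std::sync::RwLock;

fn main() {
    // Illustrative attribute map; in rucene these attributes live on FieldInfo.
    let attributes: RwLock<HashMap<String, String>> = RwLock::new(HashMap::from([
        ("format".to_string(), "Lucene50".to_string()),
        ("suffix".to_string(), "0".to_string()),
    ]));

    // One read guard held across both lookups, instead of locking twice.
    let attrs = attributes.read().unwrap();
    if let Some(format) = attrs.get("format") {
        if let Some(suffix) = attrs.get("suffix") {
            println!("segment suffix: {}_{}", format, suffix);
        }
    }
}
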
18 changes: 13 additions & 5 deletions src/core/index/reader/segment_reader.rs
@@ -489,6 +489,15 @@ pub struct SegmentReader<D: Directory, C: Codec> {

 unsafe impl<D: Directory + Send + Sync + 'static, C: Codec> Sync for SegmentReader<D, C> {}

+impl<D: Directory, C: Codec> Drop for SegmentReader<D, C> {
+    fn drop(&mut self) {
+        self.doc_values_producer_preload.write().unwrap().clear();
+        self.doc_values_local_preload.write().unwrap().clear();
+        self.doc_values_producer.clear();
+        self.doc_values_local.clear();
+    }
+}
+
 /// IndexReader implementation over a single segment.
 /// Instances pointing to the same segment (but with different deletes, etc)
 /// may share the same core data.
@@ -502,11 +511,7 @@ impl<D: Directory + 'static, C: Codec> SegmentReader<D, C> {
         is_nrt: bool,
         field_infos: Arc<FieldInfos>,
     ) -> SegmentReader<D, C> {
-        let doc_values_local = CachedThreadLocal::new();
-        doc_values_local.get_or(|| Box::new(RefCell::new(HashMap::new())));
-
-        let max_preload_num = 5 * num_cpus::get_physical();
-
+        let max_preload_num = num_cpus::get_physical();
         let mut doc_values_producer_preload: Vec<Arc<dyn DocValuesProducer>> =
             Vec::with_capacity(max_preload_num);
         for _ in 0..max_preload_num {
@@ -583,6 +588,9 @@ impl<D: Directory + 'static, C: Codec> SegmentReader<D, C> {
             doc_values_producer.get_or(|| Box::new(dv_producer));
         }

+        let doc_values_local = CachedThreadLocal::new();
+        doc_values_local.get_or(|| Box::new(RefCell::new(HashMap::new())));
+
         SegmentReader {
             si,
             live_docs,
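
Note on the SegmentReader change above, which is the fix named in the commit title: the preload vectors and the CachedThreadLocal caches each hold Arc clones of the segment's doc-values producers, and the new Drop impl empties all four so those clones are released as soon as the reader is dropped rather than lingering in per-thread storage (the constructor also now creates doc_values_local last and sizes the preload at one slot per physical core instead of five). A minimal sketch of the clear-on-drop pattern, assuming the thread_local crate's ThreadLocal as a stand-in for rucene's CachedThreadLocal and a hypothetical Producer type:

use std::sync::{Arc, RwLock};
use thread_local::ThreadLocal; // assumed Cargo dependency: thread_local

// Hypothetical stand-in for `dyn DocValuesProducer`.
struct Producer;

struct Reader {
    preload: RwLock<Vec<Arc<Producer>>>,
    per_thread: ThreadLocal<Arc<Producer>>,
}

impl Drop for Reader {
    fn drop(&mut self) {
        // Mirror the Drop impl above: empty both caches so the shared
        // producer's reference count falls as soon as the reader dies.
        self.preload.write().unwrap().clear();
        self.per_thread.clear();
    }
}

fn main() {
    let shared = Arc::new(Producer);
    let reader = Reader {
        preload: RwLock::new(vec![Arc::clone(&shared)]),
        per_thread: ThreadLocal::new(),
    };
    reader.per_thread.get_or(|| Arc::clone(&shared));
    assert_eq!(Arc::strong_count(&shared), 3);
    drop(reader); // the explicit clears release both cached clones
    assert_eq!(Arc::strong_count(&shared), 1);
}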