diff --git a/src/core/index/merge/merge_scheduler.rs b/src/core/index/merge/merge_scheduler.rs index 9744eb8..c7e520a 100644 --- a/src/core/index/merge/merge_scheduler.rs +++ b/src/core/index/merge/merge_scheduler.rs @@ -17,7 +17,6 @@ use core::index::merge::{MergePolicy, MergerTrigger, OneMerge, OneMergeScheduleI use core::index::writer::IndexWriter; use core::store::directory::Directory; use core::store::RateLimiter; -use std::borrow::BorrowMut; use std::cell::UnsafeCell; use error::{Error, ErrorKind, Result}; @@ -158,22 +157,21 @@ impl ConcurrentMergeScheduler { } Self { inner: Arc::new(Mutex::new(ConcurrentMergeSchedulerInner::new( - max_thread_count), - )), + max_thread_count, + ))), } } } struct ConcurrentMergeSchedulerInner { - lock: UnsafeCell>, - cond: UnsafeCell, - merge_tasks: UnsafeCell>, - max_merge_count: UnsafeCell, - max_thread_count: UnsafeCell, - merge_thread_count: UnsafeCell, - target_mb_per_sec: UnsafeCell, - do_auto_io_throttle: UnsafeCell, - force_merge_mb_per_sec: UnsafeCell, + cond: Condvar, + merge_tasks: Vec, + max_merge_count: usize, + max_thread_count: usize, + merge_thread_count: usize, + target_mb_per_sec: f64, + do_auto_io_throttle: bool, + force_merge_mb_per_sec: f64, } // Floor for IO write rate limit (we will never go any lower than this) @@ -195,33 +193,41 @@ pub const MAX_MERGING_COUNT: usize = 5; impl ConcurrentMergeSchedulerInner { fn new(max_thread_count: usize) -> Self { ConcurrentMergeSchedulerInner { - lock: UnsafeCell::new(Mutex::new(())), - cond: UnsafeCell::new(Condvar::new()), - merge_tasks: UnsafeCell::new(vec![]), - max_merge_count: UnsafeCell::new(max_thread_count.max(MAX_MERGING_COUNT)), - max_thread_count: UnsafeCell::new(max_thread_count), - merge_thread_count: UnsafeCell::new(0), - target_mb_per_sec: UnsafeCell::new(START_MB_PER_SEC), - do_auto_io_throttle: UnsafeCell::new(true), - force_merge_mb_per_sec: UnsafeCell::new(f64::INFINITY), + cond: Condvar::new(), + merge_tasks: vec![], + max_merge_count: 
max_thread_count.max(MAX_MERGING_COUNT), + max_thread_count, + merge_thread_count: 0, + target_mb_per_sec: START_MB_PER_SEC, + do_auto_io_throttle: true, + force_merge_mb_per_sec: f64::INFINITY, } } + unsafe fn get_self( + ptr: &UnsafeCell, + ) -> &mut ConcurrentMergeSchedulerInner { + unsafe { &mut *ptr.get() } + } + #[allow(clippy::mut_from_ref)] unsafe fn scheduler_mut( &self, - _guard: UnsafeCell<&MutexGuard<()>>, + _guard: &MutexGuard<(ConcurrentMergeSchedulerInner)>, ) -> &mut ConcurrentMergeSchedulerInner { - let scheduler = - self as *const ConcurrentMergeSchedulerInner as *mut ConcurrentMergeSchedulerInner; - unsafe { &mut *scheduler } + let t = self as *const ConcurrentMergeSchedulerInner as *mut ConcurrentMergeSchedulerInner + as *const UnsafeCell; + unsafe { + let scheduler = ConcurrentMergeSchedulerInner::get_self(t.as_ref().unwrap()); + &mut *scheduler + } } fn maybe_stall<'a, D, C, MP>( &self, writer: &IndexWriter, - guard: MutexGuard<'a, ()>, - ) -> (bool, MutexGuard<'a, ()>) + guard: MutexGuard<'a, ConcurrentMergeSchedulerInner>, + ) -> (bool, MutexGuard<'a, ConcurrentMergeSchedulerInner>) where D: Directory + Send + Sync + 'static, C: Codec, @@ -229,93 +235,81 @@ impl ConcurrentMergeSchedulerInner { { let thread_id = thread::current().id(); let mut guard = guard; - unsafe { - while writer.has_pending_merges() && self.merge_thread_count() >= *self.max_merge_count.get() { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). 
We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - if self.merge_tasks.get_mut().iter().any(|t| t.thread_id == thread_id) { - // Never stall a merge thread since this blocks the thread from - // finishing and calling updateMergeThreads, and blocking it - // accomplishes nothing anyway (it's not really a segment producer): - return (false, guard); - } - - // Defensively wait for only .25 seconds in case we are missing a .notify/All - // somewhere: - let (g, _) = *self - .cond - .get() - .wait_timeout(guard, Duration::from_millis(25)) - .unwrap(); - guard = g; + while writer.has_pending_merges() && self.merge_thread_count() >= self.max_merge_count { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. + // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). 
We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + if self.merge_tasks.iter().any(|t| t.thread_id == thread_id) { + // Never stall a merge thread since this blocks the thread from + // finishing and calling updateMergeThreads, and blocking it + // accomplishes nothing anyway (it's not really a segment producer): + return (false, guard); } + + // Defensively wait for only 25 ms in case we are missing a .notify/All somewhere (NOTE(review): this comment previously said .25 seconds; confirm the intended timeout): + let (g, _) = self + .cond + .wait_timeout(guard, Duration::from_millis(25)) + .unwrap(); + guard = g; } (true, guard) } fn update_merge_threads(&mut self) { - unsafe { - let mut active_tasks: Vec = self.merge_tasks.get_mut().iter().collect(); - active_tasks.sort(); - - let tasks_count = active_tasks.len(); - - let mut big_merge_count = 0; - for i in 0..tasks_count { - if active_tasks[tasks_count - 1 - i] - .merge - .estimated_merge_bytes - .read() as f64 - > MIN_BIG_MERGE_MB * 1024.0 * 1024.0 - { - big_merge_count = tasks_count - i; - break; - } + let mut active_tasks: Vec<_> = self.merge_tasks.iter().collect(); + active_tasks.sort(); + + let tasks_count = active_tasks.len(); + + let mut big_merge_count = 0; + for i in 0..tasks_count { + if active_tasks[tasks_count - 1 - i] + .merge + .estimated_merge_bytes + .read() as f64 + > MIN_BIG_MERGE_MB * 1024.0 * 1024.0 + { + big_merge_count = tasks_count - i; + break; } + } - for (idx, task) in active_tasks.iter().enumerate() { - // pause the thread if max_thread_count is smaller than the number of merge threads. 
- let do_pause = idx + self.max_thread_count < big_merge_count; - unsafe - - let new_mb_per_sec = if do_pause { - 0.0 - } else if task.merge.max_num_segments.get().is_some() { - self.force_merge_mb_per_sec - } else if !self.do_auto_io_throttle.get() - || ((task.merge.estimated_merge_bytes.read() as f64) - < MIN_BIG_MERGE_MB * 1024.0 * 1024.0) - { - f64::INFINITY - } else { - self.target_mb_per_sec.get() - }; + for (idx, task) in active_tasks.iter().enumerate() { + // pause the thread if max_thread_count is smaller than the number of merge threads. + let do_pause = idx + self.max_thread_count < big_merge_count; + + let new_mb_per_sec = if do_pause { + 0.0 + } else if task.merge.max_num_segments.get().is_some() { + self.force_merge_mb_per_sec + } else if !self.do_auto_io_throttle + || ((task.merge.estimated_merge_bytes.read() as f64) + < MIN_BIG_MERGE_MB * 1024.0 * 1024.0) + { + f64::INFINITY + } else { + self.target_mb_per_sec + }; - task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec); - } + task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec); } } fn merge_thread_count(&self) -> usize { let current_thread = thread::current().id(); - unsafe { - self.merge_tasks - .get_mut() - .iter() - .filter(|t| { - t.thread_id != current_thread - && t.thread_alive() - && !t.merge.rate_limiter.aborted() - }) - .count() - } + self.merge_tasks + .iter() + .filter(|t| { + t.thread_id != current_thread && t.thread_alive() && !t.merge.rate_limiter.aborted() + }) + .count() } fn update_io_throttle( @@ -395,7 +389,6 @@ impl ConcurrentMergeSchedulerInner { } } } - false } } @@ -412,62 +405,63 @@ impl MergeScheduler for ConcurrentMergeScheduler { C: Codec, MP: MergePolicy, { - let mut guard = self.inner.lock.lock().unwrap(); - let t = - guard.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>; - let scheduler = unsafe { self.inner.scheduler_mut((&guard).into()) }; - - if trigger == MergerTrigger::Closing { - // Disable throttling on close: - 
scheduler.target_mb_per_sec = MAX_MERGE_MB_PER_SEC; - scheduler.update_merge_threads(); - } - - // First, quickly run through the newly proposed merges - // and add any orthogonal merges (ie a merge not - // involving segments already pending to be merged) to - // the queue. If we are way behind on merging, many of - // these newly proposed merges will likely already be - // registered. + unsafe { + let mut guard = self.inner.lock().unwrap(); + let lock = self.inner.lock().unwrap(); + let scheduler = lock.scheduler_mut(&guard); - loop { - let (valid, g) = scheduler.maybe_stall(writer, guard); - guard = g; - if !valid { - break; + if trigger == MergerTrigger::Closing { + // Disable throttling on close: + scheduler.target_mb_per_sec = MAX_MERGE_MB_PER_SEC; + scheduler.update_merge_threads(); } - if let Some(merge) = writer.next_merge() { - scheduler.update_io_throttle(&merge); - - let sentinel = Arc::new(ThreadSentinel); - let live_sentinel = Arc::downgrade(&sentinel); - let merge_thread = MergeThread { - index_writer: writer.clone(), - merge_scheduler: self.clone(), - _live_sentinel: sentinel, - }; - let merge_info = merge.schedule_info(); - let handler = thread::Builder::new() - .name(format!( - "Rucene Merge Thread #{}", - scheduler.merge_thread_count - )) - .spawn(move || { - merge_thread.merge(merge); - }) - .expect("failed to spawn thread"); - scheduler.merge_thread_count += 1; - - let merge_task = MergeTaskInfo { - merge: merge_info, - thread_id: handler.thread().id(), - live_sentinel, - }; - scheduler.merge_tasks.push(merge_task); - scheduler.update_merge_threads(); - } else { - return Ok(()); + // First, quickly run through the newly proposed merges + // and add any orthogonal merges (ie a merge not + // involving segments already pending to be merged) to + // the queue. If we are way behind on merging, many of + // these newly proposed merges will likely already be + // registered. 
+ + loop { + let (valid, g) = scheduler.maybe_stall(writer, guard); + guard = g; + if !valid { + break; + } + + if let Some(merge) = writer.next_merge() { + scheduler.update_io_throttle(&merge); + + let sentinel = Arc::new(ThreadSentinel); + let live_sentinel = Arc::downgrade(&sentinel); + let merge_thread = MergeThread { + index_writer: writer.clone(), + merge_scheduler: self.clone(), + _live_sentinel: sentinel, + }; + let merge_info = merge.schedule_info(); + let handler = thread::Builder::new() + .name(format!( + "Rucene Merge Thread #{}", + scheduler.merge_thread_count + )) + .spawn(move || { + merge_thread.merge(merge); + }) + .expect("failed to spawn thread"); + scheduler.merge_thread_count += 1; + + let merge_task = MergeTaskInfo { + merge: merge_info, + thread_id: handler.thread().id(), + live_sentinel, + }; + scheduler.merge_tasks.push(merge_task); + scheduler.update_merge_threads(); + } else { + return Ok(()); + } } } Ok(()) @@ -481,7 +475,7 @@ impl MergeScheduler for ConcurrentMergeScheduler { } fn merging_thread_count(&self) -> Option { - Some(self.inner.merge_thread_count()) + Some(self.inner.lock().unwrap().merge_thread_count()) } } @@ -510,9 +504,8 @@ impl MergeThrea } Ok(()) => {} } - let mut l = self.merge_scheduler.inner.lock.lock().unwrap(); - let t = l.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>; - let scheduler_mut = unsafe { self.merge_scheduler.inner.scheduler_mut((&l).into()) }; + let l = self.merge_scheduler.inner.lock().unwrap(); + let scheduler_mut = unsafe { l.scheduler_mut(&l) }; scheduler_mut .merge_tasks .retain(|t| t.merge.id != one_merge.id);