Skip to content

Commit

Permalink
Fixing merge scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi committed Feb 7, 2024
1 parent 8edf9aa commit 1838d0c
Showing 1 changed file with 151 additions and 158 deletions.
309 changes: 151 additions & 158 deletions src/core/index/merge/merge_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use core::index::merge::{MergePolicy, MergerTrigger, OneMerge, OneMergeScheduleI
use core::index::writer::IndexWriter;
use core::store::directory::Directory;
use core::store::RateLimiter;
use std::borrow::BorrowMut;
use std::cell::UnsafeCell;

use error::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -158,22 +157,21 @@ impl ConcurrentMergeScheduler {
}
Self {
inner: Arc::new(Mutex::new(ConcurrentMergeSchedulerInner::new(
max_thread_count),
)),
max_thread_count,
))),
}
}
}

struct ConcurrentMergeSchedulerInner {
lock: UnsafeCell<Mutex<()>>,
cond: UnsafeCell<Condvar>,
merge_tasks: UnsafeCell<Vec<MergeTaskInfo>>,
max_merge_count: UnsafeCell<usize>,
max_thread_count: UnsafeCell<usize>,
merge_thread_count: UnsafeCell<usize>,
target_mb_per_sec: UnsafeCell<f64>,
do_auto_io_throttle: UnsafeCell<bool>,
force_merge_mb_per_sec: UnsafeCell<f64>,
cond: Condvar,
merge_tasks: Vec<MergeTaskInfo>,
max_merge_count: usize,
max_thread_count: usize,
merge_thread_count: usize,
target_mb_per_sec: f64,
do_auto_io_throttle: bool,
force_merge_mb_per_sec: f64,
}

// Floor for IO write rate limit (we will never go any lower than this)
Expand All @@ -195,127 +193,123 @@ pub const MAX_MERGING_COUNT: usize = 5;
impl ConcurrentMergeSchedulerInner {
fn new(max_thread_count: usize) -> Self {
ConcurrentMergeSchedulerInner {
lock: UnsafeCell::new(Mutex::new(())),
cond: UnsafeCell::new(Condvar::new()),
merge_tasks: UnsafeCell::new(vec![]),
max_merge_count: UnsafeCell::new(max_thread_count.max(MAX_MERGING_COUNT)),
max_thread_count: UnsafeCell::new(max_thread_count),
merge_thread_count: UnsafeCell::new(0),
target_mb_per_sec: UnsafeCell::new(START_MB_PER_SEC),
do_auto_io_throttle: UnsafeCell::new(true),
force_merge_mb_per_sec: UnsafeCell::new(f64::INFINITY),
cond: Condvar::new(),
merge_tasks: vec![],
max_merge_count: max_thread_count.max(MAX_MERGING_COUNT),
max_thread_count,
merge_thread_count: 0,
target_mb_per_sec: START_MB_PER_SEC,
do_auto_io_throttle: true,
force_merge_mb_per_sec: f64::INFINITY,
}
}

unsafe fn get_self<ConcurrentMergeSchedulerInner>(
ptr: &UnsafeCell<ConcurrentMergeSchedulerInner>,
) -> &mut ConcurrentMergeSchedulerInner {
unsafe { &mut *ptr.get() }
}

#[allow(clippy::mut_from_ref)]
unsafe fn scheduler_mut(
&self,
_guard: UnsafeCell<&MutexGuard<()>>,
_guard: &MutexGuard<(ConcurrentMergeSchedulerInner)>,
) -> &mut ConcurrentMergeSchedulerInner {
let scheduler =
self as *const ConcurrentMergeSchedulerInner as *mut ConcurrentMergeSchedulerInner;
unsafe { &mut *scheduler }
let t = self as *const ConcurrentMergeSchedulerInner as *mut ConcurrentMergeSchedulerInner
as *const UnsafeCell<ConcurrentMergeSchedulerInner>;
unsafe {
let scheduler = ConcurrentMergeSchedulerInner::get_self(t.as_ref().unwrap());
&mut *scheduler
}
}

fn maybe_stall<'a, D, C, MP>(
&self,
writer: &IndexWriter<D, C, ConcurrentMergeScheduler, MP>,
guard: MutexGuard<'a, ()>,
) -> (bool, MutexGuard<'a, ()>)
guard: MutexGuard<'a, ConcurrentMergeSchedulerInner>,
) -> (bool, MutexGuard<'a, ConcurrentMergeSchedulerInner>)
where
D: Directory + Send + Sync + 'static,
C: Codec,
MP: MergePolicy,
{
let thread_id = thread::current().id();
let mut guard = guard;
unsafe {
while writer.has_pending_merges() && self.merge_thread_count() >= *self.max_merge_count.get() {
// This means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
// Note that only maxThreadCount of
// those created merge threads will actually be
// running; the rest will be paused (see
// updateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
if self.merge_tasks.get_mut().iter().any(|t| t.thread_id == thread_id) {
// Never stall a merge thread since this blocks the thread from
// finishing and calling updateMergeThreads, and blocking it
// accomplishes nothing anyway (it's not really a segment producer):
return (false, guard);
}

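// Defensively wait only briefly in case we are missing a notify/notify_all somewhere.
// NOTE(review): this comment used to say ".25 seconds", but the timeout passed below is
// Duration::from_millis(25), i.e. 25 ms (0.025 s) -- confirm which value is intended: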
let (g, _) = *self
.cond
.get()
.wait_timeout(guard, Duration::from_millis(25))
.unwrap();
guard = g;
while writer.has_pending_merges() && self.merge_thread_count() >= self.max_merge_count {
// This means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
// Note that only maxThreadCount of
// those created merge threads will actually be
// running; the rest will be paused (see
// updateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
if self.merge_tasks.iter().any(|t| t.thread_id == thread_id) {
// Never stall a merge thread since this blocks the thread from
// finishing and calling updateMergeThreads, and blocking it
// accomplishes nothing anyway (it's not really a segment producer):
return (false, guard);
}

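// Defensively wait only briefly in case we are missing a notify/notify_all somewhere.
// NOTE(review): this comment used to say ".25 seconds", but the timeout passed below is
// Duration::from_millis(25), i.e. 25 ms (0.025 s) -- confirm which value is intended: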
let (g, _) = self
.cond
.wait_timeout(guard, Duration::from_millis(25))
.unwrap();
guard = g;
}
(true, guard)
}

fn update_merge_threads(&mut self) {
unsafe {
let mut active_tasks: Vec<MergeTaskInfo> = self.merge_tasks.get_mut().iter().collect();
active_tasks.sort();

let tasks_count = active_tasks.len();

let mut big_merge_count = 0;
for i in 0..tasks_count {
if active_tasks[tasks_count - 1 - i]
.merge
.estimated_merge_bytes
.read() as f64
> MIN_BIG_MERGE_MB * 1024.0 * 1024.0
{
big_merge_count = tasks_count - i;
break;
}
let mut active_tasks: Vec<_> = self.merge_tasks.iter().collect();
active_tasks.sort();

let tasks_count = active_tasks.len();

let mut big_merge_count = 0;
for i in 0..tasks_count {
if active_tasks[tasks_count - 1 - i]
.merge
.estimated_merge_bytes
.read() as f64
> MIN_BIG_MERGE_MB * 1024.0 * 1024.0
{
big_merge_count = tasks_count - i;
break;
}
}

for (idx, task) in active_tasks.iter().enumerate() {
// pause the thread if max_thread_count is smaller than the number of merge threads.
let do_pause = idx + self.max_thread_count < big_merge_count;
unsafe

let new_mb_per_sec = if do_pause {
0.0
} else if task.merge.max_num_segments.get().is_some() {
self.force_merge_mb_per_sec
} else if !self.do_auto_io_throttle.get()
|| ((task.merge.estimated_merge_bytes.read() as f64)
< MIN_BIG_MERGE_MB * 1024.0 * 1024.0)
{
f64::INFINITY
} else {
self.target_mb_per_sec.get()
};
for (idx, task) in active_tasks.iter().enumerate() {
// pause the thread if max_thread_count is smaller than the number of merge threads.
let do_pause = idx + self.max_thread_count < big_merge_count;

let new_mb_per_sec = if do_pause {
0.0
} else if task.merge.max_num_segments.get().is_some() {
self.force_merge_mb_per_sec
} else if !self.do_auto_io_throttle
|| ((task.merge.estimated_merge_bytes.read() as f64)
< MIN_BIG_MERGE_MB * 1024.0 * 1024.0)
{
f64::INFINITY
} else {
self.target_mb_per_sec
};

task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec);
}
task.merge.rate_limiter.set_mb_per_sec(new_mb_per_sec);
}
}

fn merge_thread_count(&self) -> usize {
let current_thread = thread::current().id();
unsafe {
self.merge_tasks
.get_mut()
.iter()
.filter(|t| {
t.thread_id != current_thread
&& t.thread_alive()
&& !t.merge.rate_limiter.aborted()
})
.count()
}
self.merge_tasks
.iter()
.filter(|t| {
t.thread_id != current_thread && t.thread_alive() && !t.merge.rate_limiter.aborted()
})
.count()
}

fn update_io_throttle<D: Directory + Send + Sync + 'static, C: Codec>(
Expand Down Expand Up @@ -395,7 +389,6 @@ impl ConcurrentMergeSchedulerInner {
}
}
}

false
}
}
Expand All @@ -412,62 +405,63 @@ impl MergeScheduler for ConcurrentMergeScheduler {
C: Codec,
MP: MergePolicy,
{
let mut guard = self.inner.lock.lock().unwrap();
let t =
guard.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>;
let scheduler = unsafe { self.inner.scheduler_mut((&guard).into()) };

if trigger == MergerTrigger::Closing {
// Disable throttling on close:
scheduler.target_mb_per_sec = MAX_MERGE_MB_PER_SEC;
scheduler.update_merge_threads();
}

// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
// involving segments already pending to be merged) to
// the queue. If we are way behind on merging, many of
// these newly proposed merges will likely already be
// registered.
unsafe {
let mut guard = self.inner.lock().unwrap();
let lock = self.inner.lock().unwrap();
let scheduler = lock.scheduler_mut(&guard);

loop {
let (valid, g) = scheduler.maybe_stall(writer, guard);
guard = g;
if !valid {
break;
if trigger == MergerTrigger::Closing {
// Disable throttling on close:
scheduler.target_mb_per_sec = MAX_MERGE_MB_PER_SEC;
scheduler.update_merge_threads();
}

if let Some(merge) = writer.next_merge() {
scheduler.update_io_throttle(&merge);

let sentinel = Arc::new(ThreadSentinel);
let live_sentinel = Arc::downgrade(&sentinel);
let merge_thread = MergeThread {
index_writer: writer.clone(),
merge_scheduler: self.clone(),
_live_sentinel: sentinel,
};
let merge_info = merge.schedule_info();
let handler = thread::Builder::new()
.name(format!(
"Rucene Merge Thread #{}",
scheduler.merge_thread_count
))
.spawn(move || {
merge_thread.merge(merge);
})
.expect("failed to spawn thread");
scheduler.merge_thread_count += 1;

let merge_task = MergeTaskInfo {
merge: merge_info,
thread_id: handler.thread().id(),
live_sentinel,
};
scheduler.merge_tasks.push(merge_task);
scheduler.update_merge_threads();
} else {
return Ok(());
// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
// involving segments already pending to be merged) to
// the queue. If we are way behind on merging, many of
// these newly proposed merges will likely already be
// registered.

loop {
let (valid, g) = scheduler.maybe_stall(writer, guard);
guard = g;
if !valid {
break;
}

if let Some(merge) = writer.next_merge() {
scheduler.update_io_throttle(&merge);

let sentinel = Arc::new(ThreadSentinel);
let live_sentinel = Arc::downgrade(&sentinel);
let merge_thread = MergeThread {
index_writer: writer.clone(),
merge_scheduler: self.clone(),
_live_sentinel: sentinel,
};
let merge_info = merge.schedule_info();
let handler = thread::Builder::new()
.name(format!(
"Rucene Merge Thread #{}",
scheduler.merge_thread_count
))
.spawn(move || {
merge_thread.merge(merge);
})
.expect("failed to spawn thread");
scheduler.merge_thread_count += 1;

let merge_task = MergeTaskInfo {
merge: merge_info,
thread_id: handler.thread().id(),
live_sentinel,
};
scheduler.merge_tasks.push(merge_task);
scheduler.update_merge_threads();
} else {
return Ok(());
}
}
}
Ok(())
Expand All @@ -481,7 +475,7 @@ impl MergeScheduler for ConcurrentMergeScheduler {
}

fn merging_thread_count(&self) -> Option<usize> {
Some(self.inner.merge_thread_count())
Some(self.inner.lock().unwrap().merge_thread_count())
}
}

Expand Down Expand Up @@ -510,9 +504,8 @@ impl<D: Directory + Send + Sync + 'static, C: Codec, MP: MergePolicy> MergeThrea
}
Ok(()) => {}
}
let mut l = self.merge_scheduler.inner.lock.lock().unwrap();
let t = l.borrow_mut() as *mut MutexGuard<'_, ()> as *const UnsafeCell<&MutexGuard<'_, ()>>;
let scheduler_mut = unsafe { self.merge_scheduler.inner.scheduler_mut((&l).into()) };
let l = self.merge_scheduler.inner.lock().unwrap();
let scheduler_mut = unsafe { l.scheduler_mut(&l) };
scheduler_mut
.merge_tasks
.retain(|t| t.merge.id != one_merge.id);
Expand Down

0 comments on commit 1838d0c

Please sign in to comment.