diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 646ea60e06..f8a322d330 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -214,7 +214,7 @@ fn index_documents( meta.untrack_temp_docstore(); // update segment_updater inventory to remove tempstore let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt); - block_on(segment_updater.schedule_add_segment(segment_entry))?; + segment_updater.schedule_add_segment(segment_entry)?; Ok(()) } @@ -368,7 +368,7 @@ impl IndexWriter { pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> { let delete_cursor = self.delete_queue.cursor(); let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None); - block_on(self.segment_updater.schedule_add_segment(segment_entry)) + self.segment_updater.schedule_add_segment(segment_entry) } /// Creates a new segment. @@ -465,8 +465,10 @@ impl IndexWriter { } /// Detects and removes the files that are not used by the index anymore. - pub async fn garbage_collect_files(&self) -> crate::Result { - self.segment_updater.schedule_garbage_collect().await + pub fn garbage_collect_files( + &self, + ) -> crate::Result { + self.segment_updater.schedule_garbage_collect() } /// Deletes all documents from the index @@ -519,10 +521,10 @@ impl IndexWriter { pub fn merge( &mut self, segment_ids: &[SegmentId], - ) -> impl Future> { + ) -> crate::Result { let merge_operation = self.segment_updater.make_merge_operation(segment_ids); let segment_updater = self.segment_updater.clone(); - async move { segment_updater.start_merge(merge_operation)?.await } + segment_updater.start_merge(merge_operation) } /// Closes the current document channel send. diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 13d2cfa06a..f1059ad4d3 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -1,7 +1,6 @@ -use futures::executor::block_on; - use super::IndexWriter; use crate::Opstamp; +use futures::executor::block_on; /// A prepared commit pub struct PreparedCommit<'a> { @@ -37,7 +36,11 @@ impl<'a> PreparedCommit<'a> { /// Proceeds to commit. /// See `.commit_async()`. pub fn commit(self) -> crate::Result { - block_on(self.commit_async()) + info!("committing {}", self.opstamp); + let _ = self.index_writer + .segment_updater() + .schedule_commit(self.opstamp, self.payload); + Ok(self.opstamp) } /// Proceeds to commit. @@ -45,12 +48,7 @@ impl<'a> PreparedCommit<'a> { /// Unfortunately, contrary to what `PrepareCommit` may suggests, /// this operation is not at all really light. /// At this point deletes have not been flushed yet. - pub async fn commit_async(self) -> crate::Result { - info!("committing {}", self.opstamp); - self.index_writer - .segment_updater() - .schedule_commit(self.opstamp, self.payload) - .await?; - Ok(self.opstamp) + pub fn commit_async(self) -> crate::Result { + unimplemented!() } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index c1255eec63..c4361b86e3 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -105,7 +105,7 @@ impl Deref for SegmentUpdater { } } -async fn garbage_collect_files( +fn garbage_collect_files( segment_updater: SegmentUpdater, ) -> crate::Result { info!("Running garbage collection"); @@ -289,8 +289,6 @@ pub(crate) struct InnerSegmentUpdater { // This should be up to date as all update happen through // the unique active `SegmentUpdater`. active_index_meta: RwLock>, - pool: ThreadPool, - merge_thread_pool: ThreadPool, index: Index, segment_manager: SegmentManager, @@ -308,29 +306,9 @@ impl SegmentUpdater { ) -> crate::Result { let segments = index.searchable_segment_metas()?; let segment_manager = SegmentManager::from_segments(segments, delete_cursor); - let pool = ThreadPoolBuilder::new() - .name_prefix("segment_updater") - .pool_size(1) - .create() - .map_err(|_| { - crate::TantivyError::SystemError( - "Failed to spawn segment updater thread".to_string(), - ) - })?; - let merge_thread_pool = ThreadPoolBuilder::new() - .name_prefix("merge_thread") - .pool_size(NUM_MERGE_THREADS) - .create() - .map_err(|_| { - crate::TantivyError::SystemError( - "Failed to spawn segment merging thread".to_string(), - ) - })?; let index_meta = index.load_metas()?; Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater { active_index_meta: RwLock::new(Arc::new(index_meta)), - pool, - merge_thread_pool, index, segment_manager, merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())), @@ -349,39 +327,14 @@ impl SegmentUpdater { *self.merge_policy.write().unwrap() = arc_merge_policy; } - async fn schedule_task< - T: 'static + Send, - F: Future> + 'static + Send, - >( + pub fn schedule_add_segment( &self, - task: F, - ) -> crate::Result { - if !self.is_alive() { - return Err(crate::TantivyError::SystemError( - "Segment updater killed".to_string(), - )); - } - let (sender, receiver) = oneshot::channel(); - self.pool.spawn_ok(async move { - let task_result = task.await; - let _ = sender.send(task_result); - }); - let task_result = receiver.await; - task_result.unwrap_or_else(|_| { - let err_msg = - "A segment_updater future did not success. This should never happen.".to_string(); - Err(crate::TantivyError::SystemError(err_msg)) - }) - } - - pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> { + segment_entry: SegmentEntry, + ) -> crate::Result<()> { let segment_updater = self.clone(); - self.schedule_task(async move { - segment_updater.segment_manager.add_segment(segment_entry); - segment_updater.consider_merge_options().await; - Ok(()) - }) - .await + segment_updater.segment_manager.add_segment(segment_entry); + segment_updater.consider_merge_options(); + Ok(()) } /// Orders `SegmentManager` to remove all segments @@ -448,9 +401,11 @@ impl SegmentUpdater { Ok(()) } - pub async fn schedule_garbage_collect(&self) -> crate::Result { + pub fn schedule_garbage_collect( + &self, + ) -> crate::Result { let garbage_collect_future = garbage_collect_files(self.clone()); - self.schedule_task(garbage_collect_future).await + garbage_collect_future } /// List the files that are useful to the index. @@ -468,21 +423,18 @@ impl SegmentUpdater { files } - pub(crate) async fn schedule_commit( + pub(crate) fn schedule_commit( &self, opstamp: Opstamp, payload: Option, ) -> crate::Result<()> { let segment_updater: SegmentUpdater = self.clone(); - self.schedule_task(async move { - let segment_entries = segment_updater.purge_deletes(opstamp)?; - segment_updater.segment_manager.commit(segment_entries); - segment_updater.save_metas(opstamp, payload)?; - let _ = garbage_collect_files(segment_updater.clone()).await; - segment_updater.consider_merge_options().await; - Ok(()) - }) - .await + let segment_entries = segment_updater.purge_deletes(opstamp)?; + segment_updater.segment_manager.commit(segment_entries); + segment_updater.save_metas(opstamp, payload)?; + let _ = garbage_collect_files(segment_updater.clone()); + segment_updater.consider_merge_options(); + Ok(()) } fn store_meta(&self, index_meta: &IndexMeta) { @@ -518,7 +470,7 @@ impl SegmentUpdater { pub fn start_merge( &self, merge_operation: MergeOperation, - ) -> crate::Result>> { + ) -> crate::Result { assert!( !merge_operation.segment_ids().is_empty(), "Segment_ids cannot be empty." @@ -531,42 +483,31 @@ impl SegmentUpdater { info!("Starting merge - {:?}", merge_operation.segment_ids()); - let (merging_future_send, merging_future_recv) = - oneshot::channel::>(); - - self.merge_thread_pool.spawn_ok(async move { - // The fact that `merge_operation` is moved here is important. - // Its lifetime is used to track how many merging thread are currently running, - // as well as which segment is currently in merge and therefore should not be - // candidate for another merge. - match merge( - &segment_updater.index, - segment_entries, - merge_operation.target_opstamp(), - ) { - Ok(after_merge_segment_entry) => { - let segment_meta = segment_updater - .end_merge(merge_operation, after_merge_segment_entry) - .await; - let _send_result = merging_future_send.send(segment_meta); - } - Err(e) => { - warn!( - "Merge of {:?} was cancelled: {:?}", - merge_operation.segment_ids().to_vec(), - e - ); - // ... cancel merge - assert!(!cfg!(test), "Merge failed."); + match merge( + &segment_updater.index, + segment_entries, + merge_operation.target_opstamp(), + ) { + Ok(after_merge_segment_entry) => { + let segment_meta = segment_updater + .end_merge(merge_operation, after_merge_segment_entry); + segment_meta + } + Err(e) => { + warn!( + "Merge of {:?} was cancelled: {:?}", + merge_operation.segment_ids().to_vec(), + e + ); + // ... cancel merge + if cfg!(test) { + panic!("Merge failed."); } + Err(crate::TantivyError::SystemError( + "Merge failed:".to_string() + &e.to_string(), + )) } - }); - - Ok(merging_future_recv.unwrap_or_else(|e| { - Err(crate::TantivyError::SystemError( - "Merge failed:".to_string() + &e.to_string(), - )) - })) + } } pub(crate) fn get_mergeable_segments(&self) -> (Vec, Vec) { @@ -575,7 +516,7 @@ impl SegmentUpdater { .get_mergeable_segments(&merge_segment_ids) } - async fn consider_merge_options(&self) { + fn consider_merge_options(&self) { let (committed_segments, uncommitted_segments) = self.get_mergeable_segments(); // Committed segments cannot be merged with uncommitted_segments. @@ -610,14 +551,14 @@ impl SegmentUpdater { } } - async fn end_merge( + fn end_merge( &self, merge_operation: MergeOperation, mut after_merge_segment_entry: SegmentEntry, ) -> crate::Result { let segment_updater = self.clone(); let after_merge_segment_meta = after_merge_segment_entry.meta().clone(); - self.schedule_task(async move { + let end_merge_future = { info!("End merge {:?}", after_merge_segment_entry.meta()); { let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); @@ -655,14 +596,13 @@ impl SegmentUpdater { .save_metas(previous_metas.opstamp, previous_metas.payload.clone())?; } - segment_updater.consider_merge_options().await; + segment_updater.consider_merge_options(); } // we drop all possible handle to a now useless `SegmentMeta`. - let _ = garbage_collect_files(segment_updater).await; + let _ = garbage_collect_files(segment_updater); Ok(()) - }) - .await?; - Ok(after_merge_segment_meta) + }; + end_merge_future.map(|_| after_merge_segment_meta) } /// Wait for current merging threads.