Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work on WASM support, part 1/2: Remove the asynchronous code in indexer. #1312

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fn index_documents(
meta.untrack_temp_docstore();
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt);
block_on(segment_updater.schedule_add_segment(segment_entry))?;
segment_updater.schedule_add_segment(segment_entry)?;
Ok(())
}

Expand Down Expand Up @@ -368,7 +368,7 @@ impl IndexWriter {
pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
block_on(self.segment_updater.schedule_add_segment(segment_entry))
self.segment_updater.schedule_add_segment(segment_entry)
}

/// Creates a new segment.
Expand Down Expand Up @@ -465,8 +465,10 @@ impl IndexWriter {
}

/// Detects and removes the files that are not used by the index anymore.
pub async fn garbage_collect_files(&self) -> crate::Result<GarbageCollectionResult> {
self.segment_updater.schedule_garbage_collect().await
pub fn garbage_collect_files(
&self,
) -> crate::Result<GarbageCollectionResult> {
self.segment_updater.schedule_garbage_collect()
}

/// Deletes all documents from the index
Expand Down Expand Up @@ -519,10 +521,10 @@ impl IndexWriter {
pub fn merge(
&mut self,
segment_ids: &[SegmentId],
) -> impl Future<Output = crate::Result<SegmentMeta>> {
) -> crate::Result<SegmentMeta> {
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
let segment_updater = self.segment_updater.clone();
async move { segment_updater.start_merge(merge_operation)?.await }
segment_updater.start_merge(merge_operation)
}

/// Closes the current document channel send.
Expand Down
18 changes: 8 additions & 10 deletions src/indexer/prepared_commit.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use futures::executor::block_on;

use super::IndexWriter;
use crate::Opstamp;
use futures::executor::block_on;

/// A prepared commit
pub struct PreparedCommit<'a> {
Expand Down Expand Up @@ -37,20 +36,19 @@ impl<'a> PreparedCommit<'a> {
/// Proceeds to commit.
/// See `.commit_async()`.
pub fn commit(self) -> crate::Result<Opstamp> {
block_on(self.commit_async())
info!("committing {}", self.opstamp);
let _ = self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload);
Ok(self.opstamp)
}

/// Proceeds to commit.
///
/// Unfortunately, contrary to what `PrepareCommit` may suggests,
/// this operation is not at all really light.
/// At this point deletes have not been flushed yet.
pub async fn commit_async(self) -> crate::Result<Opstamp> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload)
.await?;
Ok(self.opstamp)
pub fn commit_async(self) -> crate::Result<Opstamp> {
unimplemented!()
}
}
158 changes: 49 additions & 109 deletions src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl Deref for SegmentUpdater {
}
}

async fn garbage_collect_files(
fn garbage_collect_files(
segment_updater: SegmentUpdater,
) -> crate::Result<GarbageCollectionResult> {
info!("Running garbage collection");
Expand Down Expand Up @@ -289,8 +289,6 @@ pub(crate) struct InnerSegmentUpdater {
// This should be up to date as all update happen through
// the unique active `SegmentUpdater`.
active_index_meta: RwLock<Arc<IndexMeta>>,
pool: ThreadPool,
merge_thread_pool: ThreadPool,

index: Index,
segment_manager: SegmentManager,
Expand All @@ -308,29 +306,9 @@ impl SegmentUpdater {
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let pool = ThreadPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
.create()
.map_err(|_| {
crate::TantivyError::SystemError(
"Failed to spawn segment updater thread".to_string(),
)
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.name_prefix("merge_thread")
.pool_size(NUM_MERGE_THREADS)
.create()
.map_err(|_| {
crate::TantivyError::SystemError(
"Failed to spawn segment merging thread".to_string(),
)
})?;
let index_meta = index.load_metas()?;
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
active_index_meta: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
Expand All @@ -349,39 +327,14 @@ impl SegmentUpdater {
*self.merge_policy.write().unwrap() = arc_merge_policy;
}

async fn schedule_task<
T: 'static + Send,
F: Future<Output = crate::Result<T>> + 'static + Send,
>(
pub fn schedule_add_segment(
&self,
task: F,
) -> crate::Result<T> {
if !self.is_alive() {
return Err(crate::TantivyError::SystemError(
"Segment updater killed".to_string(),
));
}
let (sender, receiver) = oneshot::channel();
self.pool.spawn_ok(async move {
let task_result = task.await;
let _ = sender.send(task_result);
});
let task_result = receiver.await;
task_result.unwrap_or_else(|_| {
let err_msg =
"A segment_updater future did not success. This should never happen.".to_string();
Err(crate::TantivyError::SystemError(err_msg))
})
}

pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> {
segment_entry: SegmentEntry,
) -> crate::Result<()> {
let segment_updater = self.clone();
self.schedule_task(async move {
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options().await;
Ok(())
})
.await
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
Ok(())
}

/// Orders `SegmentManager` to remove all segments
Expand Down Expand Up @@ -448,9 +401,11 @@ impl SegmentUpdater {
Ok(())
}

pub async fn schedule_garbage_collect(&self) -> crate::Result<GarbageCollectionResult> {
pub fn schedule_garbage_collect(
&self,
) -> crate::Result<GarbageCollectionResult> {
let garbage_collect_future = garbage_collect_files(self.clone());
self.schedule_task(garbage_collect_future).await
garbage_collect_future
}

/// List the files that are useful to the index.
Expand All @@ -468,21 +423,18 @@ impl SegmentUpdater {
files
}

pub(crate) async fn schedule_commit(
pub(crate) fn schedule_commit(
&self,
opstamp: Opstamp,
payload: Option<String>,
) -> crate::Result<()> {
let segment_updater: SegmentUpdater = self.clone();
self.schedule_task(async move {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
let _ = garbage_collect_files(segment_updater.clone()).await;
segment_updater.consider_merge_options().await;
Ok(())
})
.await
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
let _ = garbage_collect_files(segment_updater.clone());
segment_updater.consider_merge_options();
Ok(())
}

fn store_meta(&self, index_meta: &IndexMeta) {
Expand Down Expand Up @@ -518,7 +470,7 @@ impl SegmentUpdater {
pub fn start_merge(
&self,
merge_operation: MergeOperation,
) -> crate::Result<impl Future<Output = crate::Result<SegmentMeta>>> {
) -> crate::Result<SegmentMeta> {
assert!(
!merge_operation.segment_ids().is_empty(),
"Segment_ids cannot be empty."
Expand All @@ -531,42 +483,31 @@ impl SegmentUpdater {

info!("Starting merge - {:?}", merge_operation.segment_ids());

let (merging_future_send, merging_future_recv) =
oneshot::channel::<crate::Result<SegmentMeta>>();

self.merge_thread_pool.spawn_ok(async move {
// The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging thread are currently running,
// as well as which segment is currently in merge and therefore should not be
// candidate for another merge.
match merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
) {
Ok(after_merge_segment_entry) => {
let segment_meta = segment_updater
.end_merge(merge_operation, after_merge_segment_entry)
.await;
let _send_result = merging_future_send.send(segment_meta);
}
Err(e) => {
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
e
);
// ... cancel merge
assert!(!cfg!(test), "Merge failed.");
match merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
) {
Ok(after_merge_segment_entry) => {
let segment_meta = segment_updater
.end_merge(merge_operation, after_merge_segment_entry);
segment_meta
}
Err(e) => {
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
Err(crate::TantivyError::SystemError(
"Merge failed:".to_string() + &e.to_string(),
))
}
});

Ok(merging_future_recv.unwrap_or_else(|e| {
Err(crate::TantivyError::SystemError(
"Merge failed:".to_string() + &e.to_string(),
))
}))
}
}

pub(crate) fn get_mergeable_segments(&self) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
Expand All @@ -575,7 +516,7 @@ impl SegmentUpdater {
.get_mergeable_segments(&merge_segment_ids)
}

async fn consider_merge_options(&self) {
fn consider_merge_options(&self) {
let (committed_segments, uncommitted_segments) = self.get_mergeable_segments();

// Committed segments cannot be merged with uncommitted_segments.
Expand Down Expand Up @@ -610,14 +551,14 @@ impl SegmentUpdater {
}
}

async fn end_merge(
fn end_merge(
&self,
merge_operation: MergeOperation,
mut after_merge_segment_entry: SegmentEntry,
) -> crate::Result<SegmentMeta> {
let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
self.schedule_task(async move {
let end_merge_future = {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
Expand Down Expand Up @@ -655,14 +596,13 @@ impl SegmentUpdater {
.save_metas(previous_metas.opstamp, previous_metas.payload.clone())?;
}

segment_updater.consider_merge_options().await;
segment_updater.consider_merge_options();
} // we drop all possible handle to a now useless `SegmentMeta`.

let _ = garbage_collect_files(segment_updater).await;
let _ = garbage_collect_files(segment_updater);
Ok(())
})
.await?;
Ok(after_merge_segment_meta)
};
end_merge_future.map(|_| after_merge_segment_meta)
}

/// Wait for current merging threads.
Expand Down