Skip to content

Commit

Permalink
Removing needless uses of async in the API.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Mar 18, 2022
1 parent f4cf1f5 commit 10c6386
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ fs2={ version = "0.4.3", optional = true }
levenshtein_automata = "0.2"
uuid = { version = "0.8.2", features = ["v4", "serde"] }
crossbeam = "0.8.1"
futures = { version = "0.3.15", features = ["thread-pool"] }
tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.1", path="./bitpacker" }
common = { version = "0.2", path = "./common/", package = "tantivy-common" }
Expand Down Expand Up @@ -72,6 +71,7 @@ criterion = "0.3.5"
test-log = "0.2.8"
env_logger = "0.9.0"
pprof = {version= "0.7", features=["flamegraph", "criterion"]}
futures = "0.3.15"

[dev-dependencies.fail]
version = "0.5"
Expand Down
20 changes: 0 additions & 20 deletions src/future_result.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,3 @@
// Copyright (C) 2021 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//

use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
Expand Down
4 changes: 2 additions & 2 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ impl IndexWriter {
}

/// Detects and removes the files that are not used by the index anymore.
pub async fn garbage_collect_files(&self) -> crate::Result<GarbageCollectionResult> {
self.segment_updater.schedule_garbage_collect().await
pub fn garbage_collect_files(&self) -> FutureResult<GarbageCollectionResult> {
self.segment_updater.schedule_garbage_collect()
}

/// Deletes all documents from the index
Expand Down
4 changes: 2 additions & 2 deletions src/indexer/prepared_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ impl<'a> PreparedCommit<'a> {
/// Proceeds to commit.
/// See `.commit_async()`.
pub fn commit(self) -> crate::Result<Opstamp> {
self.commit_async().wait()
self.commit_future().wait()
}

/// Proceeds to commit.
///
/// Unfortunately, contrary to what `PrepareCommit` may suggests,
/// this operation is not at all really light.
/// At this point deletes have not been flushed yet.
pub fn commit_async(self) -> FutureResult<Opstamp> {
pub fn commit_future(self) -> FutureResult<Opstamp> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
Expand Down
56 changes: 27 additions & 29 deletions src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::future::Future;
use std::io;
use std::io::Write;
use std::ops::Deref;
Expand All @@ -9,7 +8,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};

use fail::fail_point;
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use rayon::{ThreadPool, ThreadPoolBuilder};

use super::segment_manager::SegmentManager;
use crate::core::{
Expand Down Expand Up @@ -104,7 +103,7 @@ impl Deref for SegmentUpdater {
}
}

async fn garbage_collect_files(
fn garbage_collect_files(
segment_updater: SegmentUpdater,
) -> crate::Result<GarbageCollectionResult> {
info!("Running garbage collection");
Expand Down Expand Up @@ -308,18 +307,18 @@ impl SegmentUpdater {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let pool = ThreadPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
.create()
.thread_name(|_| "segment_updater".to_string())
.num_threads(1)
.build()
.map_err(|_| {
crate::TantivyError::SystemError(
"Failed to spawn segment updater thread".to_string(),
)
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.name_prefix("merge_thread")
.pool_size(NUM_MERGE_THREADS)
.create()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(NUM_MERGE_THREADS)
.build()
.map_err(|_| {
crate::TantivyError::SystemError(
"Failed to spawn segment merging thread".to_string(),
Expand Down Expand Up @@ -348,7 +347,7 @@ impl SegmentUpdater {
*self.merge_policy.write().unwrap() = arc_merge_policy;
}

fn schedule_task<T: 'static + Send, F: Future<Output = crate::Result<T>> + 'static + Send>(
fn schedule_task<T: 'static + Send, F: FnOnce() -> crate::Result<T> + 'static + Send>(
&self,
task: F,
) -> FutureResult<T> {
Expand All @@ -357,18 +356,18 @@ impl SegmentUpdater {
}
let (scheduled_result, sender) =
FutureResult::create("A segment_updater future did not success. This never happen.");
self.pool.spawn_ok(async move {
let task_result = task.await;
self.pool.spawn(|| {
let task_result = task();
let _ = sender.send(task_result);
});
scheduled_result
}

pub fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> FutureResult<()> {
let segment_updater = self.clone();
self.schedule_task(async move {
self.schedule_task(move || {
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options().await;
segment_updater.consider_merge_options();
Ok(())
})
}
Expand Down Expand Up @@ -437,9 +436,9 @@ impl SegmentUpdater {
Ok(())
}

pub async fn schedule_garbage_collect(&self) -> crate::Result<GarbageCollectionResult> {
let garbage_collect_future = garbage_collect_files(self.clone());
self.schedule_task(garbage_collect_future).wait()
pub fn schedule_garbage_collect(&self) -> FutureResult<GarbageCollectionResult> {
let self_clone = self.clone();
self.schedule_task(move || garbage_collect_files(self_clone))
}

/// List the files that are useful to the index.
Expand All @@ -463,12 +462,12 @@ impl SegmentUpdater {
payload: Option<String>,
) -> FutureResult<Opstamp> {
let segment_updater: SegmentUpdater = self.clone();
self.schedule_task(async move {
self.schedule_task(move || {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
let _ = garbage_collect_files(segment_updater.clone()).await;
segment_updater.consider_merge_options().await;
let _ = garbage_collect_files(segment_updater.clone());
segment_updater.consider_merge_options();
Ok(opstamp)
})
}
Expand Down Expand Up @@ -529,7 +528,7 @@ impl SegmentUpdater {
let (scheduled_result, merging_future_send) =
FutureResult::create("Merge operation failed.");

self.merge_thread_pool.spawn_ok(async move {
self.merge_thread_pool.spawn(move || {
// The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging thread are currently running,
// as well as which segment is currently in merge and therefore should not be
Expand All @@ -540,9 +539,8 @@ impl SegmentUpdater {
merge_operation.target_opstamp(),
) {
Ok(after_merge_segment_entry) => {
let segment_meta = segment_updater
.end_merge(merge_operation, after_merge_segment_entry)
.await;
let segment_meta =
segment_updater.end_merge(merge_operation, after_merge_segment_entry);
let _send_result = merging_future_send.send(segment_meta);
}
Err(e) => {
Expand All @@ -566,7 +564,7 @@ impl SegmentUpdater {
.get_mergeable_segments(&merge_segment_ids)
}

async fn consider_merge_options(&self) {
fn consider_merge_options(&self) {
let (committed_segments, uncommitted_segments) = self.get_mergeable_segments();

// Committed segments cannot be merged with uncommitted_segments.
Expand Down Expand Up @@ -598,14 +596,14 @@ impl SegmentUpdater {
}
}

async fn end_merge(
fn end_merge(
&self,
merge_operation: MergeOperation,
mut after_merge_segment_entry: SegmentEntry,
) -> crate::Result<SegmentMeta> {
let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
self.schedule_task(async move {
self.schedule_task(move || {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
Expand Down Expand Up @@ -643,10 +641,10 @@ impl SegmentUpdater {
.save_metas(previous_metas.opstamp, previous_metas.payload.clone())?;
}

segment_updater.consider_merge_options().await;
segment_updater.consider_merge_options();
} // we drop all possible handle to a now useless `SegmentMeta`.

let _ = garbage_collect_files(segment_updater).await;
let _ = garbage_collect_files(segment_updater);
Ok(())
})
.wait()?;
Expand Down

0 comments on commit 10c6386

Please sign in to comment.