Skip to content

Commit

Permalink
Removes all usage of block_on, and use a oneshot channel instead. (#1315
Browse files Browse the repository at this point in the history
)

* Removes all usage of block_on, and use a oneshot channel instead.

Calling `block_on` panics in certain context.
For instance, it panics when it is called in a the context of another
call to block.

Using it in tantivy is unnecessary. We replace it by a thin wrapper
around a oneshot channel that supports both async/sync.

* Removing needless uses of async in the API.

Co-authored-by: PSeitz <[email protected]>
  • Loading branch information
fulmicoton and PSeitz authored Mar 18, 2022
1 parent d2a7bcf commit 46d5de9
Show file tree
Hide file tree
Showing 20 changed files with 273 additions and 188 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["search", "information", "retrieval"]
edition = "2018"

[dependencies]
oneshot = "0.1"
base64 = "0.13"
byteorder = "1.4.3"
crc32fast = "1.2.1"
Expand All @@ -32,7 +33,6 @@ fs2={ version = "0.4.3", optional = true }
levenshtein_automata = "0.2"
uuid = { version = "0.8.2", features = ["v4", "serde"] }
crossbeam = "0.8.1"
futures = { version = "0.3.15", features = ["thread-pool"] }
tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.1", path="./bitpacker" }
common = { version = "0.2", path = "./common/", package = "tantivy-common" }
Expand Down Expand Up @@ -71,6 +71,7 @@ criterion = "0.3.5"
test-log = "0.2.8"
env_logger = "0.9.0"
pprof = {version= "0.7", features=["flamegraph", "criterion"]}
futures = "0.3.15"

[dev-dependencies.fail]
version = "0.5"
Expand Down
8 changes: 3 additions & 5 deletions src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,6 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {

#[cfg(test)]
mod tests {

use futures::executor::block_on;
use serde_json::Value;

use super::agg_req::{Aggregation, Aggregations, BucketAggregation};
Expand Down Expand Up @@ -378,7 +376,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -597,7 +595,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -1033,7 +1031,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down
12 changes: 6 additions & 6 deletions src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,24 +781,24 @@ mod tests {
for i in 0u64..8_000u64 {
writer.add_document(doc!(field => i))?;
}
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = directory.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));

writer.commit()?;
let mem_right_after_commit = directory.total_mem_usage();
assert!(receiver.recv().is_ok());

let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;

assert_eq!(reader.searcher().num_docs(), 8_000);
assert_eq!(reader.searcher().segment_readers().len(), 8);

writer.wait_merging_threads()?;

let mem_right_after_merge_finished = directory.total_mem_usage();

reader.reload().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 8_000);
assert!(
mem_right_after_merge_finished < mem_right_after_commit,
Expand Down
4 changes: 3 additions & 1 deletion src/directory/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ impl FileWatcher {
if metafile_has_changed {
info!("Meta file {:?} was modified", path);
current_checksum_opt = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
// We actually ignore callbacks failing here.
// We just wait for the end of their execution.
let _ = callbacks.broadcast().wait();
}
}

Expand Down
9 changes: 3 additions & 6 deletions src/directory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::oneshot;
use futures::executor::block_on;

use super::*;

#[cfg(feature = "mmap")]
Expand Down Expand Up @@ -249,8 +246,8 @@ fn test_lock_blocking(directory: &dyn Directory) {
std::thread::spawn(move || {
//< lock_a_res is sent to the thread.
in_thread_clone.store(true, SeqCst);
let _just_sync = block_on(receiver);
// explicitely droping lock_a_res. It would have been sufficient to just force it
let _just_sync = receiver.recv();
// explicitely dropping lock_a_res. It would have been sufficient to just force it
// to be part of the move, but the intent seems clearer that way.
drop(lock_a_res);
});
Expand All @@ -273,7 +270,7 @@ fn test_lock_blocking(directory: &dyn Directory) {
assert!(in_thread.load(SeqCst));
assert!(lock_a_res.is_ok());
});
assert!(block_on(receiver2).is_ok());
assert!(receiver2.recv().is_ok());
assert!(sender.send(()).is_ok());
assert!(join_handle.join().is_ok());
}
53 changes: 19 additions & 34 deletions src/directory/watch_event_router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::{Arc, RwLock, Weak};

use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use crate::FutureResult;

/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
Expand Down Expand Up @@ -74,12 +73,11 @@ impl WatchCallbackList {
}

/// Triggers all callbacks
pub fn broadcast(&self) -> impl Future<Output = ()> {
pub fn broadcast(&self) -> FutureResult<()> {
let callbacks = self.list_callback();
let (sender, receiver) = oneshot::channel();
let result = receiver.unwrap_or_else(|_| ());
let (result, sender) = FutureResult::create("One of the callback panicked.");
if callbacks.is_empty() {
let _ = sender.send(());
let _ = sender.send(Ok(()));
return result;
}
let spawn_res = std::thread::Builder::new()
Expand All @@ -88,7 +86,7 @@ impl WatchCallbackList {
for callback in callbacks {
callback.call();
}
let _ = sender.send(());
let _ = sender.send(Ok(()));
});
if let Err(err) = spawn_res {
error!(
Expand All @@ -106,8 +104,6 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::executor::block_on;

use crate::directory::{WatchCallback, WatchCallbackList};

#[test]
Expand All @@ -118,22 +114,18 @@ mod tests {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(0, counter.load(Ordering::SeqCst));
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(1, counter.load(Ordering::SeqCst));
block_on(async {
(
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
}

Expand All @@ -150,19 +142,15 @@ mod tests {
let handle_a = watch_event_router.subscribe(inc_callback(1));
let handle_a2 = watch_event_router.subscribe(inc_callback(10));
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
futures::join!(
watch_event_router.broadcast(),
watch_event_router.broadcast()
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(22, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
mem::drop(handle_a2);
block_on(watch_event_router.broadcast());
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
}

Expand All @@ -176,15 +164,12 @@ mod tests {
});
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
let future1 = watch_event_router.broadcast();
let future2 = watch_event_router.broadcast();
futures::join!(future1, future2)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
let _ = watch_event_router.broadcast();
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
}
}
3 changes: 1 addition & 2 deletions src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,7 @@ mod tests {
.map(SegmentReader::segment_id)
.collect();
assert_eq!(segment_ids.len(), 2);
let merge_future = index_writer.merge(&segment_ids[..]);
futures::executor::block_on(merge_future)?;
index_writer.merge(&segment_ids[..]).wait().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().segment_readers().len(), 1);
Ok(())
Expand Down
5 changes: 2 additions & 3 deletions src/fastfield/multivalued/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub use self::writer::MultiValuedFastFieldWriter;
mod tests {

use chrono::Duration;
use futures::executor::block_on;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};
use test_log::test;
Expand Down Expand Up @@ -268,7 +267,7 @@ mod tests {
IndexingOp::Merge => {
let segment_ids = index.searchable_segment_ids()?;
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.segment_updater().wait_merging_thread()?;
}
}
Expand All @@ -283,7 +282,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if !segment_ids.is_empty() {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait()?;
assert!(index_writer.wait_merging_threads().is_ok());
}
}
Expand Down
Loading

0 comments on commit 46d5de9

Please sign in to comment.