diff --git a/Cargo.toml b/Cargo.toml index 8a8b84f65e..9fa9b91d5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,16 @@ rust-version = "1.63" exclude = ["benches/*.json", "benches/*.txt"] [dependencies] -oneshot = "0.1.5" +# Switch back to the non-forked oneshot crate once https://github.com/faern/oneshot/pull/35 is merged +oneshot = { git = "https://github.com/fulmicoton/oneshot.git", rev = "c10a3ba" } base64 = "0.22.0" byteorder = "1.4.3" crc32fast = "1.3.2" once_cell = "1.10.0" -regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] } +regex = { version = "1.5.5", default-features = false, features = [ + "std", + "unicode", +] } aho-corasick = "1.0" tantivy-fst = "0.5" memmap2 = { version = "0.9.0", optional = true } @@ -36,7 +40,9 @@ uuid = { version = "1.0.0", features = ["v4", "serde"] } crossbeam-channel = "0.5.4" rust-stemmers = "1.2.0" downcast-rs = "1.2.0" -bitpacking = { version = "0.9.2", default-features = false, features = ["bitpacker4x"] } +bitpacking = { version = "0.9.2", default-features = false, features = [ + "bitpacker4x", +] } census = "0.4.2" rustc-hash = "1.1.0" thiserror = "1.0.30" @@ -51,13 +57,13 @@ itertools = "0.12.0" measure_time = "0.8.2" arc-swap = "1.5.0" -columnar = { version= "0.3", path="./columnar", package ="tantivy-columnar" } -sstable = { version= "0.3", path="./sstable", package ="tantivy-sstable", optional = true } -stacker = { version= "0.3", path="./stacker", package ="tantivy-stacker" } -query-grammar = { version= "0.22.0", path="./query-grammar", package = "tantivy-query-grammar" } -tantivy-bitpacker = { version= "0.6", path="./bitpacker" } -common = { version= "0.7", path = "./common/", package = "tantivy-common" } -tokenizer-api = { version= "0.3", path="./tokenizer-api", package="tantivy-tokenizer-api" } +columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" } +sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true } +stacker = { version = "0.3", path = "./stacker", package = "tantivy-stacker" } +query-grammar = { version = "0.22.0", path = "./query-grammar", package = "tantivy-query-grammar" } +tantivy-bitpacker = { version = "0.6", path = "./bitpacker" } +common = { version = "0.7", path = "./common/", package = "tantivy-common" } +tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" } sketches-ddsketch = { version = "0.2.1", features = ["use_serde"] } futures-util = { version = "0.3.28", optional = true } fnv = "1.0.7" @@ -66,7 +72,7 @@ fnv = "1.0.7" winapi = "0.3.9" [dev-dependencies] -binggan = "0.5.1" +binggan = "0.5.2" rand = "0.8.5" maplit = "1.0.2" matches = "0.1.9" @@ -112,7 +118,7 @@ lz4-compression = ["lz4_flex"] zstd-compression = ["zstd"] failpoints = ["fail", "fail/failpoints"] -unstable = [] # useful for benches. +unstable = [] # useful for benches. quickwit = ["sstable", "futures-util"] @@ -122,7 +128,16 @@ quickwit = ["sstable", "futures-util"] compare_hash_only = ["stacker/compare_hash_only"] [workspace] -members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"] +members = [ + "query-grammar", + "bitpacker", + "common", + "ownedbytes", + "stacker", + "sstable", + "tokenizer-api", + "columnar", +] # Following the "fail" crate best practises, we isolate # tests that define specific behavior in fail check points @@ -147,4 +162,3 @@ harness = false [[bench]] name = "agg_bench" harness = false - diff --git a/src/core/executor.rs b/src/core/executor.rs index 915534009c..977dd97172 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -1,6 +1,7 @@ +use std::sync::Arc; + #[cfg(feature = "quickwit")] use futures_util::{future::Either, FutureExt}; -use rayon::{ThreadPool, ThreadPoolBuilder}; use crate::TantivyError; @@ -11,11 +12,18 @@ use crate::TantivyError; /// First dependency hell. It is not a good idea to expose the /// API of a dependency, knowing it might conflict with a different version /// used by the client. Second, we may stop using rayon in the future. +#[derive(Clone)] pub enum Executor { /// Single thread variant of an Executor SingleThread, /// Thread pool variant of an Executor - ThreadPool(ThreadPool), + ThreadPool(Arc), +} + +impl From> for Executor { + fn from(thread_pool: Arc) -> Self { + Executor::ThreadPool(thread_pool) + } } impl Executor { @@ -26,11 +34,11 @@ impl Executor { /// Creates an Executor that dispatches the tasks in a thread pool. pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result { - let pool = ThreadPoolBuilder::new() + let pool = rayon::ThreadPoolBuilder::new() .num_threads(num_threads) .thread_name(move |num| format!("{prefix}{num}")) .build()?; - Ok(Executor::ThreadPool(pool)) + Ok(Executor::ThreadPool(Arc::new(pool))) } /// Perform a map in the thread pool. @@ -105,7 +113,7 @@ impl Executor { match self { Executor::SingleThread => Either::Left(std::future::ready(Ok(cpu_intensive_task()))), Executor::ThreadPool(pool) => { - let (sender, receiver) = oneshot_with_sentinel::channel(); + let (sender, receiver) = oneshot::channel(); pool.spawn(|| { if sender.is_closed() { return; @@ -121,54 +129,6 @@ impl Executor { } } -#[cfg(feature = "quickwit")] -mod oneshot_with_sentinel { - use std::pin::Pin; - use std::sync::Arc; - use std::task::{Context, Poll}; - // TODO get ride of this if oneshot ever gains a is_closed() - - pub struct SenderWithSentinel { - tx: oneshot::Sender, - guard: Arc<()>, - } - - pub struct ReceiverWithSentinel { - rx: oneshot::Receiver, - _guard: Arc<()>, - } - - pub fn channel() -> (SenderWithSentinel, ReceiverWithSentinel) { - let (tx, rx) = oneshot::channel(); - let guard = Arc::new(()); - ( - SenderWithSentinel { - tx, - guard: guard.clone(), - }, - ReceiverWithSentinel { rx, _guard: guard }, - ) - } - - impl SenderWithSentinel { - pub fn send(self, message: T) -> Result<(), oneshot::SendError> { - self.tx.send(message) - } - - pub fn is_closed(&self) -> bool { - Arc::strong_count(&self.guard) == 1 - } - } - - impl std::future::Future for ReceiverWithSentinel { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.rx).poll(ctx) - } - } -} - #[cfg(test)] mod tests { use super::Executor; diff --git a/src/index/index.rs b/src/index/index.rs index 0baea33451..400e732ff9 100644 --- a/src/index/index.rs +++ b/src/index/index.rs @@ -3,7 +3,6 @@ use std::fmt; #[cfg(feature = "mmap")] use std::path::Path; use std::path::PathBuf; -use std::sync::Arc; use std::thread::available_parallelism; use super::segment::Segment; @@ -294,7 +293,7 @@ pub struct Index { directory: ManagedDirectory, schema: Schema, settings: IndexSettings, - executor: Arc, + executor: Executor, tokenizers: TokenizerManager, fast_field_tokenizers: TokenizerManager, inventory: SegmentMetaInventory, @@ -319,23 +318,19 @@ impl Index { /// /// By default the executor is single thread, and simply runs in the calling thread. pub fn search_executor(&self) -> &Executor { - self.executor.as_ref() + &self.executor } /// Replace the default single thread search executor pool /// by a thread pool with a given number of threads. pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> { - self.executor = Arc::new(Executor::multi_thread(num_threads, "tantivy-search-")?); + self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?; Ok(()) } - /// Custom thread pool by a outer thread pool. - pub fn set_shared_multithread_executor( - &mut self, - shared_thread_pool: Arc, - ) -> crate::Result<()> { - self.executor = shared_thread_pool.clone(); - Ok(()) + /// Custom thread poolby a outer thread pool. + pub fn set_executor(&mut self, executor: Executor) { + self.executor = executor; } /// Replace the default single thread search executor pool @@ -419,7 +414,7 @@ impl Index { schema, tokenizers: TokenizerManager::default(), fast_field_tokenizers: TokenizerManager::default(), - executor: Arc::new(Executor::single_thread()), + executor: Executor::single_thread(), inventory, } } diff --git a/src/lib.rs b/src/lib.rs index 2978f41780..ae984898d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -166,7 +166,7 @@ mod future_result; // Re-exports pub use common::DateTime; -pub use {columnar, query_grammar, time}; +pub use {columnar, query_grammar, rayon, time}; pub use crate::error::TantivyError; pub use crate::future_result::FutureResult; @@ -436,7 +436,6 @@ pub mod tests { } #[test] - #[cfg(not(feature = "lz4"))] fn test_version_string() { use regex::Regex; let regex_ptn = Regex::new(