Skip to content

Commit

Permalink
Small changes in the Executor API.
Browse files Browse the repository at this point in the history
Warning, this change is mildly not backward compatible
so I bumped tantivy's version.
  • Loading branch information
fulmicoton committed May 10, 2024
1 parent 1ee5f90 commit 8d05680
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 87 deletions.
44 changes: 29 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.22.0"
version = "0.23.0"
authors = ["Paul Masurel <[email protected]>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
Expand All @@ -15,12 +15,16 @@ rust-version = "1.63"
exclude = ["benches/*.json", "benches/*.txt"]

[dependencies]
oneshot = "0.1.5"
# Switch back to the non-forked oneshot crate once https://github.com/faern/oneshot/pull/35 is merged
oneshot = { git = "https://github.com/fulmicoton/oneshot.git", rev = "c10a3ba" }
base64 = "0.22.0"
byteorder = "1.4.3"
crc32fast = "1.3.2"
once_cell = "1.10.0"
regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] }
regex = { version = "1.5.5", default-features = false, features = [
"std",
"unicode",
] }
aho-corasick = "1.0"
tantivy-fst = "0.5"
memmap2 = { version = "0.9.0", optional = true }
Expand All @@ -36,7 +40,9 @@ uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4"
rust-stemmers = "1.2.0"
downcast-rs = "1.2.0"
bitpacking = { version = "0.9.2", default-features = false, features = ["bitpacker4x"] }
bitpacking = { version = "0.9.2", default-features = false, features = [
"bitpacker4x",
] }
census = "0.4.2"
rustc-hash = "1.1.0"
thiserror = "1.0.30"
Expand All @@ -51,13 +57,13 @@ itertools = "0.12.0"
measure_time = "0.8.2"
arc-swap = "1.5.0"

columnar = { version= "0.3", path="./columnar", package ="tantivy-columnar" }
sstable = { version= "0.3", path="./sstable", package ="tantivy-sstable", optional = true }
stacker = { version= "0.3", path="./stacker", package ="tantivy-stacker" }
query-grammar = { version= "0.22.0", path="./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version= "0.6", path="./bitpacker" }
common = { version= "0.7", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version= "0.3", path="./tokenizer-api", package="tantivy-tokenizer-api" }
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.3", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.22.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.6", path = "./bitpacker" }
common = { version = "0.7", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.2.1", features = ["use_serde"] }
futures-util = { version = "0.3.28", optional = true }
fnv = "1.0.7"
Expand All @@ -66,7 +72,7 @@ fnv = "1.0.7"
winapi = "0.3.9"

[dev-dependencies]
binggan = "0.5.1"
binggan = "0.5.2"
rand = "0.8.5"
maplit = "1.0.2"
matches = "0.1.9"
Expand Down Expand Up @@ -112,7 +118,7 @@ lz4-compression = ["lz4_flex"]
zstd-compression = ["zstd"]

failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.
unstable = [] # useful for benches.

quickwit = ["sstable", "futures-util"]

Expand All @@ -122,7 +128,16 @@ quickwit = ["sstable", "futures-util"]
compare_hash_only = ["stacker/compare_hash_only"]

[workspace]
members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"]
members = [
"query-grammar",
"bitpacker",
"common",
"ownedbytes",
"stacker",
"sstable",
"tokenizer-api",
"columnar",
]

# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points
Expand All @@ -147,4 +162,3 @@ harness = false
[[bench]]
name = "agg_bench"
harness = false

76 changes: 16 additions & 60 deletions src/core/executor.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use std::sync::Arc;

#[cfg(feature = "quickwit")]
use futures_util::{future::Either, FutureExt};
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::TantivyError;

/// Search executor whether search request are single thread or multithread.
///
/// We don't expose Rayon thread pool directly here for several reasons.
///
/// First dependency hell. It is not a good idea to expose the
/// API of a dependency, knowing it might conflict with a different version
/// used by the client. Second, we may stop using rayon in the future.
/// Executor makes it possible to run tasks in single thread or
/// in a thread pool.
#[derive(Clone)]
pub enum Executor {
/// Single thread variant of an Executor
SingleThread,
/// Thread pool variant of an Executor
ThreadPool(ThreadPool),
ThreadPool(Arc<rayon::ThreadPool>),
}

#[cfg(feature = "quickwit")]
impl From<Arc<rayon::ThreadPool>> for Executor {
fn from(thread_pool: Arc<rayon::ThreadPool>) -> Self {
Executor::ThreadPool(thread_pool)
}
}

impl Executor {
Expand All @@ -26,11 +30,11 @@ impl Executor {

/// Creates an Executor that dispatches the tasks in a thread pool.
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result<Executor> {
let pool = ThreadPoolBuilder::new()
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(move |num| format!("{prefix}{num}"))
.build()?;
Ok(Executor::ThreadPool(pool))
Ok(Executor::ThreadPool(Arc::new(pool)))
}

/// Perform a map in the thread pool.
Expand Down Expand Up @@ -105,7 +109,7 @@ impl Executor {
match self {
Executor::SingleThread => Either::Left(std::future::ready(Ok(cpu_intensive_task()))),
Executor::ThreadPool(pool) => {
let (sender, receiver) = oneshot_with_sentinel::channel();
let (sender, receiver) = oneshot::channel();
pool.spawn(|| {
if sender.is_closed() {
return;
Expand All @@ -121,54 +125,6 @@ impl Executor {
}
}

#[cfg(feature = "quickwit")]
mod oneshot_with_sentinel {
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
// TODO get ride of this if oneshot ever gains a is_closed()

pub struct SenderWithSentinel<T> {
tx: oneshot::Sender<T>,
guard: Arc<()>,
}

pub struct ReceiverWithSentinel<T> {
rx: oneshot::Receiver<T>,
_guard: Arc<()>,
}

pub fn channel<T>() -> (SenderWithSentinel<T>, ReceiverWithSentinel<T>) {
let (tx, rx) = oneshot::channel();
let guard = Arc::new(());
(
SenderWithSentinel {
tx,
guard: guard.clone(),
},
ReceiverWithSentinel { rx, _guard: guard },
)
}

impl<T> SenderWithSentinel<T> {
pub fn send(self, message: T) -> Result<(), oneshot::SendError<T>> {
self.tx.send(message)
}

pub fn is_closed(&self) -> bool {
Arc::strong_count(&self.guard) == 1
}
}

impl<T> std::future::Future for ReceiverWithSentinel<T> {
type Output = Result<T, oneshot::RecvError>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.rx).poll(ctx)
}
}
}

#[cfg(test)]
mod tests {
use super::Executor;
Expand Down
17 changes: 6 additions & 11 deletions src/index/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::fmt;
#[cfg(feature = "mmap")]
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::segment::Segment;
Expand Down Expand Up @@ -294,7 +293,7 @@ pub struct Index {
directory: ManagedDirectory,
schema: Schema,
settings: IndexSettings,
executor: Arc<Executor>,
executor: Executor,
tokenizers: TokenizerManager,
fast_field_tokenizers: TokenizerManager,
inventory: SegmentMetaInventory,
Expand All @@ -319,23 +318,19 @@ impl Index {
///
/// By default the executor is single thread, and simply runs in the calling thread.
pub fn search_executor(&self) -> &Executor {
self.executor.as_ref()
&self.executor
}

/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
self.executor = Arc::new(Executor::multi_thread(num_threads, "tantivy-search-")?);
self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?;
Ok(())
}

/// Custom thread pool by a outer thread pool.
pub fn set_shared_multithread_executor(
&mut self,
shared_thread_pool: Arc<Executor>,
) -> crate::Result<()> {
self.executor = shared_thread_pool.clone();
Ok(())
pub fn set_executor(&mut self, executor: Executor) {
self.executor = executor;
}

/// Replace the default single thread search executor pool
Expand Down Expand Up @@ -419,7 +414,7 @@ impl Index {
schema,
tokenizers: TokenizerManager::default(),
fast_field_tokenizers: TokenizerManager::default(),
executor: Arc::new(Executor::single_thread()),
executor: Executor::single_thread(),
inventory,
}
}
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ pub mod tests {
}

#[test]
#[cfg(not(feature = "lz4"))]
fn test_version_string() {
use regex::Regex;
let regex_ptn = Regex::new(
Expand Down

0 comments on commit 8d05680

Please sign in to comment.