Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Avert Parsing Latency By Rewriting Parser #22

Merged
merged 11 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ license = "SSPL-1.0"
[dependencies]
log = { version = "0.4.21", default-features = false, features = ["std"] }
parking_lot = { version = "0.12.3", default-features = false, features = ["deadlock_detection"] }
redis = { version = "0.25.4", default-features = false, optional = true, features = ["sentinel"] }
redis = { version = "0.25.4", default-features = false, features = ["sentinel"] }
regex = { version = "1.10.5", default-features = false, features = ["std", "perf", "unicode-bool", "unicode-perl"] }
strum = { version = "0.26.2", default-features = false, features = ["std", "derive"] }
thiserror = "1.0.61"
tracing = { version = "0.1.40", default-features = false, features = ["std", "attributes"] }
tracing = { version = "0.1.40", default-features = false, features = ["std", "attributes"], optional = true }

[features]
default = ["redis"]
default = []

native-tls = ["redis/tls-native-tls"]
rustls = ["redis/tls-rustls"]
redis = ["dep:redis"]

tracing = ["dep:tracing"]
36 changes: 33 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@

[![Try Free](https://img.shields.io/badge/Try%20Free-FalkorDB%20Cloud-FF8101?labelColor=FDE900&style=for-the-badge&link=https://app.falkordb.cloud)](https://app.falkordb.cloud)

FalkorDB Rust client
### FalkorDB Rust client

## Usage

### Installation

Just add it to your `Cargo.toml`, like so
Just add it to your `Cargo.toml`, like so:

```toml
falkordb = { version = "0.1.0" }
falkordb = { version = "0.1" }
```

### Run FalkorDB instance
Expand Down Expand Up @@ -55,3 +55,33 @@ for n in nodes.data {
println!("{:?}", n[0]);
}
```

## Features

### SSL/TLS Support

This client is currently built upon the [`redis`](https://docs.rs/redis/latest/redis/) crate, and therefore supports TLS using
its implementation, which uses either [`rustls`](https://docs.rs/rustls/latest/rustls/) or [`native_tls`](https://docs.rs/native-tls/latest/native_tls/).
This is not enabled by default, and the user ust opt-in by enabling the respective features: `"rustls"`/`"native-tls"`.\

For Rustls:

```toml
falkordb = { version = "0.1", features = ["rustls"] }
```

For NativeTLS:

```toml
falkordb = { version = "0.1", features = ["native-tls"] }
```

### Tracing

This crate fully supports instrumentation using the [`tracing`](https://docs.rs/tracing/latest/tracing/) crate, to use it, simply, enable the `tracing` feature:

```toml
falkordb = { version = "0.1", features = ["tracing"] }
```

Note that different functions use different filtration levels, to avoid spamming your tests, be sure to enable the correct level as you desire it.
110 changes: 80 additions & 30 deletions src/client/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
use crate::{
client::FalkorClientProvider,
connection::blocking::{BorrowedSyncConnection, FalkorSyncConnection},
parser::utils::string_vec_from_val,
ConfigValue, FalkorConnectionInfo, FalkorDBError, FalkorResult, FalkorValue, SyncGraph,
parser::{redis_value_as_string, redis_value_as_untyped_string_vec, redis_value_as_vec},
ConfigValue, FalkorConnectionInfo, FalkorDBError, FalkorResult, SyncGraph,
};
use parking_lot::{Mutex, RwLock};
use std::{
Expand All @@ -24,6 +24,14 @@ pub(crate) struct FalkorSyncClientInner {
}

impl FalkorSyncClientInner {
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "Borrow Connection From Connection Pool",
skip_all,
level = "debug"
)
)]
pub(crate) fn borrow_connection(
&self,
pool_owner: Arc<Self>,
Expand All @@ -38,12 +46,22 @@ impl FalkorSyncClientInner {
))
}

#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "Get New Sync Connection From Client",
skip_all,
level = "info"
)
)]
pub(crate) fn get_connection(&self) -> FalkorResult<FalkorSyncConnection> {
self._inner.lock().get_connection()
}
}

#[cfg(feature = "redis")]
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Check Is Sentinel", skip_all, level = "info")
)]
fn is_sentinel(conn: &mut FalkorSyncConnection) -> FalkorResult<bool> {
let info_map = conn.get_redis_info(Some("server"))?;
Ok(info_map
Expand All @@ -52,7 +70,10 @@ fn is_sentinel(conn: &mut FalkorSyncConnection) -> FalkorResult<bool> {
.unwrap_or_default())
}

#[cfg(feature = "redis")]
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Get Sentinel Client", skip_all, level = "info")
)]
pub(crate) fn get_sentinel_client(
client: &mut FalkorClientProvider,
connection_info: &redis::ConnectionInfo,
Expand All @@ -65,8 +86,8 @@ pub(crate) fn get_sentinel_client(
// This could have been so simple using the Sentinel API, but it requires a service name
// Perhaps in the future we can use it if we only support the master instance to be called 'master'?
let sentinel_masters = conn
.execute_command(None, "SENTINEL", Some("MASTERS"), None)?
.into_vec()?;
.execute_command(None, "SENTINEL", Some("MASTERS"), None)
.and_then(redis_value_as_vec)?;

if sentinel_masters.len() != 1 {
return Err(FalkorDBError::SentinelMastersCount);
Expand All @@ -75,12 +96,13 @@ pub(crate) fn get_sentinel_client(
let sentinel_master: HashMap<_, _> = sentinel_masters
.into_iter()
.next()
.and_then(|master| master.into_sequence().ok())
.ok_or(FalkorDBError::SentinelMastersCount)?
.into_vec()?
.chunks_exact(2)
.flat_map(|chunk| TryInto::<[FalkorValue; 2]>::try_into(chunk.to_vec()))
.flat_map(TryInto::<&[redis::Value; 2]>::try_into) // TODO: In the future, check if this can be done with no copying, but this should be a rare function call tbh
.flat_map(|[key, val]| {
Result::<_, FalkorDBError>::Ok((key.into_string()?, val.into_string()?))
redis_value_as_string(key.to_owned())
.and_then(|key| redis_value_as_string(val.to_owned()).map(|val| (key, val)))
})
.collect();

Expand Down Expand Up @@ -124,6 +146,10 @@ pub struct FalkorSyncClient {
}

impl FalkorSyncClient {
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Create Sync Client", skip_all, level = "info")
)]
pub(crate) fn create(
mut client: FalkorClientProvider,
connection_info: FalkorConnectionInfo,
Expand Down Expand Up @@ -166,10 +192,14 @@ impl FalkorSyncClient {
///
/// # Returns
/// A [`Vec`] of [`String`]s, containing the names of available graphs
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "List Graphs", skip_all, level = "info")
)]
pub fn list_graphs(&self) -> FalkorResult<Vec<String>> {
let mut conn = self.borrow_connection()?;
conn.execute_command(None, "GRAPH.LIST", None, None)
.and_then(string_vec_from_val)
.and_then(redis_value_as_untyped_string_vec)
}

/// Return the current value of a configuration option in the database.
Expand All @@ -180,38 +210,47 @@ impl FalkorSyncClient {
///
/// # Returns
/// A [`HashMap`] comprised of [`String`] keys, and [`ConfigValue`] values.
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Get Config Value", skip_all, level = "info")
)]
pub fn config_get(
&self,
config_key: &str,
) -> FalkorResult<HashMap<String, ConfigValue>> {
let mut conn = self.borrow_connection()?;
let config = conn
.execute_command(None, "GRAPH.CONFIG", Some("GET"), Some(&[config_key]))?
.into_vec()?;
let config = self
.borrow_connection()
.and_then(|mut conn| {
conn.execute_command(None, "GRAPH.CONFIG", Some("GET"), Some(&[config_key]))
})
.and_then(redis_value_as_vec)?;

if config.len() == 2 {
let [key, val]: [FalkorValue; 2] = config.try_into().map_err(|_| {
let [key, val]: [redis::Value; 2] = config.try_into().map_err(|_| {
FalkorDBError::ParsingArrayToStructElementCount(
"Expected exactly 2 elements for configuration option".to_string(),
"Expected exactly 2 elements for configuration option",
)
})?;

return Ok(HashMap::from([(
key.into_string()?,
ConfigValue::try_from(val)?,
)]));
return redis_value_as_string(key)
.and_then(|key| ConfigValue::try_from(val).map(|val| HashMap::from([(key, val)])));
}

Ok(config
.into_iter()
.flat_map(|config| {
let [key, val]: [FalkorValue; 2] = config.into_vec()?.try_into().map_err(|_| {
FalkorDBError::ParsingArrayToStructElementCount(
"Expected exactly 2 elements for configuration option".to_string(),
)
})?;

Result::<_, FalkorDBError>::Ok((key.into_string()?, ConfigValue::try_from(val)?))
redis_value_as_vec(config).and_then(|as_vec| {
let [key, val]: [redis::Value; 2] = as_vec.try_into().map_err(|_| {
FalkorDBError::ParsingArrayToStructElementCount(
"Expected exactly 2 elements for configuration option",
)
})?;

Result::<_, FalkorDBError>::Ok((
redis_value_as_string(key)?,
ConfigValue::try_from(val)?,
))
})
})
.collect::<HashMap<String, ConfigValue>>())
}
Expand All @@ -222,11 +261,15 @@ impl FalkorSyncClient {
/// * `config_Key`: A [`String`] representation of a configuration's key.
/// The config key can also be "*", which will return ALL the configuration options.
/// * `value`: The new value to set, which is anything that can be converted into a [`ConfigValue`], namely string types and i64.
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Set Config Value", skip_all, level = "info")
)]
pub fn config_set<C: Into<ConfigValue>>(
&self,
config_key: &str,
value: C,
) -> FalkorResult<FalkorValue> {
) -> FalkorResult<redis::Value> {
self.borrow_connection()?.execute_command(
None,
"GRAPH.CONFIG",
Expand Down Expand Up @@ -257,6 +300,10 @@ impl FalkorSyncClient {
///
/// # Returns
/// If successful, will return the new [`SyncGraph`] object.
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Copy Graph", skip_all, level = "info")
)]
pub fn copy_graph(
&self,
graph_to_clone: &str,
Expand All @@ -271,8 +318,11 @@ impl FalkorSyncClient {
Ok(self.select_graph(new_graph_name))
}

#[cfg(feature = "redis")]
/// Retrieves redis information
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Client Get Redis Info", skip_all, level = "info")
)]
pub fn redis_info(
&self,
section: Option<&str>,
Expand Down
4 changes: 1 addition & 3 deletions src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl<const R: char> FalkorClientBuilder<R> {
.try_into()
.map_err(|err| FalkorDBError::InvalidConnectionInfo(err.to_string()))?;
Ok(match connection_info {
#[cfg(feature = "redis")]
FalkorConnectionInfo::Redis(connection_info) => FalkorClientProvider::Redis {
client: redis::Client::open(connection_info.clone())
.map_err(|err| FalkorDBError::RedisError(err.to_string()))?,
Expand Down Expand Up @@ -92,7 +91,6 @@ impl FalkorClientBuilder<'S'> {

let mut client = Self::get_client(connection_info.clone())?;

#[cfg(feature = "redis")]
#[allow(irrefutable_let_patterns)]
if let FalkorConnectionInfo::Redis(redis_conn_info) = &connection_info {
if let Some(sentinel) =
Expand Down Expand Up @@ -121,7 +119,7 @@ mod tests {
}

#[test]
#[cfg(feature = "redis")]

fn test_sync_builder_redis_fallback() {
let client = FalkorClientBuilder::new().build();
assert!(client.is_ok());
Expand Down
6 changes: 2 additions & 4 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub(crate) mod builder;
pub(crate) enum FalkorClientProvider {
#[allow(unused)]
None,
#[cfg(feature = "redis")]

Redis {
client: redis::Client,
sentinel: Option<redis::sentinel::SentinelClient>,
Expand All @@ -22,7 +22,6 @@ pub(crate) enum FalkorClientProvider {
impl FalkorClientProvider {
pub(crate) fn get_connection(&mut self) -> FalkorResult<FalkorSyncConnection> {
Ok(match self {
#[cfg(feature = "redis")]
FalkorClientProvider::Redis {
sentinel: Some(sentinel),
..
Expand All @@ -31,7 +30,7 @@ impl FalkorClientProvider {
.get_connection()
.map_err(|err| FalkorDBError::RedisError(err.to_string()))?,
),
#[cfg(feature = "redis")]

FalkorClientProvider::Redis { client, .. } => FalkorSyncConnection::Redis(
client
.get_connection()
Expand All @@ -41,7 +40,6 @@ impl FalkorClientProvider {
})
}

#[cfg(feature = "redis")]
pub(crate) fn set_sentinel(
&mut self,
sentinel_client: redis::sentinel::SentinelClient,
Expand Down
Loading