Skip to content

Commit

Permalink
Fix async connection leak and co (#72)
Browse files Browse the repository at this point in the history
* - Fix connection leak on async connection when there is connection down
- Fix compilation warnings "warning: elided lifetime has a name"
- Update version in readme

* Fix compilation warnings

* Fix indentation
  • Loading branch information
barakb authored Dec 4, 2024
1 parent 96d57d7 commit c79d7a7
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 27 deletions.
42 changes: 25 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Just add it to your `Cargo.toml`, like so:

```toml
falkordb = { version = "0.1.8" }
falkordb = { version = "0.1.9" }
```

### Run FalkorDB instance
Expand All @@ -39,22 +39,25 @@ use falkordb::{FalkorClientBuilder, FalkorConnectionInfo};

// Connect to FalkorDB
let connection_info: FalkorConnectionInfo = "falkor://127.0.0.1:6379".try_into()
.expect("Invalid connection info");
.expect("Invalid connection info");

let client = FalkorClientBuilder::new()
.with_connection_info(connection_info)
.build().expect("Failed to build client");
.with_connection_info(connection_info)
.build()
.expect("Failed to build client");

// Select the social graph
let mut graph = client.select_graph("social");

// Create 100 nodes and return a handful
let nodes = graph.query("UNWIND range(0, 100) AS i CREATE (n { v:1 }) RETURN n LIMIT 10")
.with_timeout(5000).execute().expect("Failed executing query");
.with_timeout(5000)
.execute()
.expect("Failed executing query");

// Can also be collected, like any other iterator
while let Some(node) = nodes.data.next() {
println ! ("{:?}", node);
println ! ("{:?}", node);
}
```

Expand All @@ -66,7 +69,7 @@ This client supports nonblocking API using the [`tokio`](https://tokio.rs/) runt
It can be enabled like so:

```toml
falkordb = { version = "0.1.8", features = ["tokio"] }
falkordb = { version = "0.1.9", features = ["tokio"] }
```

Currently, this API requires running within a [
Expand All @@ -80,22 +83,27 @@ use falkordb::{FalkorClientBuilder, FalkorConnectionInfo};

// Connect to FalkorDB
let connection_info: FalkorConnectionInfo = "falkor://127.0.0.1:6379".try_into()
.expect("Invalid connection info");
.expect("Invalid connection info");

let client = FalkorClientBuilder::new_async()
.with_connection_info(connection_info)
.build().await.expect("Failed to build client");
.with_connection_info(connection_info)
.build()
.await
.expect("Failed to build client");

// Select the social graph
let mut graph = client.select_graph("social");

// Create 100 nodes and return a handful
let nodes = graph.query("UNWIND range(0, 100) AS i CREATE (n { v:1 }) RETURN n LIMIT 10")
.with_timeout(5000).execute().await.expect("Failed executing query");
.with_timeout(5000)
.execute()
.await
.expect("Failed executing query");

// Graph operations are asynchronous, but parsing is still concurrent:
while let Some(node) = nodes.data.next() {
println ! ("{:?}", node);
println ! ("{:?}", node);
}
```

Expand All @@ -115,21 +123,21 @@ when using tokio: `"tokio-rustls"`/`"tokio-native-tls"`).
For Rustls:

```toml
falkordb = { version = "0.1.8", features = ["rustls"] }
falkordb = { version = "0.1.9", features = ["rustls"] }
```

```toml
falkordb = { version = "0.1.8", features = ["tokio-rustls"] }
falkordb = { version = "0.1.9", features = ["tokio-rustls"] }
```

For Native TLS:

```toml
falkordb = { version = "0.1.8", features = ["native-tls"] }
falkordb = { version = "0.1.9", features = ["native-tls"] }
```

```toml
falkordb = { version = "0.1.8", features = ["tokio-native-tls"] }
falkordb = { version = "0.1.9", features = ["tokio-native-tls"] }
```

### Tracing
Expand All @@ -138,7 +146,7 @@ This crate fully supports instrumentation using the [`tracing`](https://docs.rs/
it, simply, enable the `tracing` feature:

```toml
falkordb = { version = "0.1.8", features = ["tracing"] }
falkordb = { version = "0.1.9", features = ["tracing"] }
```

Note that different functions use different filtration levels, to avoid spamming your tests, be sure to enable the
Expand Down
1 change: 1 addition & 0 deletions src/connection/asynchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl BorrowedAsyncConnection {
Err(FalkorDBError::ConnectionDown) => {
if let Ok(new_conn) = self.client.get_async_connection().await {
self.conn = Some(new_conn);
tokio::spawn(async { self.return_to_pool().await });
return Err(FalkorDBError::ConnectionDown);
}
Err(FalkorDBError::NoConnection)
Expand Down
10 changes: 5 additions & 5 deletions src/graph/asynchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl AsyncGraph {
pub fn profile<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::<'a>::new(self, "GRAPH.PROFILE", query_string)
}

Expand All @@ -131,7 +131,7 @@ impl AsyncGraph {
pub fn explain<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.EXPLAIN", query_string)
}

Expand Down Expand Up @@ -162,7 +162,7 @@ impl AsyncGraph {
pub fn ro_query<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<QueryResult<LazyResultSet>, &str, Self> {
) -> QueryBuilder<'a, QueryResult<LazyResultSet<'a>>, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.QUERY_RO", query_string)
}

Expand All @@ -178,7 +178,7 @@ impl AsyncGraph {
pub fn call_procedure<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new(self, procedure_name)
}

Expand All @@ -194,7 +194,7 @@ impl AsyncGraph {
pub fn call_procedure_ro<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new_readonly(self, procedure_name)
}

Expand Down
10 changes: 5 additions & 5 deletions src/graph/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl SyncGraph {
pub fn profile<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::<'a>::new(self, "GRAPH.PROFILE", query_string)
}

Expand All @@ -125,7 +125,7 @@ impl SyncGraph {
pub fn explain<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.EXPLAIN", query_string)
}

Expand Down Expand Up @@ -156,7 +156,7 @@ impl SyncGraph {
pub fn ro_query<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<QueryResult<LazyResultSet>, &str, Self> {
) -> QueryBuilder<'a, QueryResult<LazyResultSet<'a>>, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.QUERY_RO", query_string)
}

Expand All @@ -172,7 +172,7 @@ impl SyncGraph {
pub fn call_procedure<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new(self, procedure_name)
}

Expand All @@ -188,7 +188,7 @@ impl SyncGraph {
pub fn call_procedure_ro<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new_readonly(self, procedure_name)
}

Expand Down

0 comments on commit c79d7a7

Please sign in to comment.