Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async connection leak and co #72

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Just add it to your `Cargo.toml`, like so:

```toml
falkordb = { version = "0.1.8" }
falkordb = { version = "0.1.9" }
```

### Run FalkorDB instance
Expand All @@ -39,22 +39,25 @@ use falkordb::{FalkorClientBuilder, FalkorConnectionInfo};

// Connect to FalkorDB
let connection_info: FalkorConnectionInfo = "falkor://127.0.0.1:6379".try_into()
.expect("Invalid connection info");
.expect("Invalid connection info");

let client = FalkorClientBuilder::new()
.with_connection_info(connection_info)
.build().expect("Failed to build client");
.with_connection_info(connection_info)
.build()
.expect("Failed to build client");

// Select the social graph
let mut graph = client.select_graph("social");

// Create 100 nodes and return a handful
let nodes = graph.query("UNWIND range(0, 100) AS i CREATE (n { v:1 }) RETURN n LIMIT 10")
.with_timeout(5000).execute().expect("Failed executing query");
.with_timeout(5000)
.execute()
.expect("Failed executing query");

// Can also be collected, like any other iterator
while let Some(node) = nodes.data.next() {
println ! ("{:?}", node);
println ! ("{:?}", node);
}
```

Expand All @@ -66,7 +69,7 @@ This client supports nonblocking API using the [`tokio`](https://tokio.rs/) runt
It can be enabled like so:

```toml
falkordb = { version = "0.1.8", features = ["tokio"] }
falkordb = { version = "0.1.9", features = ["tokio"] }
```

Currently, this API requires running within a [
Expand All @@ -80,22 +83,27 @@ use falkordb::{FalkorClientBuilder, FalkorConnectionInfo};

// Connect to FalkorDB
let connection_info: FalkorConnectionInfo = "falkor://127.0.0.1:6379".try_into()
.expect("Invalid connection info");
.expect("Invalid connection info");

let client = FalkorClientBuilder::new_async()
.with_connection_info(connection_info)
.build().await.expect("Failed to build client");
.with_connection_info(connection_info)
.build()
.await
.expect("Failed to build client");

// Select the social graph
let mut graph = client.select_graph("social");

// Create 100 nodes and return a handful
let nodes = graph.query("UNWIND range(0, 100) AS i CREATE (n { v:1 }) RETURN n LIMIT 10")
.with_timeout(5000).execute().await.expect("Failed executing query");
.with_timeout(5000)
.execute()
.await
.expect("Failed executing query");

// Graph operations are asynchronous, but parsing is still concurrent:
while let Some(node) = nodes.data.next() {
println ! ("{:?}", node);
println ! ("{:?}", node);
}
```

Expand All @@ -115,21 +123,21 @@ when using tokio: `"tokio-rustls"`/`"tokio-native-tls"`).
For Rustls:

```toml
falkordb = { version = "0.1.8", features = ["rustls"] }
falkordb = { version = "0.1.9", features = ["rustls"] }
```

```toml
falkordb = { version = "0.1.8", features = ["tokio-rustls"] }
falkordb = { version = "0.1.9", features = ["tokio-rustls"] }
```

For Native TLS:

```toml
falkordb = { version = "0.1.8", features = ["native-tls"] }
falkordb = { version = "0.1.9", features = ["native-tls"] }
```

```toml
falkordb = { version = "0.1.8", features = ["tokio-native-tls"] }
falkordb = { version = "0.1.9", features = ["tokio-native-tls"] }
```

### Tracing
Expand All @@ -138,7 +146,7 @@ This crate fully supports instrumentation using the [`tracing`](https://docs.rs/
it, simply, enable the `tracing` feature:

```toml
falkordb = { version = "0.1.8", features = ["tracing"] }
falkordb = { version = "0.1.9", features = ["tracing"] }
```

Note that different functions use different filtration levels, to avoid spamming your tests, be sure to enable the
Expand Down
1 change: 1 addition & 0 deletions src/connection/asynchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl BorrowedAsyncConnection {
Err(FalkorDBError::ConnectionDown) => {
if let Ok(new_conn) = self.client.get_async_connection().await {
self.conn = Some(new_conn);
tokio::spawn(async { self.return_to_pool().await });
return Err(FalkorDBError::ConnectionDown);
}
Err(FalkorDBError::NoConnection)
Expand Down
10 changes: 5 additions & 5 deletions src/graph/asynchronous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl AsyncGraph {
pub fn profile<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::<'a>::new(self, "GRAPH.PROFILE", query_string)
}

Expand All @@ -131,7 +131,7 @@ impl AsyncGraph {
pub fn explain<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.EXPLAIN", query_string)
}

Expand Down Expand Up @@ -162,7 +162,7 @@ impl AsyncGraph {
pub fn ro_query<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<QueryResult<LazyResultSet>, &str, Self> {
) -> QueryBuilder<'a, QueryResult<LazyResultSet<'a>>, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.QUERY_RO", query_string)
}

Expand All @@ -178,7 +178,7 @@ impl AsyncGraph {
pub fn call_procedure<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new(self, procedure_name)
}

Expand All @@ -194,7 +194,7 @@ impl AsyncGraph {
pub fn call_procedure_ro<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new_readonly(self, procedure_name)
}

Expand Down
10 changes: 5 additions & 5 deletions src/graph/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl SyncGraph {
pub fn profile<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::<'a>::new(self, "GRAPH.PROFILE", query_string)
}

Expand All @@ -125,7 +125,7 @@ impl SyncGraph {
pub fn explain<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan, &str, Self> {
) -> QueryBuilder<'a, ExecutionPlan, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.EXPLAIN", query_string)
}

Expand Down Expand Up @@ -156,7 +156,7 @@ impl SyncGraph {
pub fn ro_query<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<QueryResult<LazyResultSet>, &str, Self> {
) -> QueryBuilder<'a, QueryResult<LazyResultSet<'a>>, &'a str, Self> {
QueryBuilder::new(self, "GRAPH.QUERY_RO", query_string)
}

Expand All @@ -172,7 +172,7 @@ impl SyncGraph {
pub fn call_procedure<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new(self, procedure_name)
}

Expand All @@ -188,7 +188,7 @@ impl SyncGraph {
pub fn call_procedure_ro<'a, P>(
&'a mut self,
procedure_name: &'a str,
) -> ProcedureQueryBuilder<P, Self> {
) -> ProcedureQueryBuilder<'a, P, Self> {
ProcedureQueryBuilder::new_readonly(self, procedure_name)
}

Expand Down
Loading