From 2b6f242a223724eecb74bed7f34b3deeebcde4b5 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Wed, 24 Jun 2020 16:11:46 +0200 Subject: [PATCH 1/7] LRU statement cache for MySQL --- Cargo.lock | 16 ++++++ sqlx-core/Cargo.toml | 1 + sqlx-core/src/caching_connection.rs | 13 +++++ sqlx-core/src/common/mod.rs | 3 ++ sqlx-core/src/common/statement_cache.rs | 50 +++++++++++++++++++ sqlx-core/src/lib.rs | 2 + sqlx-core/src/mysql/connection/establish.rs | 4 +- sqlx-core/src/mysql/connection/executor.rs | 12 +++-- sqlx-core/src/mysql/connection/mod.rs | 21 +++++++- sqlx-core/src/mysql/options.rs | 17 +++++++ sqlx-core/src/mysql/protocol/statement/mod.rs | 2 + .../mysql/protocol/statement/stmt_close.rs | 16 ++++++ src/lib.rs | 1 + tests/mysql/mysql.rs | 24 ++++++++- 14 files changed, 173 insertions(+), 9 deletions(-) create mode 100644 sqlx-core/src/caching_connection.rs create mode 100644 sqlx-core/src/common/mod.rs create mode 100644 sqlx-core/src/common/statement_cache.rs create mode 100644 sqlx-core/src/mysql/protocol/statement/stmt_close.rs diff --git a/Cargo.lock b/Cargo.lock index 5c46410509..adb5504c2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1205,6 +1205,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" + [[package]] name = "lock_api" version = "0.3.4" @@ -1234,6 +1240,15 @@ dependencies = [ "scoped-tls 0.1.2", ] +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "maplit" version = "1.0.2" @@ -2281,6 +2296,7 @@ dependencies = [ "libc", "libsqlite3-sys", "log", + "lru-cache", "md-5", "memchr", "num-bigint", diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 0a5a692f23..d821976362 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -84,3 +84,4 @@ url = { version = "2.1.1", default-features = false } uuid = { version = "0.8.1", default-features = false, optional = true, features = [ "std" ] } whoami = "0.8.1" stringprep = "0.1.2" +lru-cache = "0.1.2" diff --git a/sqlx-core/src/caching_connection.rs b/sqlx-core/src/caching_connection.rs new file mode 100644 index 0000000000..88482514b5 --- /dev/null +++ b/sqlx-core/src/caching_connection.rs @@ -0,0 +1,13 @@ +use futures_core::future::BoxFuture; + +use crate::error::Error; + +/// A connection that is capable of caching prepared statements. +pub trait CachingConnection: Send { + /// The number of statements currently cached in the connection. + fn cached_statements_count(&self) -> usize; + + /// Removes all statements from the cache, closing them on the server if + /// needed. + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>>; +} diff --git a/sqlx-core/src/common/mod.rs b/sqlx-core/src/common/mod.rs new file mode 100644 index 0000000000..f9698f28c2 --- /dev/null +++ b/sqlx-core/src/common/mod.rs @@ -0,0 +1,3 @@ +mod statement_cache; + +pub(crate) use statement_cache::StatementCache; diff --git a/sqlx-core/src/common/statement_cache.rs b/sqlx-core/src/common/statement_cache.rs new file mode 100644 index 0000000000..e87785eb09 --- /dev/null +++ b/sqlx-core/src/common/statement_cache.rs @@ -0,0 +1,50 @@ +use lru_cache::LruCache; + +/// A cache for prepared statements. When full, the least recently used +/// statement gets removed. +#[derive(Debug)] +pub struct StatementCache { + inner: LruCache, +} + +impl StatementCache { + /// Create a new cache with the given capacity. + pub fn new(capacity: usize) -> Self { + Self { + inner: LruCache::new(capacity), + } + } + + /// Returns a mutable reference to the value corresponding to the given key + /// in the cache, if any. + pub fn get_mut(&mut self, k: &str) -> Option<&mut u32> { + self.inner.get_mut(k) + } + + /// Inserts a new statement to the cache, returning the least recently used + /// statement id if the cache is full, or if inserting with an existing key, + /// the replaced existing statement. + pub fn insert(&mut self, k: &str, v: u32) -> Option { + let mut lru_item = None; + + if self.inner.capacity() == self.len() && !self.inner.contains_key(k) { + lru_item = self.remove_lru(); + } else if self.inner.contains_key(k) { + lru_item = self.inner.remove(k); + } + + self.inner.insert(k.into(), v); + + lru_item + } + + /// The number of statements in the cache. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Removes the least recently used item from the cache. + pub fn remove_lru(&mut self) -> Option { + self.inner.remove_lru().map(|(_, v)| v) + } +} diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index cda49dbfde..3785c2bde2 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -29,6 +29,7 @@ pub mod arguments; #[macro_use] pub mod pool; +pub mod caching_connection; pub mod connection; #[macro_use] @@ -37,6 +38,7 @@ pub mod transaction; #[macro_use] pub mod encode; +mod common; pub mod database; pub mod decode; pub mod describe; diff --git a/sqlx-core/src/mysql/connection/establish.rs b/sqlx-core/src/mysql/connection/establish.rs index 34f6bf6c2b..3174589444 100644 --- a/sqlx-core/src/mysql/connection/establish.rs +++ b/sqlx-core/src/mysql/connection/establish.rs @@ -1,6 +1,6 @@ use bytes::Bytes; -use hashbrown::HashMap; +use crate::common::StatementCache; use crate::error::Error; use crate::mysql::connection::{tls, MySqlStream, COLLATE_UTF8MB4_UNICODE_CI, MAX_PACKET_SIZE}; use crate::mysql::protocol::connect::{ @@ -98,7 +98,7 @@ impl MySqlConnection { Ok(Self { stream, - cache_statement: HashMap::new(), + cache_statement: StatementCache::new(options.statement_cache_size), scratch_row_columns: Default::default(), scratch_row_column_names: Default::default(), }) diff --git a/sqlx-core/src/mysql/connection/executor.rs b/sqlx-core/src/mysql/connection/executor.rs index 5110f3c3f5..7b28f36013 100644 --- a/sqlx-core/src/mysql/connection/executor.rs +++ b/sqlx-core/src/mysql/connection/executor.rs @@ -15,7 +15,7 @@ use crate::mysql::connection::stream::Busy; use crate::mysql::io::MySqlBufExt; use crate::mysql::protocol::response::Status; use crate::mysql::protocol::statement::{ - BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, + BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose, }; use crate::mysql::protocol::text::{ColumnDefinition, ColumnFlags, Query, TextRow}; use crate::mysql::protocol::Packet; @@ -26,8 +26,8 @@ use crate::mysql::{ impl MySqlConnection { async fn prepare(&mut self, query: &str) -> Result { - if let Some(&statement) = self.cache_statement.get(query) { - return Ok(statement); + if let Some(statement) = self.cache_statement.get_mut(query) { + return Ok(*statement); } // https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html @@ -60,8 +60,10 @@ impl MySqlConnection { self.stream.maybe_recv_eof().await?; } - self.cache_statement - .insert(query.to_owned(), ok.statement_id); + // in case of the cache being full, close the least recently used statement + if let Some(statement) = self.cache_statement.insert(query, ok.statement_id) { + self.stream.send_packet(StmtClose { statement }).await?; + } Ok(ok.statement_id) } diff --git a/sqlx-core/src/mysql/connection/mod.rs b/sqlx-core/src/mysql/connection/mod.rs index a3d1d35f6d..0711eae495 100644 --- a/sqlx-core/src/mysql/connection/mod.rs +++ b/sqlx-core/src/mysql/connection/mod.rs @@ -6,10 +6,13 @@ use futures_core::future::BoxFuture; use futures_util::FutureExt; use hashbrown::HashMap; +use crate::caching_connection::CachingConnection; +use crate::common::StatementCache; use crate::connection::{Connect, Connection}; use crate::error::Error; use crate::executor::Executor; use crate::ext::ustr::UStr; +use crate::mysql::protocol::statement::StmtClose; use crate::mysql::protocol::text::{Ping, Quit}; use crate::mysql::row::MySqlColumn; use crate::mysql::{MySql, MySqlConnectOptions}; @@ -34,7 +37,7 @@ pub struct MySqlConnection { pub(crate) stream: MySqlStream, // cache by query string to the statement id - cache_statement: HashMap, + cache_statement: StatementCache, // working memory for the active row's column information // this allows us to re-use these allocations unless the user is persisting the @@ -43,6 +46,22 @@ pub struct MySqlConnection { scratch_row_column_names: Arc>, } +impl CachingConnection for MySqlConnection { + fn cached_statements_count(&self) -> usize { + self.cache_statement.len() + } + + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + while let Some(statement) = self.cache_statement.remove_lru() { + self.stream.send_packet(StmtClose { statement }).await?; + } + + Ok(()) + }) + } +} + impl Debug for MySqlConnection { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("MySqlConnection").finish() diff --git a/sqlx-core/src/mysql/options.rs b/sqlx-core/src/mysql/options.rs index 3720f444bd..339a960544 100644 --- a/sqlx-core/src/mysql/options.rs +++ b/sqlx-core/src/mysql/options.rs @@ -101,6 +101,7 @@ pub struct MySqlConnectOptions { pub(crate) database: Option, pub(crate) ssl_mode: MySqlSslMode, pub(crate) ssl_ca: Option, + pub(crate) statement_cache_size: usize, } impl Default for MySqlConnectOptions { @@ -120,6 +121,7 @@ impl MySqlConnectOptions { database: None, ssl_mode: MySqlSslMode::Preferred, ssl_ca: None, + statement_cache_size: 100, } } @@ -190,6 +192,17 @@ impl MySqlConnectOptions { self.ssl_ca = Some(file_name.as_ref().to_owned()); self } + + /// Sets the size of the connection's statement cache in a number of stored + /// distinct statements. Caching is handled using LRU, meaning when the + /// amount of queries hits the defined limit, the oldest statement will get + /// dropped. + /// + /// The default cache size is 100 statements. + pub fn statement_cache_size(mut self, size: usize) -> Self { + self.statement_cache_size = size; + self + } } impl FromStr for MySqlConnectOptions { @@ -231,6 +244,10 @@ impl FromStr for MySqlConnectOptions { options = options.ssl_ca(&*value); } + "statement-cache-size" => { + options = options.statement_cache_size(value.parse()?); + } + _ => {} } } diff --git a/sqlx-core/src/mysql/protocol/statement/mod.rs b/sqlx-core/src/mysql/protocol/statement/mod.rs index 5ad292f560..9ae6b3c909 100644 --- a/sqlx-core/src/mysql/protocol/statement/mod.rs +++ b/sqlx-core/src/mysql/protocol/statement/mod.rs @@ -2,8 +2,10 @@ mod execute; mod prepare; mod prepare_ok; mod row; +mod stmt_close; pub(crate) use execute::Execute; pub(crate) use prepare::Prepare; pub(crate) use prepare_ok::PrepareOk; pub(crate) use row::BinaryRow; +pub(crate) use stmt_close::StmtClose; diff --git a/sqlx-core/src/mysql/protocol/statement/stmt_close.rs b/sqlx-core/src/mysql/protocol/statement/stmt_close.rs new file mode 100644 index 0000000000..13f095f9b5 --- /dev/null +++ b/sqlx-core/src/mysql/protocol/statement/stmt_close.rs @@ -0,0 +1,16 @@ +use crate::io::Encode; +use crate::mysql::protocol::Capabilities; + +// https://dev.mysql.com/doc/internals/en/com-stmt-close.html + +#[derive(Debug)] +pub struct StmtClose { + pub statement: u32, +} + +impl Encode<'_, Capabilities> for StmtClose { + fn encode_with(&self, buf: &mut Vec, _: Capabilities) { + buf.push(0x19); // COM_STMT_CLOSE + buf.extend(&self.statement.to_le_bytes()); + } +} diff --git a/src/lib.rs b/src/lib.rs index a11f3ce347..f05cc6dcf8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] pub use sqlx_core::arguments::{Arguments, IntoArguments}; +pub use sqlx_core::caching_connection::CachingConnection; pub use sqlx_core::connection::{Connect, Connection}; pub use sqlx_core::database::{self, Database}; pub use sqlx_core::executor::{Execute, Executor}; diff --git a/tests/mysql/mysql.rs b/tests/mysql/mysql.rs index dfdf0cf5cc..95a1815b3f 100644 --- a/tests/mysql/mysql.rs +++ b/tests/mysql/mysql.rs @@ -1,6 +1,6 @@ use futures::TryStreamExt; use sqlx::mysql::{MySql, MySqlPool, MySqlRow}; -use sqlx::{Connection, Executor, Row}; +use sqlx::{CachingConnection, Connection, Executor, Row}; use sqlx_test::new; #[sqlx_macros::test] @@ -177,3 +177,25 @@ SELECT id, text FROM messages; Ok(()) } + +#[sqlx_macros::test] +async fn it_caches_statements() -> anyhow::Result<()> { + let mut conn = new::().await?; + + for i in 0..2 { + let row = sqlx::query("SELECT ? AS val") + .bind(i) + .fetch_one(&mut conn) + .await?; + + let val: u32 = row.get("val"); + + assert_eq!(i, val); + } + + assert_eq!(1, conn.cached_statements_count()); + conn.clear_cached_statements().await?; + assert_eq!(0, conn.cached_statements_count()); + + Ok(()) +} From 5d64310004ace401ec966d2bb9c94c7b6e0cb392 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Wed, 24 Jun 2020 18:59:39 +0200 Subject: [PATCH 2/7] LRU caching for PostgreSQL --- sqlx-core/src/common/statement_cache.rs | 5 ++++ .../src/postgres/connection/establish.rs | 3 ++- sqlx-core/src/postgres/connection/executor.rs | 5 ++-- sqlx-core/src/postgres/connection/mod.rs | 17 +++++++++++++- sqlx-core/src/postgres/options.rs | 17 ++++++++++++++ tests/postgres/postgres.rs | 23 +++++++++++++++++++ 6 files changed, 66 insertions(+), 4 deletions(-) diff --git a/sqlx-core/src/common/statement_cache.rs b/sqlx-core/src/common/statement_cache.rs index e87785eb09..7ba19a610b 100644 --- a/sqlx-core/src/common/statement_cache.rs +++ b/sqlx-core/src/common/statement_cache.rs @@ -47,4 +47,9 @@ impl StatementCache { pub fn remove_lru(&mut self) -> Option { self.inner.remove_lru().map(|(_, v)| v) } + + /// Clear all cached statements from the cache. + pub fn clear(&mut self) { + self.inner.clear(); + } } diff --git a/sqlx-core/src/postgres/connection/establish.rs b/sqlx-core/src/postgres/connection/establish.rs index 7b160ad16d..a0438e6f3c 100644 --- a/sqlx-core/src/postgres/connection/establish.rs +++ b/sqlx-core/src/postgres/connection/establish.rs @@ -1,5 +1,6 @@ use hashbrown::HashMap; +use crate::common::StatementCache; use crate::error::Error; use crate::io::Decode; use crate::postgres::connection::{sasl, stream::PgStream, tls}; @@ -138,7 +139,7 @@ impl PgConnection { transaction_status, pending_ready_for_query_count: 0, next_statement_id: 1, - cache_statement: HashMap::with_capacity(10), + cache_statement: StatementCache::new(options.statement_cache_size), cache_type_oid: HashMap::new(), cache_type_info: HashMap::new(), scratch_row_columns: Default::default(), diff --git a/sqlx-core/src/postgres/connection/executor.rs b/sqlx-core/src/postgres/connection/executor.rs index ed58ed56fa..b3fc706767 100644 --- a/sqlx-core/src/postgres/connection/executor.rs +++ b/sqlx-core/src/postgres/connection/executor.rs @@ -88,15 +88,16 @@ async fn recv_desc_rows(conn: &mut PgConnection) -> Result Result { - if let Some(statement) = self.cache_statement.get(query) { + if let Some(statement) = self.cache_statement.get_mut(query) { return Ok(*statement); } let statement = prepare(self, query, arguments).await?; - self.cache_statement.insert(query.to_owned(), statement); + self.cache_statement.insert(query, statement); Ok(statement) } diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index e3d1e22066..987769dfdc 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -5,6 +5,8 @@ use futures_core::future::BoxFuture; use futures_util::{FutureExt, TryFutureExt}; use hashbrown::HashMap; +use crate::caching_connection::CachingConnection; +use crate::common::StatementCache; use crate::connection::{Connect, Connection}; use crate::error::Error; use crate::executor::Executor; @@ -46,7 +48,7 @@ pub struct PgConnection { next_statement_id: u32, // cache statement by query string to the id and columns - cache_statement: HashMap, + cache_statement: StatementCache, // cache user-defined types by id <-> info cache_type_info: HashMap, @@ -96,6 +98,19 @@ impl Debug for PgConnection { } } +impl CachingConnection for PgConnection { + fn cached_statements_count(&self) -> usize { + self.cache_statement.len() + } + + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + self.cache_statement.clear(); + Ok(()) + }) + } +} + impl Connection for PgConnection { type Database = Postgres; diff --git a/sqlx-core/src/postgres/options.rs b/sqlx-core/src/postgres/options.rs index b59a332f53..8edb1414f4 100644 --- a/sqlx-core/src/postgres/options.rs +++ b/sqlx-core/src/postgres/options.rs @@ -115,6 +115,7 @@ pub struct PgConnectOptions { pub(crate) database: Option, pub(crate) ssl_mode: PgSslMode, pub(crate) ssl_root_cert: Option, + pub(crate) statement_cache_size: usize, } impl Default for PgConnectOptions { @@ -162,6 +163,7 @@ impl PgConnectOptions { .ok() .and_then(|v| v.parse().ok()) .unwrap_or_default(), + statement_cache_size: 100, } } @@ -285,6 +287,17 @@ impl PgConnectOptions { self.ssl_root_cert = Some(cert.as_ref().to_path_buf()); self } + + /// Sets the size of the connection's statement cache in a number of stored + /// distinct statements. Caching is handled using LRU, meaning when the + /// amount of queries hits the defined limit, the oldest statement will get + /// dropped. + /// + /// The default cache size is 100 statements. + pub fn statement_cache_size(mut self, size: usize) -> Self { + self.statement_cache_size = size; + self + } } fn default_host(port: u16) -> String { @@ -345,6 +358,10 @@ impl FromStr for PgConnectOptions { options = options.ssl_root_cert(&*value); } + "statement-cache-size" => { + options = options.statement_cache_size(value.parse()?); + } + _ => {} } } diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs index 2aeea43dbc..b7a40cf45c 100644 --- a/tests/postgres/postgres.rs +++ b/tests/postgres/postgres.rs @@ -1,6 +1,7 @@ use futures::TryStreamExt; use sqlx::postgres::PgRow; use sqlx::postgres::{PgDatabaseError, PgErrorPosition, PgSeverity}; +use sqlx::CachingConnection; use sqlx::{postgres::Postgres, Connection, Executor, PgPool, Row}; use sqlx_test::new; use std::time::Duration; @@ -487,3 +488,25 @@ SELECT id, text FROM _sqlx_test_postgres_5112; Ok(()) } + +#[sqlx_macros::test] +async fn it_caches_statements() -> anyhow::Result<()> { + let mut conn = new::().await?; + + for i in 0..2 { + let row = sqlx::query("SELECT $1 AS val") + .bind(i) + .fetch_one(&mut conn) + .await?; + + let val: u32 = row.get("val"); + + assert_eq!(i, val); + } + + assert_eq!(1, conn.cached_statements_count()); + conn.clear_cached_statements().await?; + assert_eq!(0, conn.cached_statements_count()); + + Ok(()) +} From eba82e3fc12cf503c25aa2bbe0fb3ae51dd4cdc2 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Wed, 24 Jun 2020 19:43:51 +0200 Subject: [PATCH 3/7] LRU caching for SQLite --- sqlx-core/src/common/statement_cache.rs | 19 ++++++++----- sqlx-core/src/mysql/connection/mod.rs | 2 +- sqlx-core/src/postgres/connection/mod.rs | 2 +- sqlx-core/src/sqlite/connection/establish.rs | 8 ++++-- sqlx-core/src/sqlite/connection/executor.rs | 8 ++++-- sqlx-core/src/sqlite/connection/mod.rs | 17 +++++++++++- sqlx-core/src/sqlite/options.rs | 29 ++++++++++++++++---- tests/sqlite/sqlite.rs | 25 ++++++++++++++++- 8 files changed, 89 insertions(+), 21 deletions(-) diff --git a/sqlx-core/src/common/statement_cache.rs b/sqlx-core/src/common/statement_cache.rs index 7ba19a610b..2740bde771 100644 --- a/sqlx-core/src/common/statement_cache.rs +++ b/sqlx-core/src/common/statement_cache.rs @@ -3,11 +3,11 @@ use lru_cache::LruCache; /// A cache for prepared statements. When full, the least recently used /// statement gets removed. #[derive(Debug)] -pub struct StatementCache { - inner: LruCache, +pub struct StatementCache { + inner: LruCache, } -impl StatementCache { +impl StatementCache { /// Create a new cache with the given capacity. pub fn new(capacity: usize) -> Self { Self { @@ -17,19 +17,19 @@ impl StatementCache { /// Returns a mutable reference to the value corresponding to the given key /// in the cache, if any. - pub fn get_mut(&mut self, k: &str) -> Option<&mut u32> { + pub fn get_mut(&mut self, k: &str) -> Option<&mut T> { self.inner.get_mut(k) } /// Inserts a new statement to the cache, returning the least recently used /// statement id if the cache is full, or if inserting with an existing key, /// the replaced existing statement. - pub fn insert(&mut self, k: &str, v: u32) -> Option { + pub fn insert(&mut self, k: &str, v: T) -> Option { let mut lru_item = None; if self.inner.capacity() == self.len() && !self.inner.contains_key(k) { lru_item = self.remove_lru(); - } else if self.inner.contains_key(k) { + } else if self.contains_key(k) { lru_item = self.inner.remove(k); } @@ -44,7 +44,7 @@ impl StatementCache { } /// Removes the least recently used item from the cache. - pub fn remove_lru(&mut self) -> Option { + pub fn remove_lru(&mut self) -> Option { self.inner.remove_lru().map(|(_, v)| v) } @@ -52,4 +52,9 @@ impl StatementCache { pub fn clear(&mut self) { self.inner.clear(); } + + /// True if cache has a value for the given key. + pub fn contains_key(&mut self, k: &str) -> bool { + self.inner.contains_key(k) + } } diff --git a/sqlx-core/src/mysql/connection/mod.rs b/sqlx-core/src/mysql/connection/mod.rs index 0711eae495..028c1a9008 100644 --- a/sqlx-core/src/mysql/connection/mod.rs +++ b/sqlx-core/src/mysql/connection/mod.rs @@ -37,7 +37,7 @@ pub struct MySqlConnection { pub(crate) stream: MySqlStream, // cache by query string to the statement id - cache_statement: StatementCache, + cache_statement: StatementCache, // working memory for the active row's column information // this allows us to re-use these allocations unless the user is persisting the diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index 987769dfdc..ef1e437bc9 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -48,7 +48,7 @@ pub struct PgConnection { next_statement_id: u32, // cache statement by query string to the id and columns - cache_statement: StatementCache, + cache_statement: StatementCache, // cache user-defined types by id <-> info cache_type_info: HashMap, diff --git a/sqlx-core/src/sqlite/connection/establish.rs b/sqlx-core/src/sqlite/connection/establish.rs index dbebd1652b..fe2b9d8aa7 100644 --- a/sqlx-core/src/sqlite/connection/establish.rs +++ b/sqlx-core/src/sqlite/connection/establish.rs @@ -1,7 +1,6 @@ use std::io; use std::ptr::{null, null_mut}; -use hashbrown::HashMap; use libsqlite3_sys::{ sqlite3_busy_timeout, sqlite3_extended_result_codes, sqlite3_open_v2, SQLITE_OK, SQLITE_OPEN_CREATE, SQLITE_OPEN_MEMORY, SQLITE_OPEN_NOMUTEX, SQLITE_OPEN_PRIVATECACHE, @@ -12,7 +11,10 @@ use sqlx_rt::blocking; use crate::error::Error; use crate::sqlite::connection::handle::ConnectionHandle; use crate::sqlite::statement::StatementWorker; -use crate::sqlite::{SqliteConnectOptions, SqliteConnection, SqliteError}; +use crate::{ + common::StatementCache, + sqlite::{SqliteConnectOptions, SqliteConnection, SqliteError}, +}; pub(super) async fn establish(options: &SqliteConnectOptions) -> Result { let mut filename = options @@ -87,7 +89,7 @@ pub(super) async fn establish(options: &SqliteConnectOptions) -> Result( conn: &mut ConnectionHandle, - statements: &'a mut HashMap, + statements: &'a mut StatementCache, statement: &'a mut Option, query: &str, persistent: bool, @@ -28,7 +29,10 @@ fn prepare<'a>( if !statements.contains_key(query) { let statement = SqliteStatement::prepare(conn, query, false)?; - statements.insert(query.to_owned(), statement); + + if let Some(mut statement) = statements.insert(query, statement) { + statement.reset(); + } } let statement = statements.get_mut(query).unwrap(); diff --git a/sqlx-core/src/sqlite/connection/mod.rs b/sqlx-core/src/sqlite/connection/mod.rs index d70c1c8909..69042d8303 100644 --- a/sqlx-core/src/sqlite/connection/mod.rs +++ b/sqlx-core/src/sqlite/connection/mod.rs @@ -6,6 +6,8 @@ use futures_util::future; use hashbrown::HashMap; use libsqlite3_sys::sqlite3; +use crate::caching_connection::CachingConnection; +use crate::common::StatementCache; use crate::connection::{Connect, Connection}; use crate::error::Error; use crate::ext::ustr::UStr; @@ -25,7 +27,7 @@ pub struct SqliteConnection { pub(crate) worker: StatementWorker, // cache of semi-persistent statements - pub(crate) statements: HashMap, + pub(crate) statements: StatementCache, // most recent non-persistent statement pub(crate) statement: Option, @@ -47,6 +49,19 @@ impl Debug for SqliteConnection { } } +impl CachingConnection for SqliteConnection { + fn cached_statements_count(&self) -> usize { + self.statements.len() + } + + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + self.statements.clear(); + Ok(()) + }) + } +} + impl Connection for SqliteConnection { type Database = Sqlite; diff --git a/sqlx-core/src/sqlite/options.rs b/sqlx-core/src/sqlite/options.rs index 5f925b6b39..6408dd68b6 100644 --- a/sqlx-core/src/sqlite/options.rs +++ b/sqlx-core/src/sqlite/options.rs @@ -1,5 +1,5 @@ use std::path::PathBuf; -use std::str::FromStr; +use std::{io, str::FromStr}; use crate::error::BoxDynError; @@ -10,6 +10,7 @@ use crate::error::BoxDynError; pub struct SqliteConnectOptions { pub(crate) filename: PathBuf, pub(crate) in_memory: bool, + pub(crate) statement_cache_size: usize, } impl Default for SqliteConnectOptions { @@ -23,6 +24,7 @@ impl SqliteConnectOptions { Self { filename: PathBuf::from(":memory:"), in_memory: false, + statement_cache_size: 100, } } } @@ -34,6 +36,7 @@ impl FromStr for SqliteConnectOptions { let mut options = Self { filename: PathBuf::new(), in_memory: false, + statement_cache_size: 100, }; // remove scheme @@ -41,10 +44,26 @@ impl FromStr for SqliteConnectOptions { .trim_start_matches("sqlite://") .trim_start_matches("sqlite:"); - if s == ":memory:" { - options.in_memory = true; - } else { - options.filename = s.parse()?; + let mut splitted = s.split("?"); + + match splitted.next() { + Some(":memory:") => options.in_memory = true, + Some(s) => options.filename = s.parse()?, + None => unreachable!(), + } + + match splitted.next().map(|s| s.split("=")) { + Some(mut splitted) => { + if splitted.next() == Some("statement-cache-size") { + options.statement_cache_size = splitted + .next() + .ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "Invalid connection string") + })? + .parse()? + } + } + _ => (), } Ok(options) diff --git a/tests/sqlite/sqlite.rs b/tests/sqlite/sqlite.rs index 1630773b0f..6935f04ff7 100644 --- a/tests/sqlite/sqlite.rs +++ b/tests/sqlite/sqlite.rs @@ -1,6 +1,7 @@ use futures::TryStreamExt; use sqlx::{ - query, sqlite::Sqlite, Connect, Connection, Executor, Row, SqliteConnection, SqlitePool, + query, sqlite::Sqlite, CachingConnection, Connect, Connection, Executor, Row, SqliteConnection, + SqlitePool, }; use sqlx_test::new; @@ -269,3 +270,25 @@ SELECT id, text FROM _sqlx_test; Ok(()) } + +#[sqlx_macros::test] +async fn it_caches_statements() -> anyhow::Result<()> { + let mut conn = new::().await?; + + for i in 0..2 { + let row = sqlx::query("SELECT ? AS val") + .bind(i) + .fetch_one(&mut conn) + .await?; + + let val: i32 = row.get("val"); + + assert_eq!(i, val); + } + + assert_eq!(1, conn.cached_statements_count()); + conn.clear_cached_statements().await?; + assert_eq!(0, conn.cached_statements_count()); + + Ok(()) +} From 2c2a27766686df2c8f99c1bcaaecff33c5821236 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Thu, 25 Jun 2020 10:37:18 +0200 Subject: [PATCH 4/7] Caching methods in Connection --- sqlx-core/src/caching_connection.rs | 13 ---------- sqlx-core/src/common/statement_cache.rs | 1 + sqlx-core/src/connection.rs | 19 ++++++++++++++- sqlx-core/src/database.rs | 2 ++ sqlx-core/src/lib.rs | 1 - sqlx-core/src/mysql/connection/mod.rs | 31 +++++++++++------------- sqlx-core/src/mysql/database.rs | 4 ++- sqlx-core/src/postgres/connection/mod.rs | 25 +++++++++---------- sqlx-core/src/postgres/database.rs | 4 ++- sqlx-core/src/sqlite/connection/mod.rs | 25 +++++++++---------- sqlx-core/src/sqlite/database.rs | 4 ++- src/lib.rs | 1 - tests/mysql/mysql.rs | 6 ++--- tests/postgres/postgres.rs | 5 ++-- tests/sqlite/sqlite.rs | 7 +++--- 15 files changed, 74 insertions(+), 74 deletions(-) delete mode 100644 sqlx-core/src/caching_connection.rs diff --git a/sqlx-core/src/caching_connection.rs b/sqlx-core/src/caching_connection.rs deleted file mode 100644 index 88482514b5..0000000000 --- a/sqlx-core/src/caching_connection.rs +++ /dev/null @@ -1,13 +0,0 @@ -use futures_core::future::BoxFuture; - -use crate::error::Error; - -/// A connection that is capable of caching prepared statements. -pub trait CachingConnection: Send { - /// The number of statements currently cached in the connection. - fn cached_statements_count(&self) -> usize; - - /// Removes all statements from the cache, closing them on the server if - /// needed. - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>>; -} diff --git a/sqlx-core/src/common/statement_cache.rs b/sqlx-core/src/common/statement_cache.rs index 2740bde771..6ad7c5b7f8 100644 --- a/sqlx-core/src/common/statement_cache.rs +++ b/sqlx-core/src/common/statement_cache.rs @@ -49,6 +49,7 @@ impl StatementCache { } /// Clear all cached statements from the cache. + #[cfg(any(feature = "postgres", feature = "sqlite"))] pub fn clear(&mut self) { self.inner.clear(); } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index e6eed54fbf..ae1a0a3b4a 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -3,7 +3,7 @@ use std::str::FromStr; use futures_core::future::BoxFuture; use futures_core::Future; -use crate::database::Database; +use crate::database::{Database, HasStatementCache}; use crate::error::{BoxDynError, Error}; use crate::transaction::Transaction; @@ -64,6 +64,23 @@ pub trait Connection: Send { }) } + /// The number of statements currently cached in the connection. + fn cached_statements_size(&self) -> usize + where + Self::Database: HasStatementCache, + { + 0 + } + + /// Removes all statements from the cache, closing them on the server if + /// needed. + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> + where + Self::Database: HasStatementCache, + { + Box::pin(async move { Ok(()) }) + } + #[doc(hidden)] fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>; diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs index fc400d4b9d..388c45baac 100644 --- a/sqlx-core/src/database.rs +++ b/sqlx-core/src/database.rs @@ -74,3 +74,5 @@ pub trait HasArguments<'q> { /// The concrete type used as a buffer for arguments while encoding. type ArgumentBuffer: Default; } + +pub trait HasStatementCache {} diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 3785c2bde2..118b42c8d9 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -29,7 +29,6 @@ pub mod arguments; #[macro_use] pub mod pool; -pub mod caching_connection; pub mod connection; #[macro_use] diff --git a/sqlx-core/src/mysql/connection/mod.rs b/sqlx-core/src/mysql/connection/mod.rs index 028c1a9008..0e8ef38731 100644 --- a/sqlx-core/src/mysql/connection/mod.rs +++ b/sqlx-core/src/mysql/connection/mod.rs @@ -6,7 +6,6 @@ use futures_core::future::BoxFuture; use futures_util::FutureExt; use hashbrown::HashMap; -use crate::caching_connection::CachingConnection; use crate::common::StatementCache; use crate::connection::{Connect, Connection}; use crate::error::Error; @@ -46,22 +45,6 @@ pub struct MySqlConnection { scratch_row_column_names: Arc>, } -impl CachingConnection for MySqlConnection { - fn cached_statements_count(&self) -> usize { - self.cache_statement.len() - } - - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - while let Some(statement) = self.cache_statement.remove_lru() { - self.stream.send_packet(StmtClose { statement }).await?; - } - - Ok(()) - }) - } -} - impl Debug for MySqlConnection { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("MySqlConnection").finish() @@ -94,6 +77,20 @@ impl Connection for MySqlConnection { self.stream.wait_until_ready().boxed() } + fn cached_statements_size(&self) -> usize { + self.cache_statement.len() + } + + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + while let Some(statement) = self.cache_statement.remove_lru() { + self.stream.send_packet(StmtClose { statement }).await?; + } + + Ok(()) + }) + } + #[doc(hidden)] fn should_flush(&self) -> bool { !self.stream.wbuf.is_empty() diff --git a/sqlx-core/src/mysql/database.rs b/sqlx-core/src/mysql/database.rs index ef97686578..6bb8083202 100644 --- a/sqlx-core/src/mysql/database.rs +++ b/sqlx-core/src/mysql/database.rs @@ -1,4 +1,4 @@ -use crate::database::{Database, HasArguments, HasValueRef}; +use crate::database::{Database, HasArguments, HasStatementCache, HasValueRef}; use crate::mysql::value::{MySqlValue, MySqlValueRef}; use crate::mysql::{ MySqlArguments, MySqlConnection, MySqlRow, MySqlTransactionManager, MySqlTypeInfo, @@ -33,3 +33,5 @@ impl HasArguments<'_> for MySql { type ArgumentBuffer = Vec; } + +impl HasStatementCache for MySql {} diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index ef1e437bc9..1821e279c0 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -5,7 +5,6 @@ use futures_core::future::BoxFuture; use futures_util::{FutureExt, TryFutureExt}; use hashbrown::HashMap; -use crate::caching_connection::CachingConnection; use crate::common::StatementCache; use crate::connection::{Connect, Connection}; use crate::error::Error; @@ -98,19 +97,6 @@ impl Debug for PgConnection { } } -impl CachingConnection for PgConnection { - fn cached_statements_count(&self) -> usize { - self.cache_statement.len() - } - - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - self.cache_statement.clear(); - Ok(()) - }) - } -} - impl Connection for PgConnection { type Database = Postgres; @@ -134,6 +120,17 @@ impl Connection for PgConnection { self.execute("/* SQLx ping */").map_ok(|_| ()).boxed() } + fn cached_statements_size(&self) -> usize { + self.cache_statement.len() + } + + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + self.cache_statement.clear(); + Ok(()) + }) + } + #[doc(hidden)] fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { self.wait_until_ready().boxed() diff --git a/sqlx-core/src/postgres/database.rs b/sqlx-core/src/postgres/database.rs index 8b1614120e..8b3af756e6 100644 --- a/sqlx-core/src/postgres/database.rs +++ b/sqlx-core/src/postgres/database.rs @@ -1,4 +1,4 @@ -use crate::database::{Database, HasArguments, HasValueRef}; +use crate::database::{Database, HasArguments, HasStatementCache, HasValueRef}; use crate::postgres::arguments::PgArgumentBuffer; use crate::postgres::value::{PgValue, PgValueRef}; use crate::postgres::{PgArguments, PgConnection, PgRow, PgTransactionManager, PgTypeInfo}; @@ -32,3 +32,5 @@ impl HasArguments<'_> for Postgres { type ArgumentBuffer = PgArgumentBuffer; } + +impl HasStatementCache for Postgres {} diff --git a/sqlx-core/src/sqlite/connection/mod.rs b/sqlx-core/src/sqlite/connection/mod.rs index 69042d8303..d768f14332 100644 --- a/sqlx-core/src/sqlite/connection/mod.rs +++ b/sqlx-core/src/sqlite/connection/mod.rs @@ -6,7 +6,6 @@ use futures_util::future; use hashbrown::HashMap; use libsqlite3_sys::sqlite3; -use crate::caching_connection::CachingConnection; use crate::common::StatementCache; use crate::connection::{Connect, Connection}; use crate::error::Error; @@ -49,19 +48,6 @@ impl Debug for SqliteConnection { } } -impl CachingConnection for SqliteConnection { - fn cached_statements_count(&self) -> usize { - self.statements.len() - } - - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - self.statements.clear(); - Ok(()) - }) - } -} - impl Connection for SqliteConnection { type Database = Sqlite; @@ -75,6 +61,17 @@ impl Connection for SqliteConnection { Box::pin(future::ok(())) } + fn cached_statements_size(&self) -> usize { + self.statements.len() + } + + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + self.statements.clear(); + Ok(()) + }) + } + #[doc(hidden)] fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { // For SQLite, FLUSH does effectively nothing diff --git a/sqlx-core/src/sqlite/database.rs b/sqlx-core/src/sqlite/database.rs index 3d8ea27520..fa00660fbe 100644 --- a/sqlx-core/src/sqlite/database.rs +++ b/sqlx-core/src/sqlite/database.rs @@ -1,4 +1,4 @@ -use crate::database::{Database, HasArguments, HasValueRef}; +use crate::database::{Database, HasArguments, HasStatementCache, HasValueRef}; use crate::sqlite::{ SqliteArgumentValue, SqliteArguments, SqliteConnection, SqliteRow, SqliteTransactionManager, SqliteTypeInfo, SqliteValue, SqliteValueRef, @@ -33,3 +33,5 @@ impl<'q> HasArguments<'q> for Sqlite { type ArgumentBuffer = Vec>; } + +impl HasStatementCache for Sqlite {} diff --git a/src/lib.rs b/src/lib.rs index f05cc6dcf8..a11f3ce347 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] pub use sqlx_core::arguments::{Arguments, IntoArguments}; -pub use sqlx_core::caching_connection::CachingConnection; pub use sqlx_core::connection::{Connect, Connection}; pub use sqlx_core::database::{self, Database}; pub use sqlx_core::executor::{Execute, Executor}; diff --git a/tests/mysql/mysql.rs b/tests/mysql/mysql.rs index 95a1815b3f..becede928d 100644 --- a/tests/mysql/mysql.rs +++ b/tests/mysql/mysql.rs @@ -1,6 +1,6 @@ use futures::TryStreamExt; use sqlx::mysql::{MySql, MySqlPool, MySqlRow}; -use sqlx::{CachingConnection, Connection, Executor, Row}; +use sqlx::{Connection, Executor, Row}; use sqlx_test::new; #[sqlx_macros::test] @@ -193,9 +193,9 @@ async fn it_caches_statements() -> anyhow::Result<()> { assert_eq!(i, val); } - assert_eq!(1, conn.cached_statements_count()); + assert_eq!(1, conn.cached_statements_size()); conn.clear_cached_statements().await?; - assert_eq!(0, conn.cached_statements_count()); + assert_eq!(0, conn.cached_statements_size()); Ok(()) } diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs index b7a40cf45c..e582b671bf 100644 --- a/tests/postgres/postgres.rs +++ b/tests/postgres/postgres.rs @@ -1,7 +1,6 @@ use futures::TryStreamExt; use sqlx::postgres::PgRow; use sqlx::postgres::{PgDatabaseError, PgErrorPosition, PgSeverity}; -use sqlx::CachingConnection; use sqlx::{postgres::Postgres, Connection, Executor, PgPool, Row}; use sqlx_test::new; use std::time::Duration; @@ -504,9 +503,9 @@ async fn it_caches_statements() -> anyhow::Result<()> { assert_eq!(i, val); } - assert_eq!(1, conn.cached_statements_count()); + assert_eq!(1, conn.cached_statements_size()); conn.clear_cached_statements().await?; - assert_eq!(0, conn.cached_statements_count()); + assert_eq!(0, conn.cached_statements_size()); Ok(()) } diff --git a/tests/sqlite/sqlite.rs b/tests/sqlite/sqlite.rs index 6935f04ff7..35db78dedc 100644 --- a/tests/sqlite/sqlite.rs +++ b/tests/sqlite/sqlite.rs @@ -1,7 +1,6 @@ use futures::TryStreamExt; use sqlx::{ - query, sqlite::Sqlite, CachingConnection, Connect, Connection, Executor, Row, SqliteConnection, - SqlitePool, + query, sqlite::Sqlite, Connect, Connection, Executor, Row, SqliteConnection, SqlitePool, }; use sqlx_test::new; @@ -286,9 +285,9 @@ async fn it_caches_statements() -> anyhow::Result<()> { assert_eq!(i, val); } - assert_eq!(1, conn.cached_statements_count()); + assert_eq!(1, conn.cached_statements_size()); conn.clear_cached_statements().await?; - assert_eq!(0, conn.cached_statements_count()); + assert_eq!(0, conn.cached_statements_size()); Ok(()) } From f969798cb6a066004294db1b48ef81057ca05bbc Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Thu, 25 Jun 2020 11:57:55 +0200 Subject: [PATCH 5/7] Document new connection string params --- sqlx-core/src/mysql/connection/establish.rs | 2 +- sqlx-core/src/mysql/options.rs | 26 +++++++---- .../src/postgres/connection/establish.rs | 2 +- sqlx-core/src/postgres/options.rs | 27 ++++++++---- sqlx-core/src/sqlite/connection/establish.rs | 2 +- sqlx-core/src/sqlite/options.rs | 43 ++++++++----------- 6 files changed, 59 insertions(+), 43 deletions(-) diff --git a/sqlx-core/src/mysql/connection/establish.rs b/sqlx-core/src/mysql/connection/establish.rs index 3174589444..bda5a39ac9 100644 --- a/sqlx-core/src/mysql/connection/establish.rs +++ b/sqlx-core/src/mysql/connection/establish.rs @@ -98,7 +98,7 @@ impl MySqlConnection { Ok(Self { stream, - cache_statement: StatementCache::new(options.statement_cache_size), + cache_statement: StatementCache::new(options.statement_cache_capacity), scratch_row_columns: Default::default(), scratch_row_column_names: Default::default(), }) diff --git a/sqlx-core/src/mysql/options.rs b/sqlx-core/src/mysql/options.rs index 339a960544..35eaad33a4 100644 --- a/sqlx-core/src/mysql/options.rs +++ b/sqlx-core/src/mysql/options.rs @@ -68,6 +68,14 @@ impl FromStr for MySqlSslMode { /// mysql://[host][/database][?properties] /// ``` /// +/// ## Properties +/// +/// |Parameter|Default|Description| +/// |---------|-------|-----------| +/// | `ssl-mode` | `PREFERRED` | Determines whether or with what priority a secure SSL TCP/IP connection will be negotiated. See [`MySqlSslMode`]. | +/// | `ssl-ca` | `None` | Sets the name of a file containing a list of trusted SSL Certificate Authorities. | +/// | `statement-cache-capacity` | `100` | The maximum number of prepared statements stored in the cache. Set to `0` to disable. | +/// /// # Example /// /// ```rust,no_run @@ -92,6 +100,8 @@ impl FromStr for MySqlSslMode { /// # }) /// # } /// ``` +/// +/// [`MySqlSslMode`]: enum.MySqlSslMode.html #[derive(Debug, Clone)] pub struct MySqlConnectOptions { pub(crate) host: String, @@ -101,7 +111,7 @@ pub struct MySqlConnectOptions { pub(crate) database: Option, pub(crate) ssl_mode: MySqlSslMode, pub(crate) ssl_ca: Option, - pub(crate) statement_cache_size: usize, + pub(crate) statement_cache_capacity: usize, } impl Default for MySqlConnectOptions { @@ -121,7 +131,7 @@ impl MySqlConnectOptions { database: None, ssl_mode: MySqlSslMode::Preferred, ssl_ca: None, - statement_cache_size: 100, + statement_cache_capacity: 100, } } @@ -193,14 +203,14 @@ impl MySqlConnectOptions { self } - /// Sets the size of the connection's statement cache in a number of stored + /// Sets the capacity of the connection's statement cache in a number of stored /// distinct statements. Caching is handled using LRU, meaning when the /// amount of queries hits the defined limit, the oldest statement will get /// dropped. /// - /// The default cache size is 100 statements. - pub fn statement_cache_size(mut self, size: usize) -> Self { - self.statement_cache_size = size; + /// The default cache capacity is 100 statements. + pub fn statement_cache_capacity(mut self, capacity: usize) -> Self { + self.statement_cache_capacity = capacity; self } } @@ -244,8 +254,8 @@ impl FromStr for MySqlConnectOptions { options = options.ssl_ca(&*value); } - "statement-cache-size" => { - options = options.statement_cache_size(value.parse()?); + "statement-cache-capacity" => { + options = options.statement_cache_capacity(value.parse()?); } _ => {} diff --git a/sqlx-core/src/postgres/connection/establish.rs b/sqlx-core/src/postgres/connection/establish.rs index a0438e6f3c..84219b410c 100644 --- a/sqlx-core/src/postgres/connection/establish.rs +++ b/sqlx-core/src/postgres/connection/establish.rs @@ -139,7 +139,7 @@ impl PgConnection { transaction_status, pending_ready_for_query_count: 0, next_statement_id: 1, - cache_statement: StatementCache::new(options.statement_cache_size), + cache_statement: StatementCache::new(options.statement_cache_capacity), cache_type_oid: HashMap::new(), cache_type_info: HashMap::new(), scratch_row_columns: Default::default(), diff --git a/sqlx-core/src/postgres/options.rs b/sqlx-core/src/postgres/options.rs index 8edb1414f4..d1640fa953 100644 --- a/sqlx-core/src/postgres/options.rs +++ b/sqlx-core/src/postgres/options.rs @@ -69,6 +69,15 @@ impl FromStr for PgSslMode { /// postgresql://[user[:password]@][host][:port][/dbname][?param1=value1&...] /// ``` /// +/// ## Parameters +/// +/// |Parameter|Default|Description| +/// |---------|-------|-----------| +/// | `sslmode` | `prefer` | Determines whether or with what priority a secure SSL TCP/IP connection will be negotiated. See [`PgSqlSslMode`]. | +/// | `sslrootcert` | `None` | Sets the name of a file containing a list of trusted SSL Certificate Authorities. | +/// | `statement-cache-capacity` | `100` | The maximum number of prepared statements stored in the cache. Set to `0` to disable. | +/// +/// /// The URI scheme designator can be either `postgresql://` or `postgres://`. /// Each of the URI parts is optional. /// @@ -106,6 +115,8 @@ impl FromStr for PgSslMode { /// # }) /// # } /// ``` +/// +/// [`PgSqlSslMode`]: enum.PgSslMode.html #[derive(Debug, Clone)] pub struct PgConnectOptions { pub(crate) host: String, @@ -115,7 +126,7 @@ pub struct PgConnectOptions { pub(crate) database: Option, pub(crate) ssl_mode: PgSslMode, pub(crate) ssl_root_cert: Option, - pub(crate) statement_cache_size: usize, + pub(crate) statement_cache_capacity: usize, } impl Default for PgConnectOptions { @@ -163,7 +174,7 @@ impl PgConnectOptions { .ok() .and_then(|v| v.parse().ok()) .unwrap_or_default(), - statement_cache_size: 100, + statement_cache_capacity: 100, } } @@ -288,14 +299,14 @@ impl PgConnectOptions { self } - /// Sets the size of the connection's statement cache in a number of stored + /// Sets the capacity of the connection's statement cache in a number of stored /// distinct statements. Caching is handled using LRU, meaning when the /// amount of queries hits the defined limit, the oldest statement will get /// dropped. /// - /// The default cache size is 100 statements. - pub fn statement_cache_size(mut self, size: usize) -> Self { - self.statement_cache_size = size; + /// The default cache capacity is 100 statements. + pub fn statement_cache_capacity(mut self, capacity: usize) -> Self { + self.statement_cache_capacity = capacity; self } } @@ -358,8 +369,8 @@ impl FromStr for PgConnectOptions { options = options.ssl_root_cert(&*value); } - "statement-cache-size" => { - options = options.statement_cache_size(value.parse()?); + "statement-cache-capacity" => { + options = options.statement_cache_capacity(value.parse()?); } _ => {} diff --git a/sqlx-core/src/sqlite/connection/establish.rs b/sqlx-core/src/sqlite/connection/establish.rs index fe2b9d8aa7..4c640bfb62 100644 --- a/sqlx-core/src/sqlite/connection/establish.rs +++ b/sqlx-core/src/sqlite/connection/establish.rs @@ -89,7 +89,7 @@ pub(super) async fn establish(options: &SqliteConnectOptions) -> Result Self { + self.statement_cache_capacity = capacity; + self + } } impl FromStr for SqliteConnectOptions { @@ -36,7 +47,7 @@ impl FromStr for SqliteConnectOptions { let mut options = Self { filename: PathBuf::new(), in_memory: false, - statement_cache_size: 100, + statement_cache_capacity: 100, }; // remove scheme @@ -44,26 +55,10 @@ impl FromStr for SqliteConnectOptions { .trim_start_matches("sqlite://") .trim_start_matches("sqlite:"); - let mut splitted = s.split("?"); - - match splitted.next() { - Some(":memory:") => options.in_memory = true, - Some(s) => options.filename = s.parse()?, - None => unreachable!(), - } - - match splitted.next().map(|s| s.split("=")) { - Some(mut splitted) => { - if splitted.next() == Some("statement-cache-size") { - options.statement_cache_size = splitted - .next() - .ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "Invalid connection string") - })? - .parse()? - } - } - _ => (), + if s == ":memory:" { + options.in_memory = true; + } else { + options.filename = s.parse()?; } Ok(options) From 745a32ab600f31c95f1d0d9a66634676c049a847 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Thu, 25 Jun 2020 12:26:44 +0200 Subject: [PATCH 6/7] Close pg statements correctly --- sqlx-core/src/common/statement_cache.rs | 2 +- sqlx-core/src/postgres/connection/executor.rs | 10 ++++-- sqlx-core/src/postgres/connection/mod.rs | 15 +++++++-- sqlx-core/src/postgres/message/close.rs | 32 +++++++++++++++++++ sqlx-core/src/postgres/message/mod.rs | 2 ++ 5 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 sqlx-core/src/postgres/message/close.rs diff --git a/sqlx-core/src/common/statement_cache.rs b/sqlx-core/src/common/statement_cache.rs index 6ad7c5b7f8..f0f108cf5f 100644 --- a/sqlx-core/src/common/statement_cache.rs +++ b/sqlx-core/src/common/statement_cache.rs @@ -49,7 +49,7 @@ impl StatementCache { } /// Clear all cached statements from the cache. - #[cfg(any(feature = "postgres", feature = "sqlite"))] + #[cfg(any(feature = "sqlite"))] pub fn clear(&mut self) { self.inner.clear(); } diff --git a/sqlx-core/src/postgres/connection/executor.rs b/sqlx-core/src/postgres/connection/executor.rs index b3fc706767..e18a9c7217 100644 --- a/sqlx-core/src/postgres/connection/executor.rs +++ b/sqlx-core/src/postgres/connection/executor.rs @@ -9,8 +9,8 @@ use crate::describe::Describe; use crate::error::Error; use crate::executor::{Execute, Executor}; use crate::postgres::message::{ - self, Bind, CommandComplete, DataRow, Flush, MessageFormat, ParameterDescription, Parse, Query, - RowDescription, + self, Bind, Close, CommandComplete, DataRow, Flush, MessageFormat, ParameterDescription, Parse, + Query, RowDescription, }; use crate::postgres::type_info::PgType; use crate::postgres::{PgArguments, PgConnection, PgRow, PgValueFormat, Postgres}; @@ -97,7 +97,11 @@ impl PgConnection { let statement = prepare(self, query, arguments).await?; - self.cache_statement.insert(query, statement); + if let Some(statement) = self.cache_statement.insert(query, statement) { + self.stream.write(Close::Statement(statement)); + self.stream.write(Flush); + self.stream.flush().await?; + } Ok(statement) } diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index 1821e279c0..885ba92f56 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -13,7 +13,7 @@ use crate::ext::ustr::UStr; use crate::io::Decode; use crate::postgres::connection::stream::PgStream; use crate::postgres::message::{ - Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus, + Close, Flush, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus, }; use crate::postgres::row::PgColumn; use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres}; @@ -126,7 +126,18 @@ impl Connection for PgConnection { fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { - self.cache_statement.clear(); + let mut needs_flush = false; + + while let Some(statement) = self.cache_statement.remove_lru() { + self.stream.write(Close::Statement(statement)); + needs_flush = true; + } + + if needs_flush { + self.stream.write(Flush); + self.stream.flush().await?; + } + Ok(()) }) } diff --git a/sqlx-core/src/postgres/message/close.rs b/sqlx-core/src/postgres/message/close.rs new file mode 100644 index 0000000000..07e7795008 --- /dev/null +++ b/sqlx-core/src/postgres/message/close.rs @@ -0,0 +1,32 @@ +use crate::io::Encode; +use crate::postgres::io::PgBufMutExt; + +const CLOSE_PORTAL: u8 = b'P'; +const CLOSE_STATEMENT: u8 = b'S'; + +#[derive(Debug)] +#[allow(dead_code)] +pub enum Close { + Statement(u32), + Portal(u32), +} + +impl Encode<'_> for Close { + fn encode_with(&self, buf: &mut Vec, _: ()) { + // 15 bytes for 1-digit statement/portal IDs + buf.reserve(20); + buf.push(b'C'); + + buf.put_length_prefixed(|buf| match self { + Close::Statement(id) => { + buf.push(CLOSE_STATEMENT); + buf.put_statement_name(*id); + } + + Close::Portal(id) => { + buf.push(CLOSE_PORTAL); + buf.put_portal_name(Some(*id)); + } + }) + } +} diff --git a/sqlx-core/src/postgres/message/mod.rs b/sqlx-core/src/postgres/message/mod.rs index 7cb1eb49ea..87f11feb69 100644 --- a/sqlx-core/src/postgres/message/mod.rs +++ b/sqlx-core/src/postgres/message/mod.rs @@ -6,6 +6,7 @@ use crate::io::Decode; mod authentication; mod backend_key_data; mod bind; +mod close; mod command_complete; mod data_row; mod describe; @@ -28,6 +29,7 @@ mod terminate; pub use authentication::{Authentication, AuthenticationSasl}; pub use backend_key_data::BackendKeyData; pub use bind::Bind; +pub use close::Close; pub use command_complete::CommandComplete; pub use data_row::DataRow; pub use describe::Describe; From 363dbfb81e07c3155c2f943c2f72e98d824ce914 Mon Sep 17 00:00:00 2001 From: Julius de Bruijn Date: Thu, 25 Jun 2020 12:38:20 +0200 Subject: [PATCH 7/7] No need to reset the statement when dropping. --- sqlx-core/src/sqlite/connection/executor.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sqlx-core/src/sqlite/connection/executor.rs b/sqlx-core/src/sqlite/connection/executor.rs index e908117ec8..8bd384a609 100644 --- a/sqlx-core/src/sqlite/connection/executor.rs +++ b/sqlx-core/src/sqlite/connection/executor.rs @@ -29,10 +29,7 @@ fn prepare<'a>( if !statements.contains_key(query) { let statement = SqliteStatement::prepare(conn, query, false)?; - - if let Some(mut statement) = statements.insert(query, statement) { - statement.reset(); - } + statements.insert(query, statement); } let statement = statements.get_mut(query).unwrap();