From b1acf6cb8dca13a29c3fef01fbe2f689b0f2f612 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 15:59:36 +0000 Subject: [PATCH 01/20] feat(storage): use mdbx_txn_reset to kill timeouted transactions --- .../storage/db/src/implementation/mdbx/mod.rs | 10 +- .../storage/db/src/implementation/mdbx/tx.rs | 22 +-- crates/storage/libmdbx-rs/benches/cursor.rs | 38 ++--- crates/storage/libmdbx-rs/src/cursor.rs | 79 +++++----- crates/storage/libmdbx-rs/src/database.rs | 10 +- crates/storage/libmdbx-rs/src/error.rs | 27 ---- crates/storage/libmdbx-rs/src/transaction.rs | 100 ++++++------- crates/storage/libmdbx-rs/src/txn_manager.rs | 140 +++++------------- 8 files changed, 170 insertions(+), 256 deletions(-) diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 0dcaec130a3d..d4177001d875 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -96,17 +96,19 @@ impl Database for DatabaseEnv { type TXMut = tx::Tx; fn tx(&self) -> Result { - Ok(Tx::new_with_metrics( + Tx::new_with_metrics( self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?, self.metrics.as_ref().cloned(), - )) + ) + .map_err(|e| DatabaseError::InitTx(e.into())) } fn tx_mut(&self) -> Result { - Ok(Tx::new_with_metrics( + Tx::new_with_metrics( self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?, self.metrics.as_ref().cloned(), - )) + ) + .map_err(|e| DatabaseError::InitTx(e.into())) } } diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 16d38ead6758..656f6dde9a74 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -53,14 +53,16 @@ impl Tx { pub fn new_with_metrics( inner: Transaction, env_metrics: Option>, - ) -> Self { - let metrics_handler = env_metrics.map(|env_metrics| { - let handler = MetricsHandler::::new(inner.id(), env_metrics); - handler.env_metrics.record_opened_transaction(handler.transaction_mode()); - handler.log_transaction_opened(); - handler - }); - Self::new_inner(inner, metrics_handler) + ) -> reth_libmdbx::Result { + let metrics_handler = env_metrics + .map(|env_metrics| { + let handler = MetricsHandler::::new(inner.id()?, env_metrics); + handler.env_metrics.record_opened_transaction(handler.transaction_mode()); + handler.log_transaction_opened(); + Ok(handler) + }) + .transpose()?; + Ok(Self::new_inner(inner, metrics_handler)) } #[inline] @@ -76,8 +78,8 @@ impl Tx { } /// Gets this transaction ID. - pub fn id(&self) -> u64 { - self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id) + pub fn id(&self) -> reth_libmdbx::Result { + self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id)) } /// Gets a table database handle if it exists, otherwise creates it. diff --git a/crates/storage/libmdbx-rs/benches/cursor.rs b/crates/storage/libmdbx-rs/benches/cursor.rs index c05cef4b6ef6..96ef08fd9c8a 100644 --- a/crates/storage/libmdbx-rs/benches/cursor.rs +++ b/crates/storage/libmdbx-rs/benches/cursor.rs @@ -79,28 +79,30 @@ fn bench_get_seq_raw(c: &mut Criterion) { let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi(); let _txn = env.begin_ro_txn().unwrap(); - let txn = _txn.txn(); - let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; - let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; - let mut cursor: *mut MDBX_cursor = ptr::null_mut(); + _txn.txn_execute(|txn| { + let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; + let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; + let mut cursor: *mut MDBX_cursor = ptr::null_mut(); - c.bench_function("bench_get_seq_raw", |b| { - b.iter(|| unsafe { - mdbx_cursor_open(txn, dbi, &mut cursor); - let mut i = 0; - let mut count = 0u32; + c.bench_function("bench_get_seq_raw", |b| { + b.iter(|| unsafe { + mdbx_cursor_open(txn, dbi, &mut cursor); + let mut i = 0; + let mut count = 0u32; - while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 { - i += key.iov_len + data.iov_len; - count += 1; - } + while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 { + i += key.iov_len + data.iov_len; + count += 1; + } - black_box(i); - assert_eq!(count, n); - mdbx_cursor_close(cursor); - }) - }); + black_box(i); + assert_eq!(count, n); + mdbx_cursor_close(cursor); + }) + }); + }) + .unwrap(); } criterion_group! { diff --git a/crates/storage/libmdbx-rs/src/cursor.rs b/crates/storage/libmdbx-rs/src/cursor.rs index 1f356a7e5579..d9b1c5c42f01 100644 --- a/crates/storage/libmdbx-rs/src/cursor.rs +++ b/crates/storage/libmdbx-rs/src/cursor.rs @@ -1,5 +1,5 @@ use crate::{ - error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result}, + error::{mdbx_result, Error, Result}, flags::*, mdbx_try_optional, transaction::{TransactionKind, RW}, @@ -30,26 +30,26 @@ where pub(crate) fn new(txn: Transaction, dbi: ffi::MDBX_dbi) -> Result { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); unsafe { - mdbx_result_with_tx_kind::( - txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)), - txn.txn(), - txn.env().txn_manager(), - )?; + txn.txn_execute(|txn_ptr| { + mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor)) + })??; } Ok(Self { txn, cursor }) } fn new_at_position(other: &Self) -> Result { unsafe { - let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); + other.txn.txn_execute(|_| { + let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); - let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); + let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); - let s = Self { txn: other.txn.clone(), cursor }; + let s = Self { txn: other.txn.clone(), cursor }; - mdbx_result_with_tx_kind::(res, s.txn.txn(), s.txn.env().txn_manager())?; + mdbx_result(res)?; - Ok(s) + Ok(s) + })? } } @@ -95,11 +95,12 @@ where let key_ptr = key_val.iov_base; let data_ptr = data_val.iov_base; self.txn.txn_execute(|txn| { - let v = mdbx_result_with_tx_kind::( - ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op), - txn, - self.txn.env().txn_manager(), - )?; + let v = mdbx_result(ffi::mdbx_cursor_get( + self.cursor, + &mut key_val, + &mut data_val, + op, + ))?; assert_ne!(data_ptr, data_val.iov_base); let key_out = { // MDBX wrote in new key @@ -111,7 +112,7 @@ where }; let data_out = Value::decode_val::(txn, data_val)?; Ok((key_out, data_out, v)) - }) + })? } } @@ -444,7 +445,7 @@ impl Cursor { mdbx_result(unsafe { self.txn.txn_execute(|_| { ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits()) - }) + })? })?; Ok(()) @@ -458,7 +459,7 @@ impl Cursor { /// current key, if the database was opened with [DatabaseFlags::DUP_SORT]. pub fn del(&mut self, flags: WriteFlags) -> Result<()> { mdbx_result(unsafe { - self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits())) + self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))? })?; Ok(()) @@ -470,7 +471,7 @@ where K: TransactionKind, { fn clone(&self) -> Self { - self.txn.txn_execute(|_| Self::new_at_position(self).unwrap()) + Self::new_at_position(self).unwrap() } } @@ -488,7 +489,7 @@ where K: TransactionKind, { fn drop(&mut self) { - self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }) + let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }); } } @@ -564,7 +565,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - cursor.txn.txn_execute(|txn| { + let result = cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, key) { @@ -583,7 +584,11 @@ where ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None, error => Some(Err(Error::from_err_code(error))), } - }) + }); + match result { + Ok(result) => result, + Err(err) => Some(Err(err)), + } } } Self::Err(err) => err.take().map(Err), @@ -655,7 +660,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - cursor.txn.txn_execute(|txn| { + let result = cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, key) { @@ -674,7 +679,11 @@ where ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None, error => Some(Err(Error::from_err_code(error))), } - }) + }); + match result { + Ok(result) => result, + Err(err) => Some(Err(err)), + } } } Iter::Err(err) => err.take().map(Err), @@ -752,17 +761,15 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, ffi::MDBX_NEXT_NODUP); - cursor.txn.txn_execute(|_| { - let err_code = - unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) }; - - (err_code == ffi::MDBX_SUCCESS).then(|| { - IntoIter::new( - Cursor::new_at_position(&**cursor).unwrap(), - ffi::MDBX_GET_CURRENT, - ffi::MDBX_NEXT_DUP, - ) - }) + let err_code = + unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) }; + + (err_code == ffi::MDBX_SUCCESS).then(|| { + IntoIter::new( + Cursor::new_at_position(&**cursor).unwrap(), + ffi::MDBX_GET_CURRENT, + ffi::MDBX_NEXT_DUP, + ) }) } IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))), diff --git a/crates/storage/libmdbx-rs/src/database.rs b/crates/storage/libmdbx-rs/src/database.rs index 09e2e10890bf..55eb7e0bbf57 100644 --- a/crates/storage/libmdbx-rs/src/database.rs +++ b/crates/storage/libmdbx-rs/src/database.rs @@ -1,5 +1,5 @@ use crate::{ - error::{mdbx_result_with_tx_kind, Result}, + error::{mdbx_result, Result}, transaction::TransactionKind, Environment, Transaction, }; @@ -31,12 +31,8 @@ impl Database { let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() }; let mut dbi: ffi::MDBX_dbi = 0; txn.txn_execute(|txn_ptr| { - mdbx_result_with_tx_kind::( - unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) }, - txn_ptr, - txn.env().txn_manager(), - ) - })?; + mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) }) + })??; Ok(Self::new_from_ptr(dbi, txn.env().clone())) } diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index aff584619509..d558c344eded 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -216,33 +216,6 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result { } } -#[cfg(feature = "read-tx-timeouts")] -#[inline] -pub(crate) fn mdbx_result_with_tx_kind( - err_code: c_int, - txn: *mut ffi::MDBX_txn, - txn_manager: &TxnManager, -) -> Result { - if K::IS_READ_ONLY && - err_code == ffi::MDBX_EBADSIGN && - txn_manager.remove_aborted_read_transaction(txn).is_some() - { - return Err(Error::ReadTransactionAborted) - } - - mdbx_result(err_code) -} - -#[cfg(not(feature = "read-tx-timeouts"))] -#[inline] -pub(crate) fn mdbx_result_with_tx_kind( - err_code: c_int, - _txn: *mut ffi::MDBX_txn, - _txn_manager: &TxnManager, -) -> Result { - mdbx_result(err_code) -} - #[macro_export] macro_rules! mdbx_try_optional { ($expr:expr) => {{ diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 9910d9dca63a..53e5b94e9774 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -1,7 +1,7 @@ use crate::{ database::Database, environment::Environment, - error::{mdbx_result, mdbx_result_with_tx_kind, Result}, + error::{mdbx_result, Result}, flags::{DatabaseFlags, WriteFlags}, txn_manager::{TxnManagerMessage, TxnPtr}, Cursor, Error, Stat, TableObject, @@ -71,29 +71,27 @@ where pub(crate) fn new(env: Environment) -> Result { let mut txn: *mut ffi::MDBX_txn = ptr::null_mut(); unsafe { - mdbx_result_with_tx_kind::( - ffi::mdbx_txn_begin_ex( - env.env_ptr(), - ptr::null_mut(), - K::OPEN_FLAGS, - &mut txn, - ptr::null_mut(), - ), - txn, - env.txn_manager(), - )?; + mdbx_result(ffi::mdbx_txn_begin_ex( + env.env_ptr(), + ptr::null_mut(), + K::OPEN_FLAGS, + &mut txn, + ptr::null_mut(), + ))?; Ok(Self::new_from_ptr(env, txn)) } } - pub(crate) fn new_from_ptr(env: Environment, txn: *mut ffi::MDBX_txn) -> Self { + pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self { + let txn = TransactionPtr::new(txn_ptr); + #[cfg(feature = "read-tx-timeouts")] if K::IS_READ_ONLY { - env.txn_manager().add_active_read_transaction(txn) + env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone()) } let inner = TransactionInner { - txn: TransactionPtr::new(txn), + txn, primed_dbis: Mutex::new(IndexSet::new()), committed: AtomicBool::new(false), env, @@ -108,7 +106,7 @@ where /// The caller **must** ensure that the pointer is not used after the /// lifetime of the transaction. #[inline] - pub(crate) fn txn_execute(&self, f: F) -> T + pub fn txn_execute(&self, f: F) -> Result where F: FnOnce(*mut ffi::MDBX_txn) -> T, { @@ -117,6 +115,7 @@ where /// Returns a copy of the raw pointer to the underlying MDBX transaction. #[doc(hidden)] + #[cfg(test)] pub fn txn(&self) -> *mut ffi::MDBX_txn { self.inner.txn.txn } @@ -142,7 +141,7 @@ where } /// Returns the transaction id. - pub fn id(&self) -> u64 { + pub fn id(&self) -> Result { self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) }) } @@ -168,7 +167,7 @@ where ffi::MDBX_NOTFOUND => Ok(None), err_code => Err(Error::from_err_code(err_code)), } - }) + })? } /// Commits the transaction. @@ -191,11 +190,9 @@ where self.env().txn_manager().remove_active_read_transaction(txn); let mut latency = CommitLatency::new(); - mdbx_result_with_tx_kind::( - unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) }, - txn, - self.env().txn_manager(), - ) + mdbx_result(unsafe { + ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) + }) .map(|v| (v, latency)) } else { let (sender, rx) = sync_channel(0); @@ -204,7 +201,7 @@ where .send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender }); rx.recv().unwrap() } - }); + })?; self.inner.set_committed(); result @@ -243,12 +240,8 @@ where let mut flags: c_uint = 0; unsafe { self.txn_execute(|txn| { - mdbx_result_with_tx_kind::( - ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()), - txn, - self.env().txn_manager(), - ) - })?; + mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut())) + })??; } // The types are not the same on Windows. Great! @@ -266,12 +259,8 @@ where unsafe { let mut stat = Stat::new(); self.txn_execute(|txn| { - mdbx_result_with_tx_kind::( - ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::()), - txn, - self.env().txn_manager(), - ) - })?; + mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::())) + })??; Ok(stat) } } @@ -290,7 +279,7 @@ where #[cfg(feature = "read-tx-timeouts")] pub fn disable_timeout(&self) { if K::IS_READ_ONLY { - self.env().txn_manager().remove_active_read_transaction(self.txn()); + self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn); } } } @@ -342,7 +331,7 @@ where } #[inline] - fn txn_execute(&self, f: F) -> T + fn txn_execute(&self, f: F) -> Result where F: FnOnce(*mut ffi::MDBX_txn) -> T, { @@ -355,14 +344,14 @@ where K: TransactionKind, { fn drop(&mut self) { - self.txn_execute(|txn| { + let _ = self.txn_execute(|txn| { if !self.has_committed() { if K::IS_READ_ONLY { #[cfg(feature = "read-tx-timeouts")] self.env.txn_manager().remove_active_read_transaction(txn); unsafe { - ffi::mdbx_txn_abort(txn); + let _ = mdbx_result(ffi::mdbx_txn_abort(txn)); } } else { let (sender, rx) = sync_channel(0); @@ -372,7 +361,7 @@ where rx.recv().unwrap().unwrap(); } } - }) + }); } } @@ -418,7 +407,7 @@ impl Transaction { ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void }; mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits()) - }))?; + })?)?; Ok(()) } @@ -447,7 +436,7 @@ impl Transaction { &mut data_val, flags.bits() | ffi::MDBX_RESERVE, ) - }))?; + })?)?; Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len)) } } @@ -482,7 +471,7 @@ impl Transaction { } else { unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) } } - }) + })? }) .map(|_| true) .or_else(|e| match e { @@ -493,7 +482,7 @@ impl Transaction { /// Empties the given database. All items will be removed. pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> { - mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?; + mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?; Ok(()) } @@ -504,7 +493,7 @@ impl Transaction { /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. pub unsafe fn drop_db(&self, db: Database) -> Result<()> { - mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?; + mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?; Ok(()) } @@ -517,11 +506,7 @@ impl Transaction { /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. pub unsafe fn close_db(&self, db: Database) -> Result<()> { - mdbx_result_with_tx_kind::( - ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()), - self.txn(), - self.env().txn_manager(), - )?; + self.txn_execute(|_| mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi())))??; Ok(()) } @@ -542,12 +527,12 @@ impl Transaction { }); rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0)) - }) + })? } } /// A shareable pointer to an MDBX transaction. -#[derive(Clone)] +#[derive(Debug, Clone)] pub(crate) struct TransactionPtr { txn: *mut ffi::MDBX_txn, lock: Arc>, @@ -560,12 +545,17 @@ impl TransactionPtr { /// Executes the given closure once the lock on the transaction is acquired. #[inline] - pub(crate) fn txn_execute(&self, f: F) -> T + pub(crate) fn txn_execute(&self, f: F) -> Result where F: FnOnce(*mut ffi::MDBX_txn) -> T, { let _lck = self.lock.lock(); - (f)(self.txn) + + if unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED != 0 { + return Err(Error::ReadTransactionAborted) + } + + Ok((f)(self.txn)) } } diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index cc00b7111f11..bb969f267b14 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -77,17 +77,6 @@ impl TxnManager { .map(|_| TxnPtr(txn)), ) .unwrap(); - - #[cfg(feature = "read-tx-timeouts")] - { - use crate::transaction::TransactionKind; - - if flags == crate::transaction::RO::OPEN_FLAGS { - if let Some(read_transactions) = &read_transactions { - read_transactions.add_active(txn); - } - } - } } TxnManagerMessage::Abort { tx, sender } => { #[cfg(feature = "read-tx-timeouts")] @@ -127,7 +116,10 @@ impl TxnManager { #[cfg(feature = "read-tx-timeouts")] mod read_transactions { - use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error}; + use crate::{ + environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr, + txn_manager::TxnManager, Error, + }; use dashmap::{DashMap, DashSet}; use std::{ sync::{mpsc::sync_channel, Arc}, @@ -157,9 +149,13 @@ mod read_transactions { } /// Adds a new transaction to the list of active read transactions. - pub(crate) fn add_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) { + pub(crate) fn add_active_read_transaction( + &self, + ptr: *mut ffi::MDBX_txn, + tx: TransactionPtr, + ) { if let Some(read_transactions) = &self.read_transactions { - read_transactions.add_active(ptr); + read_transactions.add_active(ptr, tx); } } @@ -167,17 +163,9 @@ mod read_transactions { pub(crate) fn remove_active_read_transaction( &self, ptr: *mut ffi::MDBX_txn, - ) -> Option<(usize, Instant)> { + ) -> Option<(usize, (TransactionPtr, Instant))> { self.read_transactions.as_ref()?.remove_active(ptr) } - - /// Removes a transaction from the list of aborted read transactions. - pub(crate) fn remove_aborted_read_transaction( - &self, - ptr: *mut ffi::MDBX_txn, - ) -> Option { - self.read_transactions.as_ref()?.remove_aborted(ptr) - } } #[derive(Debug, Default)] @@ -189,13 +177,7 @@ mod read_transactions { /// /// We store `usize` instead of a raw pointer as a key, because pointers are not /// comparable. The time of transaction opening is stored as a value. - active: DashMap, - /// List of read transactions aborted by the [ReadTransactions::start_monitor]. - /// We keep them until user tries to abort the transaction, so we're able to report a nice - /// [Error::ReadTransactionAborted] error. - /// - /// We store `usize` instead of a raw pointer, because pointers are not comparable. - aborted: DashSet, + active: DashMap, } impl ReadTransactions { @@ -204,32 +186,21 @@ mod read_transactions { } /// Adds a new transaction to the list of active read transactions. - pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn) { - let _ = self.active.insert(ptr as usize, Instant::now()); + pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) { + let _ = self.active.insert(ptr as usize, (tx, Instant::now())); } /// Removes a transaction from the list of active read transactions. - pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> Option<(usize, Instant)> { + pub(super) fn remove_active( + &self, + ptr: *mut ffi::MDBX_txn, + ) -> Option<(usize, (TransactionPtr, Instant))> { self.active.remove(&(ptr as usize)) } - /// Adds a new transaction to the list of aborted read transactions. - pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) { - self.aborted.insert(ptr as usize); - } - - /// Removes a transaction from the list of aborted read transactions. - pub(super) fn remove_aborted(&self, ptr: *mut ffi::MDBX_txn) -> Option { - self.aborted.remove(&(ptr as usize)) - } - /// Spawns a new thread with [std::thread::spawn] that monitors the list of active read /// transactions and aborts those that are open for longer than /// `ReadTransactions.max_duration`. - /// - /// Aborted transaction pointers are placed into the list of aborted read transactions, and - /// removed from this list by [crate::error::mdbx_result_with_tx_kind] when the user tries - /// to use it. pub(super) fn start_monitor(self: Arc) { std::thread::spawn(move || { let mut aborted_active = Vec::new(); @@ -241,22 +212,27 @@ mod read_transactions { // Iterate through active read transactions and abort those that's open for // longer than `self.max_duration`. for entry in self.active.iter() { - let (ptr, start) = entry.pair(); + let (tx, start) = entry.value(); let duration = now - *start; if duration > self.max_duration { - let ptr = *ptr as *mut ffi::MDBX_txn; - - // Add the transaction to the list of aborted transactions, so further - // usages report the correct error when the transaction is closed. - self.add_aborted(ptr); - - // Abort the transaction - let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) }); - - // Add the transaction to `aborted_active`. We can't remove it instantly - // from the list of active transactions, because we iterate through it. - aborted_active.push((ptr, duration, result.err())); + let result = tx.txn_execute(|txn_ptr| { + // Abort the transaction + let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }); + (txn_ptr, duration, result.err()) + }); + + match result { + Ok((txn_ptr, duration, error)) => { + // Add the transaction to `aborted_active`. We can't remove it + // instantly from the list of active + // transactions, because we iterate through it. + aborted_active.push((txn_ptr, duration, error)); + } + Err(err) => { + error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction") + } + } } else { max_active_transaction_duration = Some( duration.max(max_active_transaction_duration.unwrap_or_default()), @@ -270,18 +246,14 @@ mod read_transactions { // Try deleting the transaction from the list of active transactions. let was_in_active = self.remove_active(ptr).is_some(); if let Some(err) = err { - // If there was an error when aborting the transaction, we need to - // remove it from the list of aborted transactions, because otherwise it - // will stay there forever. - self.remove_aborted(ptr); if was_in_active && err != Error::BadSignature { // If the transaction was in the list of active transactions and the // error code is not `EBADSIGN`, then user didn't abort it. - error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transactions"); + error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transaction"); } } else { // Happy path, the transaction has been aborted by us with no errors. - warn!(target: "libmdbx", ?open_duration, "Long-lived read transactions has been aborted"); + warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been aborted"); } } @@ -289,15 +261,14 @@ mod read_transactions { // capacity to save on further pushes. aborted_active.clear(); - if !self.active.is_empty() || !self.aborted.is_empty() { + if !self.active.is_empty() { trace!( target: "libmdbx", elapsed = ?now.elapsed(), active = ?self.active.iter().map(|entry| { - let (ptr, start) = entry.pair(); - (*ptr, start.elapsed()) + let (tx, start) = entry.value(); + (tx.clone(), start.elapsed()) }).collect::>(), - aborted = ?self.aborted.iter().map(|entry| *entry).collect::>(), "Read transactions" ); } @@ -347,7 +318,6 @@ mod read_transactions { drop(tx); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(!read_transactions.aborted.contains(&tx_ptr)); } // Create a read-only transaction, successfully use it, close it by committing. @@ -360,7 +330,6 @@ mod read_transactions { tx.commit().unwrap(); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(!read_transactions.aborted.contains(&tx_ptr)); } // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the @@ -373,11 +342,9 @@ mod read_transactions { sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(read_transactions.aborted.contains(&tx_ptr)); assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted)); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(!read_transactions.aborted.contains(&tx_ptr)); } } @@ -395,30 +362,5 @@ mod read_transactions { sleep(READ_TRANSACTIONS_CHECK_INTERVAL); assert!(tx.commit().is_ok()) } - - #[test] - fn txn_manager_begin_read_transaction_via_message_listener() { - const MAX_DURATION: Duration = Duration::from_secs(1); - - let dir = tempdir().unwrap(); - let env = Environment::builder() - .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION)) - .open(dir.path()) - .unwrap(); - - let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap(); - - // Create a read-only transaction via the message listener. - let (tx, rx) = sync_channel(0); - env.txn_manager().send_message(TxnManagerMessage::Begin { - parent: TxnPtr(ptr::null_mut()), - flags: RO::OPEN_FLAGS, - sender: tx, - }); - - let txn_ptr = rx.recv().unwrap().unwrap(); - - assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize))); - } } } From a618d6181855319e6c36f946661adc3d0b8cdb70 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:08:14 +0000 Subject: [PATCH 02/20] cleanup --- crates/storage/libmdbx-rs/src/error.rs | 1 - crates/storage/libmdbx-rs/src/txn_manager.rs | 15 +-------------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index d558c344eded..0efd3b644292 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -1,4 +1,3 @@ -use crate::{txn_manager::TxnManager, TransactionKind}; use libc::c_int; use std::result; diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 2a1a73f3fb02..b65b4204273c 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -52,9 +52,6 @@ impl TxnManager { /// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort] /// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex] fn start_message_listener(&self, env: EnvPtr, rx: Receiver) { - #[cfg(feature = "read-tx-timeouts")] - let read_transactions = self.read_transactions.clone(); - std::thread::spawn(move || { #[allow(clippy::redundant_locals)] let env = env; @@ -76,19 +73,9 @@ impl TxnManager { sender.send(res).unwrap(); } TxnManagerMessage::Abort { tx, sender } => { - #[cfg(feature = "read-tx-timeouts")] - if let Some(read_transactions) = &read_transactions { - read_transactions.remove_active(tx.0); - } - sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap(); } TxnManagerMessage::Commit { tx, sender } => { - #[cfg(feature = "read-tx-timeouts")] - if let Some(read_transactions) = &read_transactions { - read_transactions.remove_active(tx.0); - } - sender .send({ let mut latency = CommitLatency::new(); @@ -117,7 +104,7 @@ mod read_transactions { environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr, txn_manager::TxnManager, Error, }; - use dashmap::{DashMap, DashSet}; + use dashmap::DashMap; use std::{ sync::{mpsc::sync_channel, Arc}, time::{Duration, Instant}, From 806a154410500d2b936b6e266ea812359970caa8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:19:42 +0000 Subject: [PATCH 03/20] clarify nits --- crates/storage/libmdbx-rs/src/error.rs | 3 ++- crates/storage/libmdbx-rs/src/transaction.rs | 2 +- crates/storage/libmdbx-rs/src/txn_manager.rs | 17 ++++++++++------- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index 0efd3b644292..9416c30a985d 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -192,9 +192,10 @@ impl Error { Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL, Error::Access => ffi::MDBX_EACCESS, Error::TooLarge => ffi::MDBX_TOO_LARGE, - Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN, + Error::BadSignature => ffi::MDBX_EBADSIGN, Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS, Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS, + Error::ReadTransactionAborted => -69000, // Custom non-MDBX error code Error::Other(err_code) => *err_code, } } diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 53e5b94e9774..d78daff09f85 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -351,7 +351,7 @@ where self.env.txn_manager().remove_active_read_transaction(txn); unsafe { - let _ = mdbx_result(ffi::mdbx_txn_abort(txn)); + ffi::mdbx_txn_abort(txn); } } else { let (sender, rx) = sync_channel(0); diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index b65b4204273c..93cc5209f1a5 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -201,9 +201,12 @@ mod read_transactions { if duration > self.max_duration { let result = tx.txn_execute(|txn_ptr| { - // Abort the transaction - let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }); - (txn_ptr, duration, result.err()) + ( + txn_ptr, + duration, + // Abort the transaction + mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }), + ) }); match result { @@ -229,10 +232,10 @@ mod read_transactions { for (ptr, open_duration, err) in aborted_active.iter().copied() { // Try deleting the transaction from the list of active transactions. let was_in_active = self.remove_active(ptr).is_some(); - if let Some(err) = err { - if was_in_active && err != Error::BadSignature { - // If the transaction was in the list of active transactions and the - // error code is not `EBADSIGN`, then user didn't abort it. + if let Err(err) = err { + if was_in_active { + // If the transaction was in the list of active transactions then + // user didn't abort it and we failed to do so. error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transaction"); } } else { From 4d8ff94e9069ef59dcbdf0aa5dea7f3f496918a6 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:22:37 +0000 Subject: [PATCH 04/20] remove unused imports --- crates/storage/libmdbx-rs/src/txn_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 93cc5209f1a5..e6ad38910198 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -102,7 +102,7 @@ impl TxnManager { mod read_transactions { use crate::{ environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr, - txn_manager::TxnManager, Error, + txn_manager::TxnManager, }; use dashmap::DashMap; use std::{ From 2e8d92fe6f2675ccaa7535804bed49fe7561ffa1 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:25:30 +0000 Subject: [PATCH 05/20] remove similar method for benches --- crates/storage/libmdbx-rs/benches/transaction.rs | 5 +++-- crates/storage/libmdbx-rs/src/transaction.rs | 15 --------------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/crates/storage/libmdbx-rs/benches/transaction.rs b/crates/storage/libmdbx-rs/benches/transaction.rs index 91e2c44044c7..dc8426c5bf31 100644 --- a/crates/storage/libmdbx-rs/benches/transaction.rs +++ b/crates/storage/libmdbx-rs/benches/transaction.rs @@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) { c.bench_function("bench_get_rand_raw", |b| { b.iter(|| unsafe { - txn.with_raw_tx_ptr(|txn| { + txn.txn_execute(|txn| { let mut i: size_t = 0; for key in &keys { key_val.iov_len = key.len() as size_t; @@ -57,7 +57,8 @@ fn bench_get_rand_raw(c: &mut Criterion) { i += key_val.iov_len; } black_box(i); - }); + }) + .unwrap(); }) }); } diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index d78daff09f85..9c443ac76565 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -120,21 +120,6 @@ where self.inner.txn.txn } - /// Executes the given closure once - /// - /// This is only intended to be used when accessing mdbx ffi functions directly is required. - /// - /// The caller **must** ensure that the pointer is only used within the closure. - #[inline] - #[doc(hidden)] - pub fn with_raw_tx_ptr(&self, f: F) -> T - where - F: FnOnce(*mut ffi::MDBX_txn) -> T, - { - let _lock = self.inner.txn.lock.lock(); - f(self.inner.txn.txn) - } - /// Returns a raw pointer to the MDBX environment. pub fn env(&self) -> &Environment { &self.inner.env From eaea3eb2b04c3cc4e77d7170905768d957c0c1ef Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:26:59 +0000 Subject: [PATCH 06/20] move txn_execute inside the bench --- crates/storage/libmdbx-rs/benches/cursor.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/storage/libmdbx-rs/benches/cursor.rs b/crates/storage/libmdbx-rs/benches/cursor.rs index 96ef08fd9c8a..a73b4fe275e8 100644 --- a/crates/storage/libmdbx-rs/benches/cursor.rs +++ b/crates/storage/libmdbx-rs/benches/cursor.rs @@ -78,15 +78,15 @@ fn bench_get_seq_raw(c: &mut Criterion) { let (_dir, env) = setup_bench_db(n); let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi(); - let _txn = env.begin_ro_txn().unwrap(); + let txn = env.begin_ro_txn().unwrap(); - _txn.txn_execute(|txn| { - let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; - let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; - let mut cursor: *mut MDBX_cursor = ptr::null_mut(); + let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; + let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; + let mut cursor: *mut MDBX_cursor = ptr::null_mut(); - c.bench_function("bench_get_seq_raw", |b| { - b.iter(|| unsafe { + c.bench_function("bench_get_seq_raw", |b| { + b.iter(|| unsafe { + txn.txn_execute(|txn| { mdbx_cursor_open(txn, dbi, &mut cursor); let mut i = 0; let mut count = 0u32; @@ -100,9 +100,9 @@ fn bench_get_seq_raw(c: &mut Criterion) { assert_eq!(count, n); mdbx_cursor_close(cursor); }) - }); - }) - .unwrap(); + .unwrap(); + }) + }); } criterion_group! { From 87b2478ccdf5d614c649a52531da4ecf42f59993 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:31:55 +0000 Subject: [PATCH 07/20] use aborted tx two times --- crates/storage/libmdbx-rs/src/txn_manager.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index e6ad38910198..b99ed2fe4a6b 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -318,7 +318,8 @@ mod read_transactions { } // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the - // manager kills it, use it and observe the `Error::ReadTransactionAborted` error. + // manager kills it, use it two times and observe the `Error::ReadTransactionAborted` + // error. { let tx = env.begin_ro_txn().unwrap(); let tx_ptr = tx.txn() as usize; @@ -330,6 +331,9 @@ mod read_transactions { assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted)); assert!(!read_transactions.active.contains_key(&tx_ptr)); + + assert_eq!(tx.id().err(), Some(Error::ReadTransactionAborted)); + assert!(!read_transactions.active.contains_key(&tx_ptr)); } } From 54bba36875a1277e04c74a05f99f4a676256cb6c Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:34:49 +0000 Subject: [PATCH 08/20] ensure that txn pointer isn't reused after being timeouted --- crates/storage/libmdbx-rs/src/txn_manager.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index b99ed2fe4a6b..e1f3632a334f 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -319,7 +319,8 @@ mod read_transactions { // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the // manager kills it, use it two times and observe the `Error::ReadTransactionAborted` - // error. + // error. Also, ensure that the transaction pointer is not reused when opening a new + // read-only transaction. { let tx = env.begin_ro_txn().unwrap(); let tx_ptr = tx.txn() as usize; @@ -334,6 +335,11 @@ mod read_transactions { assert_eq!(tx.id().err(), Some(Error::ReadTransactionAborted)); assert!(!read_transactions.active.contains_key(&tx_ptr)); + + let tx = env.begin_ro_txn().unwrap(); + let new_tx_ptr = tx.txn() as usize; + assert!(read_transactions.active.contains_key(&new_tx_ptr)); + assert_ne!(tx_ptr, new_tx_ptr); } } From c82da1f5a462f7bc9461461534385cee04e3d90c Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 16:44:57 +0000 Subject: [PATCH 09/20] change error code --- crates/storage/libmdbx-rs/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index 9416c30a985d..4d1856b94324 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -195,7 +195,7 @@ impl Error { Error::BadSignature => ffi::MDBX_EBADSIGN, Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS, Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS, - Error::ReadTransactionAborted => -69000, // Custom non-MDBX error code + Error::ReadTransactionAborted => -96000, // Custom non-MDBX error code Error::Other(err_code) => *err_code, } } From 4d7687352b8b1a87f3d89e306b8d9547575fdbb3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 17:09:34 +0000 Subject: [PATCH 10/20] add a comment about why reset --- crates/storage/libmdbx-rs/src/txn_manager.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index e1f3632a334f..22b1cfd2f252 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -204,7 +204,15 @@ mod read_transactions { ( txn_ptr, duration, - // Abort the transaction + // Abort the transaction. + // + // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to + // prevent MDBX from reusing the pointer of the aborted + // tranasction for new read-only transactions. This is + // important because we store the pointer in the `active` list + // and we don't want to accidentally abort a new transaction. + // + // See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info. mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }), ) }); From cfd4fea4e03130eb94acd82f93906eeda6ffc17b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 17:18:08 +0000 Subject: [PATCH 11/20] explain flag check --- crates/storage/libmdbx-rs/src/transaction.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 9c443ac76565..49121320d909 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -536,6 +536,8 @@ impl TransactionPtr { { let _lck = self.lock.lock(); + // When transaction is aborted via `TxnManager`, it's actually reset using `mdbn_txn_reset` + // that makes the transaction unusable and sets the `MDBX_TXN_FINISHED` flag. if unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED != 0 { return Err(Error::ReadTransactionAborted) } From 22109594e9644f957e3f09b37758860bdb2c9625 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 17:20:42 +0000 Subject: [PATCH 12/20] and more explanation --- crates/storage/libmdbx-rs/src/transaction.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 49121320d909..5393ea90f1e5 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -538,6 +538,10 @@ impl TransactionPtr { // When transaction is aborted via `TxnManager`, it's actually reset using `mdbn_txn_reset` // that makes the transaction unusable and sets the `MDBX_TXN_FINISHED` flag. + // + // No race condition with the `TxnManager` aborting our transaction is possible here, + // because we're taking a lock for any actions on the transaction pointer, including a call + // to the `mdbx_txn_reset`. if unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED != 0 { return Err(Error::ReadTransactionAborted) } From 7b2e7b261c6a8d5baaaf2c6a662e97d383884e76 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 17:33:04 +0000 Subject: [PATCH 13/20] clarify abort comment --- crates/storage/libmdbx-rs/src/txn_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 22b1cfd2f252..fd4d462cf012 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -208,9 +208,9 @@ mod read_transactions { // // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to // prevent MDBX from reusing the pointer of the aborted - // tranasction for new read-only transactions. This is + // transaction for new read-only transactions. This is // important because we store the pointer in the `active` list - // and we don't want to accidentally abort a new transaction. + // and assume that it is unique. // // See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info. mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }), From f5db349ee509575ea31f09744fc1e639fafe85e1 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Feb 2024 17:39:33 +0000 Subject: [PATCH 14/20] Update crates/storage/libmdbx-rs/src/transaction.rs Co-authored-by: Emilia Hane --- crates/storage/libmdbx-rs/src/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 5393ea90f1e5..8b5cddfd76ba 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -536,7 +536,7 @@ impl TransactionPtr { { let _lck = self.lock.lock(); - // When transaction is aborted via `TxnManager`, it's actually reset using `mdbn_txn_reset` + // When transaction is aborted via `TxnManager`, it's actually reset using `mdbx_txn_reset` // that makes the transaction unusable and sets the `MDBX_TXN_FINISHED` flag. // // No race condition with the `TxnManager` aborting our transaction is possible here, From 507d60c9e678930afa97b37a97de7a16bd7ac4ba Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Feb 2024 12:47:33 +0000 Subject: [PATCH 15/20] add metric --- .../src/metrics/prometheus_exporter.rs | 4 ++++ .../storage/db/src/implementation/mdbx/mod.rs | 6 ++++++ crates/storage/libmdbx-rs/src/environment.rs | 5 +++++ crates/storage/libmdbx-rs/src/txn_manager.rs | 19 ++++++++++++++++++- 4 files changed, 33 insertions(+), 1 deletion(-) diff --git a/crates/node-core/src/metrics/prometheus_exporter.rs b/crates/node-core/src/metrics/prometheus_exporter.rs index e1d1e378f988..6abea7fba45d 100644 --- a/crates/node-core/src/metrics/prometheus_exporter.rs +++ b/crates/node-core/src/metrics/prometheus_exporter.rs @@ -102,6 +102,10 @@ where describe_gauge!("db.table_pages", "The number of database pages for a table"); describe_gauge!("db.table_entries", "The number of entries for a table"); describe_gauge!("db.freelist", "The number of pages on the freelist"); + describe_gauge!( + "db.timeouted_not_aborted_transactions", + "Number of timeouted transactions that were not aborted by the user yet" + ); process.describe(); describe_memory_stats(); describe_io_stats(); diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 6ba7b3df80ec..126ad98afa28 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -204,6 +204,12 @@ impl DatabaseMetrics for DatabaseEnv { metrics.push(("db.freelist", freelist as f64, vec![])); } + metrics.push(( + "db.timeouted_not_aborted_transactions", + self.timeouted_not_aborted_transactions() as f64, + vec![], + )); + metrics } } diff --git a/crates/storage/libmdbx-rs/src/environment.rs b/crates/storage/libmdbx-rs/src/environment.rs index fd1330210082..0ed7822ee80a 100644 --- a/crates/storage/libmdbx-rs/src/environment.rs +++ b/crates/storage/libmdbx-rs/src/environment.rs @@ -87,6 +87,11 @@ impl Environment { &self.inner.txn_manager } + #[cfg(feature = "read-tx-timeouts")] + pub fn timeouted_not_aborted_transactions(&self) -> usize { + self.inner.txn_manager.timeouted_not_aborted_read_transactions().unwrap_or(0) + } + /// Create a read-only transaction for use with the environment. #[inline] pub fn begin_ro_txn(&self) -> Result> { diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index fd4d462cf012..afaaccae31da 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -104,7 +104,7 @@ mod read_transactions { environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr, txn_manager::TxnManager, }; - use dashmap::DashMap; + use dashmap::{DashMap, DashSet}; use std::{ sync::{mpsc::sync_channel, Arc}, time::{Duration, Instant}, @@ -150,6 +150,13 @@ mod read_transactions { ) -> Option<(usize, (TransactionPtr, Instant))> { self.read_transactions.as_ref()?.remove_active(ptr) } + + /// Returns the number of timeouted transactions that were not aborted by the user yet. + pub(crate) fn timeouted_not_aborted_read_transactions(&self) -> Option { + self.read_transactions + .as_ref() + .map(|read_transactions| read_transactions.timeouted_not_aborted()) + } } #[derive(Debug, Default)] @@ -162,6 +169,9 @@ mod read_transactions { /// We store `usize` instead of a raw pointer as a key, because pointers are not /// comparable. The time of transaction opening is stored as a value. active: DashMap, + /// List of timeouted transactions that were not aborted by the user yet, hence have a + /// dangling read transaction pointer. + timeouted_not_aborted: DashSet, } impl ReadTransactions { @@ -179,9 +189,15 @@ mod read_transactions { &self, ptr: *mut ffi::MDBX_txn, ) -> Option<(usize, (TransactionPtr, Instant))> { + self.timeouted_not_aborted.remove(&(ptr as usize)); self.active.remove(&(ptr as usize)) } + /// Returns the number of timeouted transactions that were not aborted by the user yet. + pub(super) fn timeouted_not_aborted(&self) -> usize { + self.timeouted_not_aborted.len() + } + /// Spawns a new thread with [std::thread::spawn] that monitors the list of active read /// transactions and aborts those that are open for longer than /// `ReadTransactions.max_duration`. @@ -249,6 +265,7 @@ mod read_transactions { } else { // Happy path, the transaction has been aborted by us with no errors. warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been aborted"); + self.timeouted_not_aborted.insert(ptr as usize); } } From 2115f6b105b875ae5c69492f35b10bd90f332f73 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Feb 2024 12:50:06 +0000 Subject: [PATCH 16/20] add a comment --- crates/storage/libmdbx-rs/src/txn_manager.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index afaaccae31da..1f3b207305ad 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -265,6 +265,8 @@ mod read_transactions { } else { // Happy path, the transaction has been aborted by us with no errors. warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been aborted"); + // Add transaction to the list of timeouted transactions that were not + // aborted by the user yet. self.timeouted_not_aborted.insert(ptr as usize); } } From 6e873785c10a0af72f3cf732f74e5ade69e57501 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Feb 2024 13:22:30 +0000 Subject: [PATCH 17/20] abort -> timeout --- .../storage/db/src/implementation/mdbx/tx.rs | 2 +- crates/storage/libmdbx-rs/src/error.rs | 6 ++-- crates/storage/libmdbx-rs/src/transaction.rs | 9 +++--- crates/storage/libmdbx-rs/src/txn_manager.rs | 32 +++++++++---------- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 656f6dde9a74..798b0b3c0208 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -439,7 +439,7 @@ mod tests { assert_eq!( tx.get::(0).err(), - Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.into())) + Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into())) ); // Transaction is timeout-ed assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed)); // Backtrace is recorded diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index 4d1856b94324..b43541507080 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -117,8 +117,8 @@ pub enum Error { /// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened. #[error("write transactions are not supported in read-only mode")] WriteTransactionUnsupportedInReadOnlyMode, - #[error("read transaction has been aborted by the transaction manager")] - ReadTransactionAborted, + #[error("read transaction has been timeouted")] + ReadTransactionTimeout, /// Unknown error code. #[error("unknown error code")] Other(i32), @@ -195,7 +195,7 @@ impl Error { Error::BadSignature => ffi::MDBX_EBADSIGN, Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS, Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS, - Error::ReadTransactionAborted => -96000, // Custom non-MDBX error code + Error::ReadTransactionTimeout => -96000, // Custom non-MDBX error code Error::Other(err_code) => *err_code, } } diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 8b5cddfd76ba..630bdc0a3673 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -536,14 +536,15 @@ impl TransactionPtr { { let _lck = self.lock.lock(); - // When transaction is aborted via `TxnManager`, it's actually reset using `mdbx_txn_reset` - // that makes the transaction unusable and sets the `MDBX_TXN_FINISHED` flag. + // When transaction is timeouted via `TxnManager`, it's actually reset using + // `mdbx_txn_reset` that makes the transaction unusable and sets the + // `MDBX_TXN_FINISHED` flag. // - // No race condition with the `TxnManager` aborting our transaction is possible here, + // No race condition with the `TxnManager` timeouting our transaction is possible here, // because we're taking a lock for any actions on the transaction pointer, including a call // to the `mdbx_txn_reset`. if unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED != 0 { - return Err(Error::ReadTransactionAborted) + return Err(Error::ReadTransactionTimeout) } Ok((f)(self.txn)) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 1f3b207305ad..ffab8e1dfa30 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -199,11 +199,11 @@ mod read_transactions { } /// Spawns a new thread with [std::thread::spawn] that monitors the list of active read - /// transactions and aborts those that are open for longer than + /// transactions and timeouts those that are open for longer than /// `ReadTransactions.max_duration`. pub(super) fn start_monitor(self: Arc) { std::thread::spawn(move || { - let mut aborted_active = Vec::new(); + let mut timeouted_active = Vec::new(); loop { let now = Instant::now(); @@ -220,7 +220,7 @@ mod read_transactions { ( txn_ptr, duration, - // Abort the transaction. + // Timeout the transaction. // // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to // prevent MDBX from reusing the pointer of the aborted @@ -238,7 +238,7 @@ mod read_transactions { // Add the transaction to `aborted_active`. We can't remove it // instantly from the list of active // transactions, because we iterate through it. - aborted_active.push((txn_ptr, duration, error)); + timeouted_active.push((txn_ptr, duration, error)); } Err(err) => { error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction") @@ -251,29 +251,29 @@ mod read_transactions { } } - // Walk through aborted transactions, and delete them from the list of active + // Walk through timeouted transactions, and delete them from the list of active // transactions. - for (ptr, open_duration, err) in aborted_active.iter().copied() { + for (ptr, open_duration, err) in timeouted_active.iter().copied() { // Try deleting the transaction from the list of active transactions. let was_in_active = self.remove_active(ptr).is_some(); if let Err(err) = err { if was_in_active { - // If the transaction was in the list of active transactions then - // user didn't abort it and we failed to do so. - error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transaction"); + // If the transaction was in the list of active transactions, + // then user didn't abort it and we failed to do so. + error!(target: "libmdbx", %err, ?open_duration, "Failed to timeout the long-lived read transaction"); } } else { - // Happy path, the transaction has been aborted by us with no errors. - warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been aborted"); + // Happy path, the transaction has been timeouted by us with no errors. + warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timeouted"); // Add transaction to the list of timeouted transactions that were not // aborted by the user yet. self.timeouted_not_aborted.insert(ptr as usize); } } - // Clear the list of aborted transactions, but not de-allocate the reserved + // Clear the list of timeouted transactions, but not de-allocate the reserved // capacity to save on further pushes. - aborted_active.clear(); + timeouted_active.clear(); if !self.active.is_empty() { trace!( @@ -345,7 +345,7 @@ mod read_transactions { } // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the - // manager kills it, use it two times and observe the `Error::ReadTransactionAborted` + // manager kills it, use it two times and observe the `Error::ReadTransactionTimeout` // error. Also, ensure that the transaction pointer is not reused when opening a new // read-only transaction. { @@ -357,10 +357,10 @@ mod read_transactions { assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted)); + assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout)); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert_eq!(tx.id().err(), Some(Error::ReadTransactionAborted)); + assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout)); assert!(!read_transactions.active.contains_key(&tx_ptr)); let tx = env.begin_ro_txn().unwrap(); From 3bf4fa354d16c82d6fcf4cae1d543c290c4b9b55 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Feb 2024 13:32:14 +0000 Subject: [PATCH 18/20] timeout -> time out, etc. --- .../src/metrics/prometheus_exporter.rs | 4 +- .../storage/db/src/implementation/mdbx/mod.rs | 4 +- crates/storage/libmdbx-rs/src/environment.rs | 5 +- crates/storage/libmdbx-rs/src/error.rs | 3 +- crates/storage/libmdbx-rs/src/transaction.rs | 2 +- crates/storage/libmdbx-rs/src/txn_manager.rs | 50 +++++++++---------- 6 files changed, 35 insertions(+), 33 deletions(-) diff --git a/crates/node-core/src/metrics/prometheus_exporter.rs b/crates/node-core/src/metrics/prometheus_exporter.rs index 6abea7fba45d..4bfcddf0ca0f 100644 --- a/crates/node-core/src/metrics/prometheus_exporter.rs +++ b/crates/node-core/src/metrics/prometheus_exporter.rs @@ -103,8 +103,8 @@ where describe_gauge!("db.table_entries", "The number of entries for a table"); describe_gauge!("db.freelist", "The number of pages on the freelist"); describe_gauge!( - "db.timeouted_not_aborted_transactions", - "Number of timeouted transactions that were not aborted by the user yet" + "db.timed_out_not_aborted_transactions", + "Number of timed out transactions that were not aborted by the user yet" ); process.describe(); describe_memory_stats(); diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 126ad98afa28..e6a9df92187d 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -205,8 +205,8 @@ impl DatabaseMetrics for DatabaseEnv { } metrics.push(( - "db.timeouted_not_aborted_transactions", - self.timeouted_not_aborted_transactions() as f64, + "db.timed_out_not_aborted_transactions", + self.timed_out_not_aborted_transactions() as f64, vec![], )); diff --git a/crates/storage/libmdbx-rs/src/environment.rs b/crates/storage/libmdbx-rs/src/environment.rs index 0ed7822ee80a..fa24e2083511 100644 --- a/crates/storage/libmdbx-rs/src/environment.rs +++ b/crates/storage/libmdbx-rs/src/environment.rs @@ -87,9 +87,10 @@ impl Environment { &self.inner.txn_manager } + /// Returns the number of timed out transactions that were not aborted by the user yet. #[cfg(feature = "read-tx-timeouts")] - pub fn timeouted_not_aborted_transactions(&self) -> usize { - self.inner.txn_manager.timeouted_not_aborted_read_transactions().unwrap_or(0) + pub fn timed_out_not_aborted_transactions(&self) -> usize { + self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0) } /// Create a read-only transaction for use with the environment. diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index b43541507080..84a6ef361747 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -117,7 +117,8 @@ pub enum Error { /// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened. #[error("write transactions are not supported in read-only mode")] WriteTransactionUnsupportedInReadOnlyMode, - #[error("read transaction has been timeouted")] + /// Read transaction has been timed out. + #[error("read transaction has been timed out")] ReadTransactionTimeout, /// Unknown error code. #[error("unknown error code")] diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 630bdc0a3673..41e5c7387ea5 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -536,7 +536,7 @@ impl TransactionPtr { { let _lck = self.lock.lock(); - // When transaction is timeouted via `TxnManager`, it's actually reset using + // When transaction is timed out via `TxnManager`, it's actually reset using // `mdbx_txn_reset` that makes the transaction unusable and sets the // `MDBX_TXN_FINISHED` flag. // diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index ffab8e1dfa30..34ff9777dd6a 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -151,11 +151,11 @@ mod read_transactions { self.read_transactions.as_ref()?.remove_active(ptr) } - /// Returns the number of timeouted transactions that were not aborted by the user yet. - pub(crate) fn timeouted_not_aborted_read_transactions(&self) -> Option { + /// Returns the number of timed out transactions that were not aborted by the user yet. + pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option { self.read_transactions .as_ref() - .map(|read_transactions| read_transactions.timeouted_not_aborted()) + .map(|read_transactions| read_transactions.timed_out_not_aborted()) } } @@ -169,9 +169,9 @@ mod read_transactions { /// We store `usize` instead of a raw pointer as a key, because pointers are not /// comparable. The time of transaction opening is stored as a value. active: DashMap, - /// List of timeouted transactions that were not aborted by the user yet, hence have a + /// List of timed out transactions that were not aborted by the user yet, hence have a /// dangling read transaction pointer. - timeouted_not_aborted: DashSet, + timed_out_not_aborted: DashSet, } impl ReadTransactions { @@ -189,13 +189,13 @@ mod read_transactions { &self, ptr: *mut ffi::MDBX_txn, ) -> Option<(usize, (TransactionPtr, Instant))> { - self.timeouted_not_aborted.remove(&(ptr as usize)); + self.timed_out_not_aborted.remove(&(ptr as usize)); self.active.remove(&(ptr as usize)) } - /// Returns the number of timeouted transactions that were not aborted by the user yet. - pub(super) fn timeouted_not_aborted(&self) -> usize { - self.timeouted_not_aborted.len() + /// Returns the number of timed out transactions that were not aborted by the user yet. + pub(super) fn timed_out_not_aborted(&self) -> usize { + self.timed_out_not_aborted.len() } /// Spawns a new thread with [std::thread::spawn] that monitors the list of active read @@ -203,13 +203,13 @@ mod read_transactions { /// `ReadTransactions.max_duration`. pub(super) fn start_monitor(self: Arc) { std::thread::spawn(move || { - let mut timeouted_active = Vec::new(); + let mut timed_out_active = Vec::new(); loop { let now = Instant::now(); let mut max_active_transaction_duration = None; - // Iterate through active read transactions and abort those that's open for + // Iterate through active read transactions and time out those that's open for // longer than `self.max_duration`. for entry in self.active.iter() { let (tx, start) = entry.value(); @@ -220,7 +220,7 @@ mod read_transactions { ( txn_ptr, duration, - // Timeout the transaction. + // Time out the transaction. // // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to // prevent MDBX from reusing the pointer of the aborted @@ -235,10 +235,10 @@ mod read_transactions { match result { Ok((txn_ptr, duration, error)) => { - // Add the transaction to `aborted_active`. We can't remove it - // instantly from the list of active - // transactions, because we iterate through it. - timeouted_active.push((txn_ptr, duration, error)); + // Add the transaction to `timed_out_active`. We can't remove it + // instantly from the list of active transactions, because we + // iterate through it. + timed_out_active.push((txn_ptr, duration, error)); } Err(err) => { error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction") @@ -251,29 +251,29 @@ mod read_transactions { } } - // Walk through timeouted transactions, and delete them from the list of active + // Walk through timed out transactions, and delete them from the list of active // transactions. - for (ptr, open_duration, err) in timeouted_active.iter().copied() { + for (ptr, open_duration, err) in timed_out_active.iter().copied() { // Try deleting the transaction from the list of active transactions. let was_in_active = self.remove_active(ptr).is_some(); if let Err(err) = err { if was_in_active { // If the transaction was in the list of active transactions, // then user didn't abort it and we failed to do so. - error!(target: "libmdbx", %err, ?open_duration, "Failed to timeout the long-lived read transaction"); + error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction"); } } else { - // Happy path, the transaction has been timeouted by us with no errors. - warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timeouted"); - // Add transaction to the list of timeouted transactions that were not + // Happy path, the transaction has been timed out by us with no errors. + warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out"); + // Add transaction to the list of timed out transactions that were not // aborted by the user yet. - self.timeouted_not_aborted.insert(ptr as usize); + self.timed_out_not_aborted.insert(ptr as usize); } } - // Clear the list of timeouted transactions, but not de-allocate the reserved + // Clear the list of timed out transactions, but not de-allocate the reserved // capacity to save on further pushes. - timeouted_active.clear(); + timed_out_active.clear(); if !self.active.is_empty() { trace!( From 533f6a262f2b9dc418f535c71ba0378326ce6412 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Feb 2024 18:42:06 +0000 Subject: [PATCH 19/20] fix dropping, add more tests and comments --- crates/storage/libmdbx-rs/src/transaction.rs | 48 ++++++++++++++++---- crates/storage/libmdbx-rs/src/txn_manager.rs | 26 ++++++++--- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 41e5c7387ea5..a819f8b9ffe2 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -6,7 +6,7 @@ use crate::{ txn_manager::{TxnManagerMessage, TxnPtr}, Cursor, Error, Stat, TableObject, }; -use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE}; +use ffi::{mdbx_txn_renew, MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE}; use indexmap::IndexSet; use libc::{c_uint, c_void}; use parking_lot::Mutex; @@ -320,7 +320,7 @@ where where F: FnOnce(*mut ffi::MDBX_txn) -> T, { - self.txn.txn_execute(f) + self.txn.txn_execute_fail_on_timeout(f) } } @@ -329,7 +329,9 @@ where K: TransactionKind, { fn drop(&mut self) { - let _ = self.txn_execute(|txn| { + // To be able to abort a timed out transaction, we need to renew it first. + // Hence the usage of `txn_execute_renew_on_timeout` here. + let _ = self.txn.txn_execute_renew_on_timeout(|txn| { if !self.has_committed() { if K::IS_READ_ONLY { #[cfg(feature = "read-tx-timeouts")] @@ -528,27 +530,53 @@ impl TransactionPtr { Self { txn, lock: Arc::new(Mutex::new(())) } } + // Returns `true` if the transaction is timed out. + // + // When transaction is timed out via `TxnManager`, it's actually reset using + // `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such + // transactions), and sets the `MDBX_TXN_FINISHED` flag. + fn is_timed_out(&self) -> bool { + (unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED) != 0 + } + /// Executes the given closure once the lock on the transaction is acquired. + /// + /// Returns the result of the closure or an error if the transaction is timed out. #[inline] - pub(crate) fn txn_execute(&self, f: F) -> Result + pub(crate) fn txn_execute_fail_on_timeout(&self, f: F) -> Result where F: FnOnce(*mut ffi::MDBX_txn) -> T, { let _lck = self.lock.lock(); - // When transaction is timed out via `TxnManager`, it's actually reset using - // `mdbx_txn_reset` that makes the transaction unusable and sets the - // `MDBX_TXN_FINISHED` flag. - // - // No race condition with the `TxnManager` timeouting our transaction is possible here, + // No race condition with the `TxnManager` timing out the transaction is possible here, // because we're taking a lock for any actions on the transaction pointer, including a call // to the `mdbx_txn_reset`. - if unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED != 0 { + if self.is_timed_out() { return Err(Error::ReadTransactionTimeout) } Ok((f)(self.txn)) } + + /// Executes the given closure once the lock on the transaction is acquired. If the tranasction + /// is timed out, it will be renewed first. + /// + /// Returns the result of the closure or an error if the transaction renewal fails. + #[inline] + fn txn_execute_renew_on_timeout(&self, f: F) -> Result + where + F: FnOnce(*mut ffi::MDBX_txn) -> T, + { + let _lck = self.lock.lock(); + + // To be able to do any operations on the transaction, we need to renew it first. + if self.is_timed_out() { + mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?; + } + + Ok((f)(self.txn)) + } } /// Commit latencies info. diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 34ff9777dd6a..02052405d5e9 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -216,7 +216,7 @@ mod read_transactions { let duration = now - *start; if duration > self.max_duration { - let result = tx.txn_execute(|txn_ptr| { + let result = tx.txn_execute_fail_on_timeout(|txn_ptr| { ( txn_ptr, duration, @@ -344,29 +344,41 @@ mod read_transactions { assert!(!read_transactions.active.contains_key(&tx_ptr)); } - // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the - // manager kills it, use it two times and observe the `Error::ReadTransactionTimeout` - // error. Also, ensure that the transaction pointer is not reused when opening a new - // read-only transaction. { + // Create a read-only transaction and observe it's in the liist of active + // transactions. let tx = env.begin_ro_txn().unwrap(); let tx_ptr = tx.txn() as usize; assert!(read_transactions.active.contains_key(&tx_ptr)); + // Wait until the transaction is timed out by the manager. sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL); + // Ensure that the transaction is not in the list of active transactions anymore, + // and is in the list of timed out but not aborted transactions. assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); + // Use the timed out transaction and observe the `Error::ReadTransactionTimeout` assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout)); assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout)); assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); - let tx = env.begin_ro_txn().unwrap(); - let new_tx_ptr = tx.txn() as usize; + // Ensure that the transaction pointer is not reused when opening a new read-only + // transaction. + let new_tx = env.begin_ro_txn().unwrap(); + let new_tx_ptr = new_tx.txn() as usize; assert!(read_transactions.active.contains_key(&new_tx_ptr)); assert_ne!(tx_ptr, new_tx_ptr); + + // Drop the transaction and ensure that it's not in the list of timed out but not + // aborted transactions anymore. + drop(tx); + assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr)); } } From 130f8b79f789383f884f25de8fdd14f0b73da67a Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Feb 2024 18:50:34 +0000 Subject: [PATCH 20/20] Update crates/storage/libmdbx-rs/src/txn_manager.rs Co-authored-by: Emilia Hane --- crates/storage/libmdbx-rs/src/txn_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 02052405d5e9..82d53d9089ab 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -345,7 +345,7 @@ mod read_transactions { } { - // Create a read-only transaction and observe it's in the liist of active + // Create a read-only transaction and observe it's in the list of active // transactions. let tx = env.begin_ro_txn().unwrap(); let tx_ptr = tx.txn() as usize;