Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): use mdbx_txn_reset to time out transactions #6850

Merged
merged 22 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,19 @@ impl Database for DatabaseEnv {
type TXMut = tx::Tx<RW>;

fn tx(&self) -> Result<Self::TX, DatabaseError> {
Ok(Tx::new_with_metrics(
Tx::new_with_metrics(
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.metrics.as_ref().cloned(),
))
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}

fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
Ok(Tx::new_with_metrics(
Tx::new_with_metrics(
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.metrics.as_ref().cloned(),
))
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}
}

Expand Down
22 changes: 12 additions & 10 deletions crates/storage/db/src/implementation/mdbx/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ impl<K: TransactionKind> Tx<K> {
pub fn new_with_metrics(
inner: Transaction<K>,
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
) -> Self {
let metrics_handler = env_metrics.map(|env_metrics| {
let handler = MetricsHandler::<K>::new(inner.id(), env_metrics);
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
handler.log_transaction_opened();
handler
});
Self::new_inner(inner, metrics_handler)
) -> reth_libmdbx::Result<Self> {
let metrics_handler = env_metrics
.map(|env_metrics| {
let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
handler.log_transaction_opened();
Ok(handler)
})
.transpose()?;
Ok(Self::new_inner(inner, metrics_handler))
}

#[inline]
Expand All @@ -76,8 +78,8 @@ impl<K: TransactionKind> Tx<K> {
}

/// Gets this transaction ID.
pub fn id(&self) -> u64 {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id)
pub fn id(&self) -> reth_libmdbx::Result<u64> {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
}

/// Gets a table database handle if it exists, otherwise creates it.
Expand Down
26 changes: 14 additions & 12 deletions crates/storage/libmdbx-rs/benches/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,29 @@ fn bench_get_seq_raw(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);

let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi();
let _txn = env.begin_ro_txn().unwrap();
let txn = _txn.txn();
let txn = env.begin_ro_txn().unwrap();

let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let mut cursor: *mut MDBX_cursor = ptr::null_mut();

c.bench_function("bench_get_seq_raw", |b| {
b.iter(|| unsafe {
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;
txn.txn_execute(|txn| {
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;

while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}

black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
})
.unwrap();
})
});
}
Expand Down
5 changes: 3 additions & 2 deletions crates/storage/libmdbx-rs/benches/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {

c.bench_function("bench_get_rand_raw", |b| {
b.iter(|| unsafe {
txn.with_raw_tx_ptr(|txn| {
txn.txn_execute(|txn| {
let mut i: size_t = 0;
for key in &keys {
key_val.iov_len = key.len() as size_t;
Expand All @@ -57,7 +57,8 @@ fn bench_get_rand_raw(c: &mut Criterion) {
i += key_val.iov_len;
}
black_box(i);
});
})
.unwrap();
})
});
}
Expand Down
79 changes: 43 additions & 36 deletions crates/storage/libmdbx-rs/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result},
error::{mdbx_result, Error, Result},
flags::*,
mdbx_try_optional,
transaction::{TransactionKind, RW},
Expand Down Expand Up @@ -30,26 +30,26 @@ where
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
unsafe {
mdbx_result_with_tx_kind::<K>(
txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)),
txn.txn(),
txn.env().txn_manager(),
)?;
txn.txn_execute(|txn_ptr| {
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
})??;
}
Ok(Self { txn, cursor })
}

fn new_at_position(other: &Self) -> Result<Self> {
unsafe {
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());
other.txn.txn_execute(|_| {
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());

let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);

let s = Self { txn: other.txn.clone(), cursor };
let s = Self { txn: other.txn.clone(), cursor };

mdbx_result_with_tx_kind::<K>(res, s.txn.txn(), s.txn.env().txn_manager())?;
mdbx_result(res)?;

Ok(s)
Ok(s)
})?
}
}

Expand Down Expand Up @@ -95,11 +95,12 @@ where
let key_ptr = key_val.iov_base;
let data_ptr = data_val.iov_base;
self.txn.txn_execute(|txn| {
let v = mdbx_result_with_tx_kind::<K>(
ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op),
txn,
self.txn.env().txn_manager(),
)?;
let v = mdbx_result(ffi::mdbx_cursor_get(
self.cursor,
&mut key_val,
&mut data_val,
op,
))?;
assert_ne!(data_ptr, data_val.iov_base);
let key_out = {
// MDBX wrote in new key
Expand All @@ -111,7 +112,7 @@ where
};
let data_out = Value::decode_val::<K>(txn, data_val)?;
Ok((key_out, data_out, v))
})
})?
}
}

Expand Down Expand Up @@ -444,7 +445,7 @@ impl Cursor<RW> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| {
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
})
})?
})?;

Ok(())
Expand All @@ -458,7 +459,7 @@ impl Cursor<RW> {
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))?
})?;

Ok(())
Expand All @@ -470,7 +471,7 @@ where
K: TransactionKind,
{
fn clone(&self) -> Self {
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap())
Self::new_at_position(self).unwrap()
}
}

Expand All @@ -488,7 +489,7 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) });
}
}

Expand Down Expand Up @@ -564,7 +565,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
cursor.txn.txn_execute(|txn| {
let result = cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
Expand All @@ -583,7 +584,11 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
})
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
}
}
Self::Err(err) => err.take().map(Err),
Expand Down Expand Up @@ -655,7 +660,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
cursor.txn.txn_execute(|txn| {
let result = cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
Expand All @@ -674,7 +679,11 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
})
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
}
}
Iter::Err(err) => err.take().map(Err),
Expand Down Expand Up @@ -752,17 +761,15 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);

cursor.txn.txn_execute(|_| {
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };

(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
})
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };

(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
})
}
IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))),
Expand Down
10 changes: 3 additions & 7 deletions crates/storage/libmdbx-rs/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result_with_tx_kind, Result},
error::{mdbx_result, Result},
transaction::TransactionKind,
Environment, Transaction,
};
Expand Down Expand Up @@ -31,12 +31,8 @@ impl Database {
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
let mut dbi: ffi::MDBX_dbi = 0;
txn.txn_execute(|txn_ptr| {
mdbx_result_with_tx_kind::<K>(
unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) },
txn_ptr,
txn.env().txn_manager(),
)
})?;
mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) })
})??;
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
}

Expand Down
31 changes: 2 additions & 29 deletions crates/storage/libmdbx-rs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::{txn_manager::TxnManager, TransactionKind};
use libc::c_int;
use std::result;

Expand Down Expand Up @@ -193,9 +192,10 @@ impl Error {
Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL,
Error::Access => ffi::MDBX_EACCESS,
Error::TooLarge => ffi::MDBX_TOO_LARGE,
Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN,
Error::BadSignature => ffi::MDBX_EBADSIGN,
Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS,
Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
Error::ReadTransactionAborted => -69000, // Custom non-MDBX error code
Error::Other(err_code) => *err_code,
shekhirin marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand All @@ -216,33 +216,6 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result<bool> {
}
}

#[cfg(feature = "read-tx-timeouts")]
#[inline]
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
err_code: c_int,
txn: *mut ffi::MDBX_txn,
txn_manager: &TxnManager,
) -> Result<bool> {
if K::IS_READ_ONLY &&
txn_manager.remove_aborted_read_transaction(txn).is_some() &&
err_code == ffi::MDBX_EBADSIGN
{
return Err(Error::ReadTransactionAborted)
}

mdbx_result(err_code)
}

#[cfg(not(feature = "read-tx-timeouts"))]
#[inline]
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
err_code: c_int,
_txn: *mut ffi::MDBX_txn,
_txn_manager: &TxnManager,
) -> Result<bool> {
mdbx_result(err_code)
}

#[macro_export]
macro_rules! mdbx_try_optional {
($expr:expr) => {{
Expand Down
Loading
Loading