Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): Fix / extend transaction isolation levels #2350

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/lib/dal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
pub use sqlx::{types::BigDecimal, Error as SqlxError};
use zksync_db_connection::connection::DbMarker;
pub use zksync_db_connection::{
connection::Connection,
connection::{Connection, IsolationLevel},
connection_pool::{ConnectionPool, ConnectionPoolBuilder},
error::{DalError, DalResult},
};
Expand Down
1 change: 1 addition & 0 deletions core/lib/db_connection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ tracing.workspace = true

[dev-dependencies]
assert_matches.workspace = true
test-casing.workspace = true
92 changes: 86 additions & 6 deletions core/lib/db_connection/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
Ok(TransactionBuilder {
connection: self,
is_readonly: false,
isolation_level: None,
})
}

Expand Down Expand Up @@ -280,11 +281,26 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
}
}

/// Transaction isolation level.
///
/// See [Postgres docs](https://www.postgresql.org/docs/14/transaction-iso.html) for details on isolation level semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IsolationLevel {
/// "Read committed" isolation level.
ReadCommitted,
/// "Repeatable read" isolation level (aka "snapshot isolation").
RepeatableRead,
/// Serializable isolation level.
Serializable,
}

/// Builder of transactions allowing to configure transaction characteristics (for now, just its readonly status).
#[derive(Debug)]
pub struct TransactionBuilder<'a, 'c, DB: DbMarker> {
connection: &'a mut Connection<'c, DB>,
is_readonly: bool,
isolation_level: Option<IsolationLevel>,
}

impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> {
Expand All @@ -294,12 +310,40 @@ impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> {
self
}

/// Sets the isolation level of this transaction. If this method is not called, the isolation level will be
/// "read committed" (the default Postgres isolation level) for read-write transactions, and "repeatable read"
/// for readonly transactions. Beware that setting high isolation level for read-write transactions may lead
/// to performance degradation and/or isolation-related errors.
pub fn set_isolation(mut self, level: IsolationLevel) -> Self {
self.isolation_level = Some(level);
self
}

/// Builds the transaction with the provided characteristics.
pub async fn build(self) -> DalResult<Connection<'a, DB>> {
let mut transaction = self.connection.start_transaction().await?;

let level = self.isolation_level.unwrap_or(if self.is_readonly {
IsolationLevel::RepeatableRead
} else {
IsolationLevel::ReadCommitted
});
let level = match level {
IsolationLevel::ReadCommitted => "READ COMMITTED",
IsolationLevel::RepeatableRead => "REPEATABLE READ",
IsolationLevel::Serializable => "SERIALIZABLE",
};
let mut set_transaction_args = format!(" ISOLATION LEVEL {level}");

if self.is_readonly {
sqlx::query("SET TRANSACTION READ ONLY")
set_transaction_args += " READ ONLY";
}

if !set_transaction_args.is_empty() {
sqlx::query(&format!("SET TRANSACTION{set_transaction_args}"))
.instrument("set_transaction_characteristics")
.with_arg("isolation_level", &self.isolation_level)
.with_arg("readonly", &self.is_readonly)
.execute(&mut transaction)
.await?;
}
Expand All @@ -309,6 +353,8 @@ impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> {

#[cfg(test)]
mod tests {
use test_casing::test_casing;

use super::*;

#[tokio::test]
Expand Down Expand Up @@ -344,17 +390,51 @@ mod tests {
}
}

const ISOLATION_LEVELS: [Option<IsolationLevel>; 4] = [
None,
Some(IsolationLevel::ReadCommitted),
Some(IsolationLevel::RepeatableRead),
Some(IsolationLevel::Serializable),
];

#[test_casing(4, ISOLATION_LEVELS)]
#[tokio::test]
async fn creating_readonly_transaction() {
async fn setting_isolation_level_for_transaction(level: Option<IsolationLevel>) {
let pool = ConnectionPool::<InternalMarker>::constrained_test_pool(1).await;
let mut connection = pool.connection().await.unwrap();
let mut readonly_transaction = connection
.transaction_builder()
let mut transaction_builder = connection.transaction_builder().unwrap();
if let Some(level) = level {
transaction_builder = transaction_builder.set_isolation(level);
}

let mut transaction = transaction_builder.build().await.unwrap();
assert!(transaction.in_transaction());

sqlx::query("SELECT COUNT(*) AS \"count?\" FROM miniblocks")
.instrument("test")
.fetch_optional(&mut transaction)
.await
.unwrap()
.set_readonly()
.build()
.expect("no row returned");
// Check that it's possible to execute write statements in the transaction.
sqlx::query("DELETE FROM miniblocks")
.instrument("test")
.execute(&mut transaction)
.await
.unwrap();
}

#[test_casing(4, ISOLATION_LEVELS)]
#[tokio::test]
async fn creating_readonly_transaction(level: Option<IsolationLevel>) {
let pool = ConnectionPool::<InternalMarker>::constrained_test_pool(1).await;
let mut connection = pool.connection().await.unwrap();
let mut transaction_builder = connection.transaction_builder().unwrap().set_readonly();
if let Some(level) = level {
transaction_builder = transaction_builder.set_isolation(level);
}

let mut readonly_transaction = transaction_builder.build().await.unwrap();
assert!(readonly_transaction.in_transaction());

sqlx::query("SELECT COUNT(*) AS \"count?\" FROM miniblocks")
Expand Down
Loading