Skip to content

Commit

Permalink
Test transactional behaviour (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn authored Oct 1, 2023
1 parent bbab928 commit 28fffe8
Showing 1 changed file with 143 additions and 18 deletions.
161 changes: 143 additions & 18 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ async fn init_database() -> Pool<Postgres> {
.await;
conn00.close().await;

let conn = connect(&format!(
"postgres://{username}:postgres@localhost:28815/pgmq_test"
))
.await;
let conn = connect(&database_name()).await;

// DROP EXTENSION
// requires pg_partman to already be installed in the instance
Expand Down Expand Up @@ -205,10 +202,8 @@ async fn test_lifecycle() {

// send a batch of 2 messages
let batch_queue = format!("test_batch_{test_num}");
let _ = sqlx::query(&format!("SELECT {PGMQ_SCHEMA}.create('{batch_queue}');"))
.execute(&conn)
.await
.expect("failed to create queue");
create_queue(&batch_queue.to_string(), &conn).await;

let msg_ids = sqlx::query(
&format!("select {PGMQ_SCHEMA}.send_batch('{batch_queue}', ARRAY['{{\"hello\": \"world_0\"}}'::jsonb, '{{\"hello\": \"world_1\"}}'::jsonb])")
)
Expand Down Expand Up @@ -304,10 +299,7 @@ async fn test_archive() {

let queue_name = format!("test_archive_{test_num}");

let _ = sqlx::query(&format!("SELECT {PGMQ_SCHEMA}.create('{queue_name}');"))
.execute(&conn)
.await
.expect("failed to create queue");
create_queue(&queue_name.to_string(), &conn).await;

// no messages in the queue
assert_eq!(get_queue_size(&queue_name, &conn).await, 0);
Expand Down Expand Up @@ -370,12 +362,7 @@ async fn test_read_read_with_poll() {
let queue_name = format!("test_read_{test_num}");

// Creating queue
sqlx::query(&format!(
"select * from {PGMQ_SCHEMA}.create('{queue_name}')"
))
.execute(&conn)
.await
.unwrap();
create_queue(&queue_name.to_string(), &conn).await;

// Sending 3 messages to the queue
let msg_id1 = send_sample_message(&queue_name, &conn).await;
Expand Down Expand Up @@ -518,6 +505,139 @@ async fn test_partitioned_delete() {
assert_eq!(deleted_batch[0].get::<i64, usize>(0), 1);
assert_eq!(deleted_batch[1].get::<i64, usize>(0), 2);
}
// Integration tests are ignored by default
#[ignore]
#[tokio::test]
async fn test_transaction_create() {
// Queue creation is reverted if transaction is rolled back
let _ = init_database().await;
let queue_name = "transaction_test_queue";
let conn1 = connect(&database_name()).await;
let mut tx1 = conn1.begin().await.unwrap();

sqlx::query(&format!("select from {PGMQ_SCHEMA}.create('{queue_name}')"))
.fetch_one(&mut tx1)
.await
.unwrap();

tx1.rollback().await.unwrap();

let table_exists = sqlx::query(&format!(
"select from pg_tables where schemaname = 'pgmq' and tablename = 'q_{queue_name}'"
))
.fetch_optional(&conn1)
.await
.unwrap();

assert!(table_exists.is_none());
}

// Integration tests are ignored by default
#[ignore]
#[tokio::test]
async fn test_transaction_send() {
// This aims to test that a message won't be visible for other transactions
// until the transaction that published it commits
let _ = init_database().await;
let queue_name = "transaction_send_test_queue";
let conn1 = connect(&database_name()).await;
let conn2 = connect(&database_name()).await;

create_queue(&queue_name.to_string(), &conn1).await;

// Message can't be read for pending transaction
let mut tx = conn1.begin().await.unwrap();

sqlx::query(&format!(
"select from {PGMQ_SCHEMA}.send('{queue_name}', '1')"
))
.fetch_one(&mut tx)
.await
.unwrap();

let read_msg = sqlx::query(&format!(
"select from {PGMQ_SCHEMA}.read('{queue_name}', 0, 1)"
))
.fetch_optional(&conn2)
.await
.unwrap();

assert!(read_msg.is_none());

// After commiting the transaction, the message can be read
tx.commit().await.unwrap();

let read_msg2 = sqlx::query(&format!(
"select from {PGMQ_SCHEMA}.read('{queue_name}', 0, 1)"
))
.fetch_optional(&conn2)
.await
.unwrap();

assert!(read_msg2.is_some());
}

// Integration tests are ignored by default
#[ignore]
#[tokio::test]
async fn test_transaction_read() {
// A message read by one transaction can't be read by other concurrent transaction,
// even if VT expired, until the other transaction is committed or rolled back.
let _ = init_database().await;
let queue_name = "transaction_read_test_queue";
let conn1 = connect(&database_name()).await;
let conn2 = connect(&database_name()).await;

create_queue(&queue_name.to_string(), &conn1).await;

sqlx::query(&format!(
"select from {PGMQ_SCHEMA}.send('{queue_name}', '1')"
))
.fetch_one(&conn1)
.await
.unwrap();

let mut tx1 = conn1.begin().await.unwrap();
let mut tx2 = conn2.begin().await.unwrap();

let read_msg1 = sqlx::query(&format!(
"select from {PGMQ_SCHEMA}.read('{queue_name}', 1, 1)"
))
.fetch_optional(&mut tx1)
.await
.unwrap();

assert!(read_msg1.is_some());

tokio::time::sleep(std::time::Duration::from_millis(2000)).await;

let read_msg2 = sqlx::query(&format!(
"select from {PGMQ_SCHEMA}.read('{queue_name}', 1, 1)"
))
.fetch_optional(&mut tx2)
.await
.unwrap();

assert!(read_msg2.is_none());

tx1.rollback().await.unwrap();

let read_msg3 = sqlx::query(&format!(
"select from {PGMQ_SCHEMA}.read('{queue_name}', 1, 1)"
))
.fetch_optional(&mut tx2)
.await
.unwrap();

assert!(read_msg3.is_some());
}

async fn create_queue(queue_name: &String, conn: &Pool<Postgres>) {
sqlx::query(&format!("select from {PGMQ_SCHEMA}.create('{queue_name}')"))
.fetch_one(conn)
.await
.unwrap();
}

async fn get_queue_size(queue_name: &String, conn: &Pool<Postgres>) -> i64 {
sqlx::query(&format!(
Expand Down Expand Up @@ -548,3 +668,8 @@ async fn send_sample_message(queue_name: &String, conn: &Pool<Postgres>) -> i64
.expect("failed to push message")
.get::<i64, usize>(0)
}

pub fn database_name() -> String {
let username = whoami::username();
format!("postgres://{username}:postgres@localhost:28815/pgmq_test")
}

0 comments on commit 28fffe8

Please sign in to comment.