Skip to content

Commit

Permalink
more fixes and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Aug 10, 2023
1 parent cfbb6ac commit 44df2bf
Showing 1 changed file with 76 additions and 4 deletions.
80 changes: 76 additions & 4 deletions rust/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ pub struct CommitConfig {

impl Default for CommitConfig {
// Builds the default commit configuration; `num_retries` is the only field set here.
fn default() -> Self {
// NOTE(review): this text comes from a rendered diff with its +/- markers stripped, so the
// two lines below are presumably the removed value (3) followed by its replacement (5).
// Confirm against the actual file before relying on either value.
Self { num_retries: 3 }
Self { num_retries: 5 }
}
}

Expand Down Expand Up @@ -405,7 +405,7 @@ pub(crate) async fn commit_transaction(
}
}

let mut target_version = version + 1;
let mut target_version = version;

// If any of them conflict with the transaction, return an error
for (version_offset, other_transaction) in other_transactions.iter().enumerate() {
Expand All @@ -422,6 +422,8 @@ pub(crate) async fn commit_transaction(
write_config,
)?;

debug_assert_eq!(manifest.version, target_version);

// Try to commit the manifest
let result = write_manifest_file(
object_store,
Expand Down Expand Up @@ -475,15 +477,15 @@ mod tests {
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator};
use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use futures::future::join_all;

use super::*;

use crate::arrow::FixedSizeListArrayExt;
use crate::dataset::transaction::Operation;
use crate::dataset::WriteParams;
use crate::dataset::{WriteMode, WriteParams};
use crate::index::vector::{MetricType, VectorIndexParams};
use crate::index::{DatasetIndexExt, IndexType};
use crate::io::object_store::ObjectStoreParams;
Expand Down Expand Up @@ -737,4 +739,74 @@ mod tests {
fields.sort();
assert_eq!(fields, vec![0, 1]);
}

/// Verifies that concurrent writers can all commit to the same dataset.
///
/// Runs once per write mode (append, then overwrite): creates an empty dataset
/// in a temporary directory, spawns five tasks that each write the same
/// three-row batch, asserts that every write succeeded, and finally checks the
/// fragment count at version 6.
#[tokio::test]
async fn test_concurrent_writes() {
    for write_mode in [WriteMode::Append, WriteMode::Overwrite] {
        // Each mode gets its own empty table in a fresh temporary directory.
        let test_dir = tempfile::tempdir().unwrap();
        let test_uri = test_dir.path().to_str().unwrap();

        let field = Field::new("i", DataType::Int32, false);
        let schema = Arc::new(ArrowSchema::new(vec![field]));

        let empty_reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone());
        let dataset = Dataset::write(empty_reader, test_uri, None)
            .await
            .unwrap();

        // The same three-row batch is handed to every writer.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        // Launch five writers; each task owns its own copy of the inputs.
        let mut handles = Vec::with_capacity(5);
        for _ in 0..5 {
            let task_batch = batch.clone();
            let task_schema = schema.clone();
            let task_uri = test_uri.to_string();
            handles.push(tokio::spawn(async move {
                let reader = RecordBatchIterator::new(vec![Ok(task_batch)], task_schema);
                let params = WriteParams {
                    mode: write_mode,
                    ..Default::default()
                };
                Dataset::write(reader, &task_uri, Some(params)).await
            }));
        }
        let outcomes = join_all(handles).await;

        // Neither the spawned task nor the write itself may have failed.
        for outcome in outcomes {
            assert!(matches!(outcome, Ok(Ok(_))), "{:?}", outcome);
        }

        // Inspect version 6 (presumably the initial write plus the five commits above).
        let latest = dataset.checkout_version(6).await.unwrap();

        // Appends accumulate one fragment per writer; overwrites leave a single one.
        let expected_fragments = match write_mode {
            WriteMode::Append => 5,
            WriteMode::Overwrite => 1,
            _ => unreachable!(),
        };
        assert_eq!(latest.get_fragments().len(), expected_fragments);
    }
}
}

0 comments on commit 44df2bf

Please sign in to comment.