Skip to content

Commit

Permalink
Fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Aug 18, 2024
1 parent 84476cc commit 78a9f91
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 8 deletions.
11 changes: 4 additions & 7 deletions src/mem_store/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use self::wal_segment::WalSegment;

pub struct Table {
name: String,
// `partitions` lock has to be always acquired before `buffer` and `frozen_buffer` lock
// To prevent deadlocks, `frozen_buffer` lock has to be always acquired before `partitions` before `buffer`
partitions: RwLock<HashMap<PartitionID, Arc<Partition>>>,
next_partition_id: AtomicU64,
next_partition_offset: AtomicUsize,
Expand Down Expand Up @@ -55,8 +55,8 @@ impl Table {
}

pub fn snapshot(&self) -> Vec<Arc<Partition>> {
let partitions = self.partitions.read().unwrap();
let frozen_buffer = self.frozen_buffer.lock().unwrap();
let partitions = self.partitions.read().unwrap();
let buffer = self.buffer.lock().unwrap();
let mut partitions: Vec<_> = partitions.values().cloned().collect();
let mut offset = partitions.iter().map(|p| p.len()).sum::<usize>();
Expand Down Expand Up @@ -88,17 +88,14 @@ impl Table {
partitions
}

pub fn snapshot_parts(
&self,
parts: &[PartitionID],
) -> Vec<Arc<Partition>> {
pub fn snapshot_parts(&self, parts: &[PartitionID]) -> Vec<Arc<Partition>> {
let partitions = self.partitions.read().unwrap();
parts.iter().map(|id| partitions[id].clone()).collect()
}

pub fn freeze_buffer(&self) {
let mut buffer = self.buffer.lock().unwrap();
let mut frozen_buffer = self.frozen_buffer.lock().unwrap();
let mut buffer = self.buffer.lock().unwrap();
assert!(frozen_buffer.len() == 0, "Frozen buffer is not empty");
std::mem::swap(&mut *buffer, &mut *frozen_buffer);
}
Expand Down
3 changes: 2 additions & 1 deletion src/scheduler/inner_locustdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl InnerLocustDB {
(time_write_partitions, time_meta_update) = s.persist_partitions(new_partitions);
}

// Write new segments from compactions and apply compaction in-memory
let start_time_compaction = Instant::now();
let mut partitions_to_delete = Vec::new();
for (table, id, (range, parts)) in &compactions {
Expand Down Expand Up @@ -409,7 +410,7 @@ impl InnerLocustDB {
columns.push(column_builder.finalize(column));
}
let (metadata, subpartitions) = subpartition(&self.opts, columns.clone());
// write subpartitions to disk, update metastore unlinking old partitions, delete old partitions
// write new subpartitions to disk and update in-memory metastore
if let Some(storage) = self.storage.as_ref() {
let to_delete = storage.prepare_compact(
table,
Expand Down

0 comments on commit 78a9f91

Please sign in to comment.