Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Oct 17, 2024
1 parent 17d940c commit 8e05f75
Showing 1 changed file with 20 additions and 26 deletions.
46 changes: 20 additions & 26 deletions src/wal/src/local_storage_impl/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ impl Region {
let mut all_segments = HashMap::new();

// Scan the directory for existing WAL files
let mut max_segment_id: i32 = -1;
let mut max_segment_id: u64 = 0;
let mut next_sequence_num: u64 = MIN_SEQUENCE_NUMBER + 1;

// Segment file naming convention: {SEGMENT_NAME_PREFIX}{id}
Expand All @@ -671,27 +671,26 @@ impl Region {
.map_err(anyhow::Error::new)
.context(Internal)?;

let segment = Segment::new(filename.to_string(), segment_id, segment_size)?;
let segment = Segment::new(
entry.path().to_string_lossy().to_string(),
segment_id,
segment_size,
)?;
next_sequence_num = next_sequence_num.max(segment.max_seq + 1);
max_segment_id = max_segment_id.max(segment_id);
let segment = Arc::new(Mutex::new(segment));

if segment_id as i32 > max_segment_id {
max_segment_id = segment_id as i32;
}
all_segments.insert(segment_id, segment);
}

// If no existing segments, create a new one
if max_segment_id == -1 {
max_segment_id = 0;
let path = format!("{region_dir}/{SEGMENT_NAME_PREFIX}{max_segment_id}");
let new_segment = Segment::new(path, max_segment_id as u64, segment_size)?;
let new_segment = Arc::new(Mutex::new(new_segment));
all_segments.insert(0, new_segment);
if all_segments.is_empty() {
all_segments.insert(
max_segment_id,
Self::create_new_segment(&region_dir, max_segment_id, segment_size)?,
);
}

let latest_segment = all_segments.get(&(max_segment_id as u64)).unwrap().clone();

let latest_segment = all_segments.get(&max_segment_id).unwrap().clone();
let segment_manager = SegmentManager {
all_segments: Mutex::new(all_segments),
cache: Mutex::new(VecDeque::new()),
Expand All @@ -711,17 +710,9 @@ impl Region {
})
}

fn create_new_segment(&self, id: u64) -> Result<Arc<Mutex<Segment>>> {
// Create a new segment
let new_segment = Segment::new(
format!("{}/segment_{}.wal", self.region_dir, id),
id,
self.segment_size,
)?;
let new_segment = Arc::new(Mutex::new(new_segment));
self.segment_manager.add_segment(id, new_segment.clone())?;

Ok(new_segment)
fn create_new_segment(dir: &str, id: u64, size: usize) -> Result<Arc<Mutex<Segment>>> {
let new_segment = Segment::new(format!("{dir}/{SEGMENT_NAME_PREFIX}{id}"), id, size)?;
Ok(Arc::new(Mutex::new(new_segment)))
}

pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result<SequenceNumber> {
Expand Down Expand Up @@ -766,7 +757,10 @@ impl Region {
let new_segment_id = guard.id + 1;
drop(guard);

*current_segment = self.create_new_segment(new_segment_id)?;
*current_segment =
Self::create_new_segment(&self.region_dir, new_segment_id, self.segment_size)?;
self.segment_manager
.add_segment(new_segment_id, current_segment.clone())?;
}
}

Expand Down

0 comments on commit 8e05f75

Please sign in to comment.