-
Notifications
You must be signed in to change notification settings - Fork 209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support persist manifest efficientlly #1596
feat: support persist manifest efficientlly #1596
Conversation
3ea66e8
to
fe5f793
Compare
@@ -29,13 +39,16 @@ use crate::{ | |||
|
|||
pub const PREFIX_PATH: &str = "manifest"; | |||
pub const SNAPSHOT_FILENAME: &str = "snapshot"; | |||
pub const SST_PREFIX: &str = "sst"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those files should not be named sst, sst is only used in LSM, and represent sorted string table, here those file are delta files based on snapshot.
} | ||
|
||
let mut sst_files = Vec::with_capacity(paths.len()); | ||
for path in &paths { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could fetch those files concurrently, one after one is too slow.
.await | ||
.context("failed to read sst file")?; | ||
let pb_sst = pb_types::SstFile::decode(bytes).context("failed to decode sst file")?; | ||
sst_files.push(SstFile::try_from(pb_sst)?); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to dedup files before merge, since there may exists old delta files, which are failed to delete in last merge.
.context("Failed to update manifest")?; | ||
|
||
// 2. Delete the old sst files | ||
for path in &paths { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete concurrently
self.store | ||
.delete(path) | ||
.await | ||
.context("failed to delete sst file")?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add path name to log
tokio::spawn(async move { | ||
ssts_merge.run().await; | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When start up, we also need to read delta files, if we only read snapshot, then part of sst won't be queried later?
Self { | ||
channel_size: 100, | ||
max_interval_seconds: 5, | ||
merge_threshold: 50, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10-20 is a reasonable choice.
pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> { | ||
let mut payload = self.payload.write().await; | ||
let mut tmp_ssts = payload.files.clone(); | ||
let new_sst_path = Path::from(format!("{}/{id}", self.sst_path)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are too many pending delta files, we should block the write here.
1a811bc
to
b7c14a3
Compare
b7c14a3
to
c735904
Compare
@@ -29,11 +46,14 @@ use crate::{ | |||
|
|||
pub const PREFIX_PATH: &str = "manifest"; | |||
pub const SNAPSHOT_FILENAME: &str = "snapshot"; | |||
pub const DELTA_MANIFEST_PREFIX: &str = "delta_manifest"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub const DELTA_MANIFEST_PREFIX: &str = "delta_manifest"; | |
pub const DELTA_MANIFEST_PREFIX: &str = "delta"; |
|
||
// 2. Update cached payload | ||
let mut payload = self.payload.write().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap step2 in { .. }
, so we can release write lock quickly.
payload.files.push(new_sst); | ||
|
||
// 3. Schedule manifest merge | ||
self.manifest_merge.schedule_merge().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why call this again?
} | ||
match self.merge_manifest().await { | ||
Ok(_) => { | ||
self.sst_num.store(0, Ordering::Relaxed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Between L261 and L263, a new delta file may be created, so we can reset this to 0 directly.
merge_manifest
could return num of deltas merged, so we can subtract it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Rationale
close #1592
Detailed Changes
Test Plan
CI