Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pakhomov/disable old storage #3708

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions rs/config/src/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use std::path::PathBuf;

/// LSMT-related settings: the on/off flag and the shard size used for
/// sharded overlays.
#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct LsmtConfig {
/// Whether LSMT is enabled or not.
pub lsmt_status: FlagStatus,
/// Number of pages per shard in sharded overlays; u64::MAX if unlimited.
pub shard_num_pages: u64,
}
Expand Down Expand Up @@ -47,7 +45,6 @@ fn file_backed_memory_allocator_default() -> FlagStatus {

pub fn lsmt_config_default() -> LsmtConfig {
LsmtConfig {
lsmt_status: FlagStatus::Enabled,
// 40GiB
// DO NOT CHANGE after LSMT is enabled, as it would crash the new replica trying to merge
// old data.
Expand Down
137 changes: 15 additions & 122 deletions rs/replicated_state/src/page_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ pub mod test_utils;

use bit_vec::BitVec;
pub use checkpoint::{CheckpointSerialization, MappingSerialization};
use ic_config::flag_status::FlagStatus;
use ic_config::state_manager::LsmtConfig;
use ic_metrics::buckets::{decimal_buckets, linear_buckets};
use ic_metrics::MetricsRegistry;
use ic_sys::{fs::write_all_vectored, PageBytes};
use ic_sys::PageBytes;
pub use ic_sys::{PageIndex, PAGE_SIZE};
use ic_utils::deterministic_operations::deterministic_copy_from_slice;
pub use page_allocator::{
Expand All @@ -32,15 +31,10 @@ use page_allocator::Page;
use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::ops::Range;
use std::os::unix::io::RawFd;
use std::path::Path;
use std::sync::Arc;

// Size, in pages, of an aligned write bucket. When persisting a delta to a
// file, each dirty page is expanded to the bucket-aligned run of this many
// pages that contains it, and the whole run is written at once.
const WRITE_BUCKET_PAGES: u64 = 16;

const LABEL_OP: &str = "op";
const LABEL_TYPE: &str = "type";
const LABEL_OP_FLUSH: &str = "flush";
Expand Down Expand Up @@ -128,39 +122,6 @@ impl StorageMetrics {
}
}

/// A run of borrowed byte slices destined for one contiguous region of a file.
///
/// `apply_to_file` seeks to `start_index * PAGE_SIZE` and writes the slices in
/// `content` one after another from that offset.
struct WriteBuffer<'a> {
/// Slices to write, in the order they should appear in the file.
content: Vec<&'a [u8]>,
/// Page index whose byte offset marks where the first slice is written.
start_index: PageIndex,
}

impl WriteBuffer<'_> {
    /// Writes the buffered slices to `file`, starting at the byte offset of
    /// `start_index`.
    ///
    /// The file cursor is first moved to `start_index * PAGE_SIZE`; the slices
    /// in `content` are then passed to `write_all_vectored` in order. `path` is
    /// only used to label errors.
    ///
    /// # Errors
    ///
    /// Returns `PersistenceError::FileSystemError` if either the seek or the
    /// write fails.
    fn apply_to_file(&mut self, file: &mut File, path: &Path) -> Result<(), PersistenceError> {
        use std::io::{Seek, SeekFrom};

        let first_page = self.start_index.get();
        let byte_offset = first_page * PAGE_SIZE as u64;

        // Position the cursor at the first page of the run.
        if let Err(seek_err) = file.seek(SeekFrom::Start(byte_offset)) {
            return Err(PersistenceError::FileSystemError {
                path: path.display().to_string(),
                context: format!("Failed to seek to {}", byte_offset),
                internal_error: seek_err.to_string(),
            });
        }

        // Write every slice back-to-back from that position.
        if let Err(write_err) = write_all_vectored(file, &self.content) {
            let end_page = first_page + self.content.len() as u64;
            return Err(PersistenceError::FileSystemError {
                path: path.display().to_string(),
                context: format!(
                    "Failed to copy page range #{}..{}",
                    self.start_index, end_page
                ),
                internal_error: write_err.to_string(),
            });
        }

        Ok(())
    }
}

/// `PageDelta` represents a changeset of the module heap.
///
/// NOTE: We use a persistent map to make snapshotting of a PageMap a cheap
Expand Down Expand Up @@ -590,16 +551,13 @@ impl PageMap {
lsmt_config: &LsmtConfig,
metrics: &StorageMetrics,
) -> Result<(), PersistenceError> {
match lsmt_config.lsmt_status {
FlagStatus::Disabled => self.persist_to_file(&self.page_delta, &storage_layout.base()),
FlagStatus::Enabled => self.persist_to_overlay(
&self.page_delta,
storage_layout,
height,
lsmt_config,
metrics,
),
}
self.persist_to_overlay(
&self.page_delta,
storage_layout,
height,
lsmt_config,
metrics,
)
}

/// Persists the unflushed delta contained in this page map to the specified
Expand All @@ -611,18 +569,13 @@ impl PageMap {
lsmt_config: &LsmtConfig,
metrics: &StorageMetrics,
) -> Result<(), PersistenceError> {
match lsmt_config.lsmt_status {
FlagStatus::Disabled => {
self.persist_to_file(&self.unflushed_delta, &storage_layout.base())
}
FlagStatus::Enabled => self.persist_to_overlay(
&self.unflushed_delta,
storage_layout,
height,
lsmt_config,
metrics,
),
}
self.persist_to_overlay(
&self.unflushed_delta,
storage_layout,
height,
lsmt_config,
metrics,
)
}

fn persist_to_overlay(
Expand Down Expand Up @@ -903,66 +856,6 @@ impl PageMap {
self.unflushed_delta.update(delta)
}

/// Writes `page_delta` into the file at `dst`.
///
/// The file is opened for writing, created if it does not exist, and is not
/// truncated; the delta is then applied through `apply_delta_to_file`.
///
/// # Errors
///
/// Returns `PersistenceError::FileSystemError` if the file cannot be opened,
/// or whatever error `apply_delta_to_file` reports.
fn persist_to_file(&self, page_delta: &PageDelta, dst: &Path) -> Result<(), PersistenceError> {
    let mut open_options = OpenOptions::new();
    open_options.write(true).create(true).truncate(false);

    let mut target = match open_options.open(dst) {
        Ok(handle) => handle,
        Err(open_err) => {
            return Err(PersistenceError::FileSystemError {
                path: dst.display().to_string(),
                context: "Failed to open file".to_string(),
                internal_error: open_err.to_string(),
            });
        }
    };

    self.apply_delta_to_file(&mut target, page_delta, dst)
}

/// Applies the given delta to the specified file.
///
/// Every page index in `page_delta` is expanded to its `WRITE_BUCKET_PAGES`-aligned
/// bucket, and the whole bucket (clipped to `num_host_pages`) is written in one
/// vectored write using the contents returned by `get_page`. Indices that fall
/// inside a bucket that has already been written are skipped.
///
/// Precondition: `file` is seekable and writeable.
///
/// # Panics
///
/// Panics if a page index in `page_delta` is not below `self.num_host_pages()`.
///
/// # Errors
///
/// Propagates the `PersistenceError` returned by `WriteBuffer::apply_to_file`;
/// `path` is only used to label that error.
fn apply_delta_to_file(
    &self,
    file: &mut File,
    page_delta: &PageDelta,
    path: &Path,
) -> Result<(), PersistenceError> {
    // Empty delta
    if page_delta.max_page_index().is_none() {
        return Ok(());
    }

    let mut last_applied_index: Option<PageIndex> = None;
    let num_host_pages = self.num_host_pages() as u64;
    for (index, _) in page_delta.iter() {
        debug_assert!(self.page_delta.0.get(index).is_some());
        assert!(*index < num_host_pages.into());

        // Already covered by the bucket written for an earlier index.
        if matches!(last_applied_index, Some(last) if last >= *index) {
            continue;
        }

        let bucket_start_index =
            PageIndex::from((index.get() / WRITE_BUCKET_PAGES) * WRITE_BUCKET_PAGES);
        let mut buffer = WriteBuffer {
            // A bucket holds at most WRITE_BUCKET_PAGES slices; reserve once.
            content: Vec::with_capacity(WRITE_BUCKET_PAGES as usize),
            start_index: bucket_start_index,
        };
        for i in 0..WRITE_BUCKET_PAGES {
            let index_to_apply = PageIndex::from(bucket_start_index.get() + i);
            // We don't expand past the end of file to make bucketing transparent.
            if index_to_apply.get() < num_host_pages {
                let content = self.get_page(index_to_apply);
                buffer.content.push(content);
                last_applied_index = Some(index_to_apply);
            }
        }
        buffer.apply_to_file(file, path)?;
    }

    Ok(())
}

/// Returns the number of delta pages included in this PageMap.
pub fn num_delta_pages(&self) -> usize {
self.page_delta.len()
Expand Down
29 changes: 5 additions & 24 deletions rs/replicated_state/src/page_map/storage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::page_map::{
};
use assert_matches::assert_matches;
use bit_vec::BitVec;
use ic_config::flag_status::FlagStatus;
use ic_config::state_manager::LsmtConfig;
use ic_metrics::MetricsRegistry;
use ic_sys::{PageIndex, PAGE_SIZE};
Expand Down Expand Up @@ -476,7 +475,6 @@ fn write_overlay(
&storage_layout,
height,
&LsmtConfig {
lsmt_status: FlagStatus::Enabled,
shard_num_pages: u64::MAX,
},
metrics,
Expand All @@ -485,16 +483,12 @@ fn write_overlay(

/// Builds an `LsmtConfig` with LSMT enabled and `shard_num_pages` set to
/// `u64::MAX`.
fn lsmt_config_unsharded() -> LsmtConfig {
    let lsmt_status = FlagStatus::Enabled;
    let shard_num_pages = u64::MAX;
    LsmtConfig {
        lsmt_status,
        shard_num_pages,
    }
}

/// Builds an `LsmtConfig` with `shard_num_pages` set to 3.
// NOTE(review): this span appears to hold both the pre-change and post-change
// struct literal from a diff rendering; presumably only one of them belongs in
// the real file — confirm against the repository.
fn lsmt_config_sharded() -> LsmtConfig {
LsmtConfig {
lsmt_status: FlagStatus::Enabled,
shard_num_pages: 3,
}
LsmtConfig { shard_num_pages: 3 }
}

/// This function applies `instructions` to a new `Storage` in a temporary directory.
Expand Down Expand Up @@ -913,10 +907,7 @@ fn wrong_shard_pages_is_an_error() {
WriteOverlay((0..9).collect::<Vec<_>>()),
WriteOverlay((0..9).collect::<Vec<_>>()),
],
&LsmtConfig {
lsmt_status: FlagStatus::Enabled,
shard_num_pages: 4,
},
&LsmtConfig { shard_num_pages: 4 },
&tempdir,
);
let merge_candidates = MergeCandidate::new(
Expand All @@ -927,10 +918,7 @@ fn wrong_shard_pages_is_an_error() {
},
Height::from(0),
9, /* num_pages */
&LsmtConfig {
lsmt_status: FlagStatus::Enabled,
shard_num_pages: 3,
},
&LsmtConfig { shard_num_pages: 3 },
&StorageMetrics::new(&MetricsRegistry::new()),
)
.unwrap();
Expand Down Expand Up @@ -1075,7 +1063,6 @@ fn test_make_none_merge_candidate() {
fn test_make_merge_candidates_to_overlay() {
let tempdir = tempdir().unwrap();
let lsmt_config = LsmtConfig {
lsmt_status: FlagStatus::Enabled,
shard_num_pages: 15,
};

Expand Down Expand Up @@ -1349,10 +1336,7 @@ fn can_write_shards() {

write_overlays_and_verify_with_tempdir(
instructions,
&LsmtConfig {
lsmt_status: FlagStatus::Enabled,
shard_num_pages: 1,
},
&LsmtConfig { shard_num_pages: 1 },
&tempdir,
);
let files = storage_files(tempdir.path());
Expand All @@ -1374,10 +1358,7 @@ fn overlapping_shards_is_an_error() {

write_overlays_and_verify_with_tempdir(
instructions,
&LsmtConfig {
lsmt_status: FlagStatus::Enabled,
shard_num_pages: 1,
},
&LsmtConfig { shard_num_pages: 1 },
&tempdir,
);
let files = storage_files(tempdir.path());
Expand Down
Loading
Loading