Commit 8ab4c8a: Code review fixes

Kirill Bulatov authored and SomeoneToIgnore committed Jan 11, 2022
1 parent 7c4a653 commit 8ab4c8a

Showing 6 changed files with 69 additions and 72 deletions.
control_plane/src/lib.rs: 5 additions, 0 deletions

@@ -36,6 +36,11 @@ pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
 fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
     let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
 
+    let var = "LLVM_PROFILE_FILE";
+    if let Some(val) = std::env::var_os(var) {
+        cmd.env(var, val);
+    }
+
     const RUST_LOG_KEY: &str = "RUST_LOG";
     if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
         cmd.env(RUST_LOG_KEY, rust_log_value)
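The hunk above centralizes the LLVM_PROFILE_FILE passthrough in fill_rust_env_vars, so every binary spawned through it can still write LLVM coverage profiles even though env_clear() wipes the inherited environment. Below is a minimal, self-contained sketch of that passthrough pattern; the helper name forward_env_var and the use of the `env` command as a child process are illustrative assumptions, not code from this repository.

```rust
use std::process::Command;

// Forward an environment variable to a child command only if it is set in the
// parent process; `env_clear()` would otherwise drop it along with everything else.
fn forward_env_var(cmd: &mut Command, var: &str) -> &mut Command {
    if let Some(val) = std::env::var_os(var) {
        cmd.env(var, val);
    }
    cmd
}

fn main() -> std::io::Result<()> {
    // `env` is only an illustrative child process (assumes a Unix-like system);
    // the real code spawns the pageserver and safekeeper binaries.
    let mut cmd = Command::new("env");
    cmd.env_clear().env("RUST_BACKTRACE", "1");
    forward_env_var(&mut cmd, "LLVM_PROFILE_FILE");

    let status = cmd.status()?;
    println!("child exited with {status}");
    Ok(())
}
```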
control_plane/src/safekeeper.rs: 0 additions, 5 deletions

@@ -129,11 +129,6 @@ impl SafekeeperNode {
             cmd.arg("--no-sync");
         }
 
-        let var = "LLVM_PROFILE_FILE";
-        if let Some(val) = std::env::var_os(var) {
-            cmd.env(var, val);
-        }
-
         if !cmd.status()?.success() {
             bail!(
                 "Safekeeper failed to start. See '{}' for details.",
control_plane/src/storage.rs: 0 additions, 9 deletions

@@ -102,10 +102,6 @@ impl PageServerNode {
         config_overrides: &[&str],
     ) -> anyhow::Result<()> {
         let mut cmd = Command::new(self.env.pageserver_bin()?);
-        let var = "LLVM_PROFILE_FILE";
-        if let Some(val) = std::env::var_os(var) {
-            cmd.env(var, val);
-        }
 
         // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
         let base_data_dir_param = self.env.base_data_dir.display().to_string();
@@ -180,11 +176,6 @@ impl PageServerNode {
 
         fill_rust_env_vars(cmd.args(&args).arg("--daemonize"));
 
-        let var = "LLVM_PROFILE_FILE";
-        if let Some(val) = std::env::var_os(var) {
-            cmd.env(var, val);
-        }
-
         if !cmd.status()?.success() {
             bail!(
                 "Pageserver failed to start. See '{}' for details.",
pageserver/src/remote_storage/storage_sync.rs: 5 additions, 3 deletions

@@ -426,7 +426,7 @@ fn storage_sync_loop<
     max_sync_errors: NonZeroU32,
 ) -> anyhow::Result<()> {
     let remote_assets = Arc::new((storage, RwLock::new(index)));
-    while !crate::tenant_mgr::shutdown_requested() {
+    loop {
        let loop_step = runtime.block_on(async {
            tokio::select! {
                new_timeline_states = loop_step(
@@ -447,11 +447,13 @@ fn storage_sync_loop<
                set_timeline_states(conf, new_timeline_states);
                debug!("Sync loop step completed");
            }
-            LoopStep::Shutdown => {}
+            LoopStep::Shutdown => {
+                debug!("Shutdown requested, stopping");
+                break;
+            }
        }
    }
 
-    debug!("Shutdown requested, stopping");
     Ok(())
 }
 
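The storage_sync change above swaps the `while !crate::tenant_mgr::shutdown_requested()` guard for an infinite `loop` and handles shutdown inside the `LoopStep::Shutdown` match arm, logging and breaking in one place. The following standalone sketch mirrors that control-flow shape with a plain mpsc channel standing in for the real tokio::select! machinery; apart from the LoopStep name, all identifiers are illustrative.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Outcome of one loop iteration: either new work arrived, or shutdown was requested.
enum LoopStep {
    NewStates(u32),
    Shutdown,
}

fn main() {
    let (shutdown_tx, shutdown_rx) = channel::<()>();

    // Simulate another component requesting shutdown shortly after startup.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let _ = shutdown_tx.send(());
    });

    let mut step = 0u32;
    loop {
        // Each iteration either produces new work or observes the shutdown request,
        // instead of re-checking a shutdown flag in the `while` condition.
        let loop_step = match shutdown_rx.recv_timeout(Duration::from_millis(10)) {
            Ok(()) | Err(RecvTimeoutError::Disconnected) => LoopStep::Shutdown,
            Err(RecvTimeoutError::Timeout) => {
                step += 1;
                LoopStep::NewStates(step)
            }
        };

        match loop_step {
            LoopStep::NewStates(n) => {
                println!("Sync loop step {n} completed");
            }
            LoopStep::Shutdown => {
                println!("Shutdown requested, stopping");
                break;
            }
        }
    }
}
```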
test_runner/batch_others/test_remote_storage.py: 50 additions, 51 deletions

@@ -26,64 +26,63 @@
 # * queries the specific data, ensuring that it matches the one stored before
 #
 def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder):
-    with tempfile.TemporaryDirectory() as tmp_root_dir:
-        zenith_env_builder.rust_log_override = 'debug'
-        zenith_env_builder.pageserver_remote_storage = LocalFsStorage(Path(tmp_root_dir))
-        zenith_env_builder.num_safekeepers = 1
+    zenith_env_builder.rust_log_override = 'debug'
+    zenith_env_builder.num_safekeepers = 1
+    zenith_env_builder.enable_local_fs_remote_storage()
 
-        data_id = 1
-        data_secret = 'very secret secret'
+    data_id = 1
+    data_secret = 'very secret secret'
 
-        ##### First start, insert secret data and upload it to the remote storage
-        env = zenith_env_builder.init()
-        pg = env.postgres.create_start()
+    ##### First start, insert secret data and upload it to the remote storage
+    env = zenith_env_builder.init()
+    pg = env.postgres.create_start()
 
-        tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
-        timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
+    tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
+    timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
 
-        with closing(pg.connect()) as conn:
-            with conn.cursor() as cur:
-                cur.execute(f'''
-                    CREATE TABLE t1(id int primary key, secret text);
-                    INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
-                ''')
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute(f'''
+                CREATE TABLE t1(id int primary key, secret text);
+                INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
+            ''')
 
-        # run checkpoint manually to be sure that data landed in remote storage
-        with closing(env.pageserver.connect()) as psconn:
-            with psconn.cursor() as pscur:
-                pscur.execute(f"do_gc {tenant_id} {timeline_id}")
-        log.info("waiting for upload") # TODO api to check if upload is done
-        time.sleep(2)
+    # run checkpoint manually to be sure that data landed in remote storage
+    with closing(env.pageserver.connect()) as psconn:
+        with psconn.cursor() as pscur:
+            pscur.execute(f"do_gc {tenant_id} {timeline_id}")
+    log.info("waiting for upload") # TODO api to check if upload is done
+    time.sleep(2)
 
-        ##### Stop the first pageserver instance, erase all its data
-        env.postgres.stop_all()
-        env.pageserver.stop()
+    ##### Stop the first pageserver instance, erase all its data
+    env.postgres.stop_all()
+    env.pageserver.stop()
 
-        dir_to_clear = Path(env.repo_dir) / 'tenants'
-        shutil.rmtree(dir_to_clear)
-        os.mkdir(dir_to_clear)
+    dir_to_clear = Path(env.repo_dir) / 'tenants'
+    shutil.rmtree(dir_to_clear)
+    os.mkdir(dir_to_clear)
 
-        ##### Second start, restore the data and ensure it's the same
-        env.pageserver.start()
+    ##### Second start, restore the data and ensure it's the same
+    env.pageserver.start()
 
-        log.info("waiting for timeline redownload")
-        client = env.pageserver.http_client()
-        attempts = 0
-        while True:
-            timeline_details = client.timeline_details(tenant_id, timeline_id)
-            assert timeline_details['timeline_id'] == timeline_id
-            assert timeline_details['tenant_id'] == tenant_id
-            if timeline_details['type'] == 'Local':
-                log.info("timeline downloaded, checking its data")
-                break
-            attempts += 1
-            if attempts > 10:
-                raise Exception("timeline redownload failed")
-            log.debug("still waiting")
-            time.sleep(1)
+    log.info("waiting for timeline redownload")
+    client = env.pageserver.http_client()
+    attempts = 0
+    while True:
+        timeline_details = client.timeline_details(tenant_id, timeline_id)
+        assert timeline_details['timeline_id'] == timeline_id
+        assert timeline_details['tenant_id'] == tenant_id
+        if timeline_details['type'] == 'Local':
+            log.info("timeline downloaded, checking its data")
+            break
+        attempts += 1
+        if attempts > 10:
+            raise Exception("timeline redownload failed")
+        log.debug("still waiting")
+        time.sleep(1)
 
-        pg = env.postgres.create_start()
-        with closing(pg.connect()) as conn:
-            with conn.cursor() as cur:
-                cur.execute(f'SELECT secret FROM t1 WHERE id = {data_id};')
-                assert cur.fetchone() == (data_secret, )
+    pg = env.postgres.create_start()
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute(f'SELECT secret FROM t1 WHERE id = {data_id};')
+            assert cur.fetchone() == (data_secret, )
test_runner/fixtures/zenith_fixtures.py: 9 additions, 4 deletions

@@ -360,6 +360,11 @@ def init(self) -> ZenithEnv:
         self.env = ZenithEnv(self)
         return self.env
 
+    def enable_local_fs_remote_storage(self):
+        assert self.pageserver_remote_storage is None, "remote storage is enabled already"
+        self.pageserver_remote_storage = LocalFsStorage(
+            Path(self.repo_dir / 'local_fs_remote_storage'))
+
     def __enter__(self):
         return self
 
@@ -472,7 +477,7 @@ def __init__(self, config: ZenithEnvBuilder):
             tmp.flush()
 
             cmd = ['init', f'--config={tmp.name}']
-            _append_pageserver_param_overrides(cmd, config.pageserver_remote_storage)
+            append_pageserver_param_overrides(cmd, config.pageserver_remote_storage)
 
             self.zenith_cli(cmd)
 
@@ -744,7 +749,7 @@ def start(self) -> 'ZenithPageserver':
         assert self.running == False
 
         start_args = ['pageserver', 'start']
-        _append_pageserver_param_overrides(start_args, self.remote_storage)
+        append_pageserver_param_overrides(start_args, self.remote_storage)
 
         self.env.zenith_cli(start_args)
         self.running = True
@@ -779,8 +784,8 @@ def http_client(self, auth_token: Optional[str] = None) -> ZenithPageserverHttpC
         )
 
 
-def _append_pageserver_param_overrides(params_to_update: List[str],
-                                       pageserver_remote_storage: Optional[RemoteStorage]):
+def append_pageserver_param_overrides(params_to_update: List[str],
+                                      pageserver_remote_storage: Optional[RemoteStorage]):
     if pageserver_remote_storage is not None:
         if isinstance(pageserver_remote_storage, LocalFsStorage):
             pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'"
