Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic remote storage test #1079

Merged
merged 7 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions control_plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use anyhow::{anyhow, bail, Context, Result};
use std::fs;
use std::path::Path;
use std::process::Command;

pub mod compute;
pub mod local_env;
Expand All @@ -31,3 +32,19 @@ pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
}
Ok(pid)
}

fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");

let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}

const RUST_LOG_KEY: &str = "RUST_LOG";
if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does RUST_LOG_KEY do?

Would it make sense to pass through all env variables with RUST_* prefix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does RUST_LOG_KEY do?

That's just a variable to hold the RUST_LOG value to use it twice on the next lines. const forces it to BE_NAMED_THIS_WAY , so might be a bit confusing. I can define it as let rust_log_key = ..., would that reduce the confusion?

Would it make sense to pass through all env variables with RUST_* prefix?

I'm not aware of any others that start with such prefix, actually.
Seems that the RUST_LOG one emerged from env_logger crate and still isn't a standard really, since has to be supported in every logging backend separately. For instance, the slog one we've used before tracing did not support such variable.

Lastly, the only documentation about any rust-related env vars I was able to find is about cargo:
https://doc.rust-lang.org/cargo/reference/environment-variables.html

So, I'd rather keep the RUST_LOG one only for now.

cmd.env(RUST_LOG_KEY, rust_log_value)
} else {
cmd
}
}
21 changes: 8 additions & 13 deletions control_plane/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;

use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::read_pidfile;
use crate::storage::PageServerNode;
use crate::{fill_rust_env_vars, read_pidfile};
use zenith_utils::connstring::connection_address;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -118,22 +118,17 @@ impl SafekeeperNode {
let listen_http = format!("localhost:{}", self.conf.http_port);

let mut cmd = Command::new(self.env.safekeeper_bin()?);
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");
fill_rust_env_vars(
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.arg("--daemonize"),
);
if !self.conf.sync {
cmd.arg("--no-sync");
}

let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}

if !cmd.status()?.success() {
bail!(
"Safekeeper failed to start. See '{}' for details.",
Expand Down
79 changes: 41 additions & 38 deletions control_plane/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;

use crate::local_env::LocalEnv;
use crate::read_pidfile;
use crate::{fill_rust_env_vars, read_pidfile};
use pageserver::branches::BranchInfo;
use pageserver::tenant_mgr::TenantInfo;
use zenith_utils::connstring::connection_address;
Expand Down Expand Up @@ -96,46 +96,49 @@ impl PageServerNode {
.unwrap()
}

pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> {
pub fn init(
&self,
create_tenant: Option<&str>,
config_overrides: &[&str],
) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}

// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let mut args = vec![
"--init".to_string(),
"-D".to_string(),
self.env.base_data_dir.display().to_string(),
"-c".to_string(),
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display()),
"-c".to_string(),
format!("auth_type='{}'", self.env.pageserver.auth_type),
"-c".to_string(),
format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
),
"-c".to_string(),
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr),
];
let base_data_dir_param = self.env.base_data_dir.display().to_string();
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
let mut args = Vec::with_capacity(20);

args.push("--init");
args.extend(["-D", &base_data_dir_param]);
args.extend(["-c", &pg_distrib_dir_param]);
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);

for config_override in config_overrides {
args.extend(["-c", config_override]);
}

if self.env.pageserver.auth_type != AuthType::Trust {
args.extend([
"-c".to_string(),
"auth_validation_public_key_path='auth_public_key.pem'".to_string(),
"-c",
"auth_validation_public_key_path='auth_public_key.pem'",
]);
}

if let Some(tenantid) = create_tenant {
args.extend(["--create-tenant".to_string(), tenantid.to_string()])
args.extend(["--create-tenant", tenantid])
}

let status = cmd
.args(args)
.env_clear()
.env("RUST_BACKTRACE", "1")
let status = fill_rust_env_vars(cmd.args(args))
.status()
.expect("pageserver init failed");

Expand All @@ -154,7 +157,7 @@ impl PageServerNode {
self.repo_path().join("pageserver.pid")
}

pub fn start(&self) -> anyhow::Result<()> {
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
print!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
Expand All @@ -163,16 +166,16 @@ impl PageServerNode {
io::stdout().flush().unwrap();

let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(&["-D", self.repo_path().to_str().unwrap()])
.arg("--daemonize")
.env_clear()
.env("RUST_BACKTRACE", "1");

let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);

let repo_path = self.repo_path();
let mut args = vec!["-D", repo_path.to_str().unwrap()];

for config_override in config_overrides {
args.extend(["-c", config_override]);
}

fill_rust_env_vars(cmd.args(&args).arg("--daemonize"));

if !cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",
Expand Down
10 changes: 6 additions & 4 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ fn main() -> Result<()> {
)
// See `settings.md` for more details on the extra configuration patameters pageserver can process
.arg(
Arg::with_name("config-option")
Arg::with_name("config-override")
.short("c")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.help("Additional configuration options or overrides of the ones from the toml config file.
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
)
.get_matches();
Expand Down Expand Up @@ -105,7 +105,7 @@ fn main() -> Result<()> {
};

// Process any extra options given with -c
if let Some(values) = arg_matches.values_of("config-option") {
if let Some(values) = arg_matches.values_of("config-override") {
for option_line in values {
let doc = toml_edit::Document::from_str(option_line).with_context(|| {
format!(
Expand Down Expand Up @@ -195,9 +195,10 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
}

let signals = signals::install_shutdown_handlers()?;
let (async_shutdown_tx, async_shutdown_rx) = tokio::sync::watch::channel(());
let mut threads = Vec::new();

let sync_startup = remote_storage::start_local_timeline_sync(conf)
let sync_startup = remote_storage::start_local_timeline_sync(conf, async_shutdown_rx)
.context("Failed to set up local files sync with external storage")?;

if let Some(handle) = sync_startup.sync_loop_handle {
Expand Down Expand Up @@ -255,6 +256,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
signal.name()
);

async_shutdown_tx.send(())?;
postgres_backend::set_pgbackend_shutdown_requested();
tenant_mgr::shutdown_all_tenants()?;
endpoint::shutdown();
Expand Down
18 changes: 10 additions & 8 deletions pageserver/src/branches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ impl BranchInfo {
repo: &Arc<dyn Repository>,
include_non_incremental_logical_size: bool,
) -> Result<Self> {
let name = path
.as_ref()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_string();
let timeline_id = std::fs::read_to_string(path)?.parse::<ZTimelineId>()?;
let path = path.as_ref();
let name = path.file_name().unwrap().to_string_lossy().to_string();
let timeline_id = std::fs::read_to_string(path)
.with_context(|| {
format!(
"Failed to read branch file contents at path '{}'",
path.display()
)
})?
.parse::<ZTimelineId>()?;

let timeline = match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(local_entry) => local_entry,
Expand Down
48 changes: 30 additions & 18 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use anyhow::{Context, Result};
use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
Expand Down Expand Up @@ -190,18 +190,27 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
}

#[derive(Debug, Serialize)]
struct TimelineInfo {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
#[serde(tag = "type")]
enum TimelineInfo {
Local {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
},
Remote {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
},
}

async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
Expand All @@ -215,9 +224,12 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
match repo.get_timeline(timeline_id)?.local_timeline() {
None => bail!("Timeline with id {} is not present locally", timeline_id),
Some(timeline) => Ok::<_, anyhow::Error>(TimelineInfo {
Ok::<_, anyhow::Error>(match repo.get_timeline(timeline_id)?.local_timeline() {
None => TimelineInfo::Remote {
timeline_id,
tenant_id,
},
Some(timeline) => TimelineInfo::Local {
timeline_id,
tenant_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
Expand All @@ -226,8 +238,8 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
prev_record_lsn: timeline.get_prev_record_lsn(),
start_lsn: timeline.get_start_lsn(),
timeline_state: repo.get_timeline_state(timeline_id),
}),
}
},
})
})
.await
.map_err(ApiError::from_err)??;
Expand Down
5 changes: 4 additions & 1 deletion pageserver/src/remote_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ use std::{
};

use anyhow::{bail, Context};
use tokio::io;
use tokio::{io, sync};
use tracing::{error, info};
use zenith_utils::zid::{ZTenantId, ZTimelineId};

Expand Down Expand Up @@ -135,20 +135,23 @@ pub struct SyncStartupData {
/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
pub fn start_local_timeline_sync(
config: &'static PageServerConf,
shutdown_hook: sync::watch::Receiver<()>,
) -> anyhow::Result<SyncStartupData> {
let local_timeline_files = local_tenant_timeline_files(config)
.context("Failed to collect local tenant timeline files")?;

match &config.remote_storage_config {
Some(storage_config) => match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
shutdown_hook,
config,
local_timeline_files,
LocalFs::new(root.clone(), &config.workdir)?,
storage_config.max_concurrent_sync,
storage_config.max_sync_errors,
),
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
shutdown_hook,
config,
local_timeline_files,
S3::new(s3_config, &config.workdir)?,
Expand Down
Loading