Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add possibility to configure a subfolder inside an S3 bucket #2647

Merged
merged 10 commits into from
Aug 28, 2023
21 changes: 15 additions & 6 deletions rust/hyperlane-base/src/settings/checkpoint_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum CheckpointSyncerConf {
S3 {
/// Bucket name
bucket: String,
/// Folder name inside bucket - defaults to the root of the bucket
folder: Option<String>,
/// S3 Region
region: Region,
},
Expand All @@ -36,13 +38,15 @@ impl FromStr for CheckpointSyncerConf {

match prefix {
"s3" => {
let [bucket, region]: [&str; 2] = suffix
.split('/')
.collect::<Vec<_>>()
.try_into()
.map_err(|_| eyre!("Error parsing storage location; could not split bucket and region ({suffix})"))?;
let url_components = suffix.split('/').collect::<Vec<&str>>();
let (bucket, region, folder): (&str, &str, Option<String>) = match url_components.len() {
2 => Ok((url_components[0], url_components[1], None)),
3 .. => Ok((url_components[0], url_components[1], Some(url_components[2..].join("/")))),
_ => Err(eyre!("Error parsing storage location; could not split bucket, region and folder ({suffix})"))
}?;
Ok(CheckpointSyncerConf::S3 {
bucket: bucket.into(),
folder,
region: region
.parse()
.context("Invalid region when parsing storage location")?,
Expand All @@ -66,8 +70,13 @@ impl CheckpointSyncerConf {
CheckpointSyncerConf::LocalStorage { path } => {
Box::new(LocalStorage::new(path.clone(), latest_index_gauge)?)
}
CheckpointSyncerConf::S3 { bucket, region } => Box::new(S3Storage::new(
CheckpointSyncerConf::S3 {
bucket,
folder,
region,
} => Box::new(S3Storage::new(
bucket.clone(),
folder.clone(),
region.clone(),
latest_index_gauge,
)),
Expand Down
9 changes: 8 additions & 1 deletion rust/hyperlane-base/src/settings/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub enum RawCheckpointSyncerConf {
S3 {
/// Bucket name
bucket: Option<String>,
/// Folder name inside bucket - defaults to the root of the bucket
folder: Option<String>,
/// S3 Region
region: Option<String>,
},
Expand Down Expand Up @@ -642,10 +644,15 @@ impl FromRawConf<RawCheckpointSyncerConf> for CheckpointSyncerConf {
}
Ok(Self::LocalStorage { path })
}
RawCheckpointSyncerConf::S3 { bucket, region } => Ok(Self::S3 {
RawCheckpointSyncerConf::S3 {
bucket,
folder,
region,
} => Ok(Self::S3 {
bucket: bucket
.ok_or_else(|| eyre!("Missing `bucket` for S3 checkpoint syncer"))
.into_config_result(|| cwp + "bucket")?,
folder,
region: region
.ok_or_else(|| eyre!("Missing `region` for S3 checkpoint syncer"))
.into_config_result(|| cwp + "region")?
Expand Down
21 changes: 18 additions & 3 deletions rust/hyperlane-base/src/types/s3_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const S3_REQUEST_TIMEOUT_SECONDS: u64 = 30;
pub struct S3Storage {
/// The name of the bucket.
bucket: String,
/// A specific folder inside the above repo - set to empty string to use the root of the bucket
folder: Option<String>,
/// The region of the bucket.
region: Region,
/// A client with AWS credentials.
Expand All @@ -44,6 +46,7 @@ impl fmt::Debug for S3Storage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("S3Storage")
.field("bucket", &self.bucket)
.field("folder", &self.folder)
.field("region", &self.region)
.finish()
}
Expand All @@ -52,7 +55,7 @@ impl fmt::Debug for S3Storage {
impl S3Storage {
async fn write_to_bucket(&self, key: String, body: &str) -> Result<()> {
let req = PutObjectRequest {
key,
key: self.get_composite_key(key),
bucket: self.bucket.clone(),
body: Some(Vec::from(body).into()),
content_type: Some("application/json".to_owned()),
Expand All @@ -69,7 +72,7 @@ impl S3Storage {
/// Uses an anonymous client. This should only be used for publicly accessible buckets.
async fn anonymously_read_from_bucket(&self, key: String) -> Result<Option<Vec<u8>>> {
let req = GetObjectRequest {
key,
key: self.get_composite_key(key),
bucket: self.bucket.clone(),
..Default::default()
};
Expand Down Expand Up @@ -120,6 +123,13 @@ impl S3Storage {
})
}

fn get_composite_key(&self, key: String) -> String {
match self.folder.as_deref() {
None | Some("") => key,
Some(folder_str) => format!("{}/{}", folder_str, key),
}
}

fn legacy_checkpoint_key(index: u32) -> String {
format!("checkpoint_{index}.json")
}
Expand Down Expand Up @@ -209,6 +219,11 @@ impl CheckpointSyncer for S3Storage {
}

fn announcement_location(&self) -> String {
format!("s3://{}/{}", self.bucket, self.region.name())
match self.folder.as_deref() {
None | Some("") => format!("s3://{}/{}", self.bucket, self.region.name()),
Some(folder_str) => {
format!("s3://{}/{}/{}", self.bucket, self.region.name(), folder_str)
}
}
}
}