Skip to content

Commit

Permalink
feat: Add possibility to configure a subfolder inside an S3 bucket (#…
Browse files Browse the repository at this point in the history
…2647)

### Description

This change adds the possibility to configure a subfolder inside an S3
bucket, to avoid having to spin up an entire bucket for hyperlane.

### Drive-by changes

Nothing

### Related issues

None, chatted about it on Slack with @nambrot 

### Backward compatibility

Yes

I made the field optional in the `RawCheckpointSyncerConf` so that
current config are still valid and will continue to read/write at the
root of the bucket. That being said, I'm new to this code so I hope I
got it right

### Testing

None

---------

Co-authored-by: Nam Chu Hoai <[email protected]>
  • Loading branch information
gbouv and nambrot authored Aug 28, 2023
1 parent c402185 commit 478826b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
21 changes: 15 additions & 6 deletions rust/hyperlane-base/src/settings/checkpoint_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum CheckpointSyncerConf {
S3 {
/// Bucket name
bucket: String,
/// Folder name inside bucket - defaults to the root of the bucket
folder: Option<String>,
/// S3 Region
region: Region,
},
Expand All @@ -36,13 +38,15 @@ impl FromStr for CheckpointSyncerConf {

match prefix {
"s3" => {
let [bucket, region]: [&str; 2] = suffix
.split('/')
.collect::<Vec<_>>()
.try_into()
.map_err(|_| eyre!("Error parsing storage location; could not split bucket and region ({suffix})"))?;
let url_components = suffix.split('/').collect::<Vec<&str>>();
let (bucket, region, folder): (&str, &str, Option<String>) = match url_components.len() {
2 => Ok((url_components[0], url_components[1], None)),
3 .. => Ok((url_components[0], url_components[1], Some(url_components[2..].join("/")))),
_ => Err(eyre!("Error parsing storage location; could not split bucket, region and folder ({suffix})"))
}?;
Ok(CheckpointSyncerConf::S3 {
bucket: bucket.into(),
folder,
region: region
.parse()
.context("Invalid region when parsing storage location")?,
Expand All @@ -66,8 +70,13 @@ impl CheckpointSyncerConf {
CheckpointSyncerConf::LocalStorage { path } => {
Box::new(LocalStorage::new(path.clone(), latest_index_gauge)?)
}
CheckpointSyncerConf::S3 { bucket, region } => Box::new(S3Storage::new(
CheckpointSyncerConf::S3 {
bucket,
folder,
region,
} => Box::new(S3Storage::new(
bucket.clone(),
folder.clone(),
region.clone(),
latest_index_gauge,
)),
Expand Down
9 changes: 8 additions & 1 deletion rust/hyperlane-base/src/settings/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub enum RawCheckpointSyncerConf {
S3 {
/// Bucket name
bucket: Option<String>,
/// Folder name inside bucket - defaults to the root of the bucket
folder: Option<String>,
/// S3 Region
region: Option<String>,
},
Expand Down Expand Up @@ -642,10 +644,15 @@ impl FromRawConf<RawCheckpointSyncerConf> for CheckpointSyncerConf {
}
Ok(Self::LocalStorage { path })
}
RawCheckpointSyncerConf::S3 { bucket, region } => Ok(Self::S3 {
RawCheckpointSyncerConf::S3 {
bucket,
folder,
region,
} => Ok(Self::S3 {
bucket: bucket
.ok_or_else(|| eyre!("Missing `bucket` for S3 checkpoint syncer"))
.into_config_result(|| cwp + "bucket")?,
folder,
region: region
.ok_or_else(|| eyre!("Missing `region` for S3 checkpoint syncer"))
.into_config_result(|| cwp + "region")?
Expand Down
21 changes: 18 additions & 3 deletions rust/hyperlane-base/src/types/s3_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const S3_REQUEST_TIMEOUT_SECONDS: u64 = 30;
pub struct S3Storage {
/// The name of the bucket.
bucket: String,
/// A specific folder inside the above repo - set to empty string to use the root of the bucket
folder: Option<String>,
/// The region of the bucket.
region: Region,
/// A client with AWS credentials.
Expand All @@ -44,6 +46,7 @@ impl fmt::Debug for S3Storage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("S3Storage")
.field("bucket", &self.bucket)
.field("folder", &self.folder)
.field("region", &self.region)
.finish()
}
Expand All @@ -52,7 +55,7 @@ impl fmt::Debug for S3Storage {
impl S3Storage {
async fn write_to_bucket(&self, key: String, body: &str) -> Result<()> {
let req = PutObjectRequest {
key,
key: self.get_composite_key(key),
bucket: self.bucket.clone(),
body: Some(Vec::from(body).into()),
content_type: Some("application/json".to_owned()),
Expand All @@ -69,7 +72,7 @@ impl S3Storage {
/// Uses an anonymous client. This should only be used for publicly accessible buckets.
async fn anonymously_read_from_bucket(&self, key: String) -> Result<Option<Vec<u8>>> {
let req = GetObjectRequest {
key,
key: self.get_composite_key(key),
bucket: self.bucket.clone(),
..Default::default()
};
Expand Down Expand Up @@ -120,6 +123,13 @@ impl S3Storage {
})
}

fn get_composite_key(&self, key: String) -> String {
match self.folder.as_deref() {
None | Some("") => key,
Some(folder_str) => format!("{}/{}", folder_str, key),
}
}

fn legacy_checkpoint_key(index: u32) -> String {
format!("checkpoint_{index}.json")
}
Expand Down Expand Up @@ -209,6 +219,11 @@ impl CheckpointSyncer for S3Storage {
}

fn announcement_location(&self) -> String {
format!("s3://{}/{}", self.bucket, self.region.name())
match self.folder.as_deref() {
None | Some("") => format!("s3://{}/{}", self.bucket, self.region.name()),
Some(folder_str) => {
format!("s3://{}/{}/{}", self.bucket, self.region.name(), folder_str)
}
}
}
}

0 comments on commit 478826b

Please sign in to comment.