Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

bottomless: add xz compression option #780

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bottomless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"

[dependencies]
anyhow = "1.0.66"
async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
aws-config = { version = "0.55" }
aws-sdk-s3 = { version = "0.28" }
bytes = "1"
Expand Down
5 changes: 5 additions & 0 deletions bottomless/src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ impl WalCopier {
wal.copy_frames(&mut gzip, len).await?;
gzip.shutdown().await?;
}
CompressionKind::Xz => {
let mut xz = async_compression::tokio::write::XzEncoder::new(&mut out);
wal.copy_frames(&mut xz, len).await?;
xz.shutdown().await?;
}
}
if tracing::enabled!(tracing::Level::DEBUG) {
let elapsed = Instant::now() - period_start;
Expand Down
6 changes: 5 additions & 1 deletion bottomless/src/read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::bufread::{GzipDecoder, XzEncoder};
use aws_sdk_s3::primitives::ByteStream;
use std::io::ErrorKind;
use std::pin::Pin;
Expand Down Expand Up @@ -32,6 +32,10 @@ impl BatchReader {
let gzip = GzipDecoder::new(reader);
Box::pin(gzip)
}
CompressionKind::Xz => {
let xz = XzEncoder::new(reader);
Box::pin(xz)
}
},
}
}
Expand Down
113 changes: 83 additions & 30 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
use crate::wal::WalFileReader;
use anyhow::{anyhow, bail};
use arc_swap::ArcSwapOption;
use async_compression::tokio::write::GzipEncoder;
use async_compression::tokio::write::{GzipEncoder, XzEncoder};
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Options {
let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
let max_frames_per_batch =
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
let s3_upload_max_parallelism =
env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
let restore_transaction_page_swap_after =
Expand Down Expand Up @@ -653,7 +653,7 @@ impl Replicator {
CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
CompressionKind::Gzip => {
let mut reader = File::open(db_path).await?;
let gzip_path = Self::db_gzip_path(db_path);
let gzip_path = Self::db_compressed_path(db_path, "gz");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
Expand All @@ -671,13 +671,33 @@ impl Replicator {
);
Ok(ByteStream::from_path(gzip_path).await?)
}
CompressionKind::Xz => {
let mut reader = File::open(db_path).await?;
let xz_path = Self::db_compressed_path(db_path, "xz");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.truncate(true)
.open(&xz_path)
.await?;
let mut writer = XzEncoder::new(compressed_file);
let size = tokio::io::copy(&mut reader, &mut writer).await?;
writer.shutdown().await?;
tracing::debug!(
"Compressed database file ({} bytes) into `{}`",
size,
xz_path.display()
);
Ok(ByteStream::from_path(xz_path).await?)
}
}
}

fn db_gzip_path(db_path: &Path) -> PathBuf {
let mut gzip_path = db_path.to_path_buf();
gzip_path.pop();
gzip_path.join("db.gz")
fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
let mut compressed_path: PathBuf = db_path.to_path_buf();
compressed_path.pop();
compressed_path.join(format!("db.{suffix}"))
}

fn restore_db_path(&self) -> PathBuf {
Expand Down Expand Up @@ -816,9 +836,10 @@ impl Replicator {
let _ = snapshot_notifier.send(Ok(Some(generation)));
let elapsed = Instant::now() - start;
tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
// cleanup gzip database snapshot if exists
let gzip_path = Self::db_gzip_path(&db_path);
let _ = tokio::fs::remove_file(gzip_path).await;
// cleanup gzip/xz database snapshot if exists
for suffix in &["gz", "xz"] {
let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
}
});
let elapsed = Instant::now() - start_ts;
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
Expand Down Expand Up @@ -1160,31 +1181,58 @@ impl Replicator {
}

async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
let main_db_path = match self.use_compression {
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
let algos_to_try = match self.use_compression {
CompressionKind::None => &[
CompressionKind::None,
CompressionKind::Xz,
CompressionKind::Gzip,
],
CompressionKind::Gzip => &[
CompressionKind::Gzip,
CompressionKind::Xz,
CompressionKind::None,
],
CompressionKind::Xz => &[
CompressionKind::Xz,
CompressionKind::Gzip,
CompressionKind::None,
],
};

if let Ok(db_file) = self.get_object(main_db_path).send().await {
let mut body_reader = db_file.body.into_async_read();
let db_size = match self.use_compression {
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
for algo in algos_to_try {
let main_db_path = match algo {
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
};
db.flush().await?;
if let Ok(db_file) = self.get_object(main_db_path).send().await {
let mut body_reader = db_file.body.into_async_read();
let db_size = match algo {
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader =
async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
CompressionKind::Xz => {
let mut decompress_reader =
async_compression::tokio::bufread::XzDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
};
db.flush().await?;

let page_size = Self::read_page_size(db).await?;
self.set_page_size(page_size)?;
tracing::info!("Restored the main database file ({} bytes)", db_size);
Ok(true)
} else {
Ok(false)
let page_size = Self::read_page_size(db).await?;
self.set_page_size(page_size)?;
tracing::info!("Restored the main database file ({} bytes)", db_size);
return Ok(true);
}
}
Ok(false)
}

async fn restore_wal(
Expand Down Expand Up @@ -1235,6 +1283,7 @@ impl Replicator {
Some(result) => result,
None => {
if !key.ends_with(".gz")
&& !key.ends_with(".xz")
&& !key.ends_with(".db")
&& !key.ends_with(".meta")
&& !key.ends_with(".dep")
Expand Down Expand Up @@ -1423,6 +1472,7 @@ impl Replicator {
let str = fpath.to_str()?;
if str.ends_with(".db")
| str.ends_with(".gz")
| str.ends_with(".xz")
| str.ends_with(".raw")
| str.ends_with(".meta")
| str.ends_with(".dep")
Expand Down Expand Up @@ -1670,13 +1720,15 @@ pub enum CompressionKind {
#[default]
None,
Gzip,
Xz,
}

impl CompressionKind {
pub fn parse(kind: &str) -> std::result::Result<Self, &str> {
match kind {
"gz" | "gzip" => Ok(CompressionKind::Gzip),
"raw" | "" => Ok(CompressionKind::None),
"xz" => Ok(CompressionKind::Xz),
other => Err(other),
}
}
Expand All @@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
match self {
CompressionKind::None => write!(f, "raw"),
CompressionKind::Gzip => write!(f, "gz"),
CompressionKind::Xz => write!(f, "xz"),
}
}
}