Skip to content

Commit

Permalink
feat: export all schemas and data at once in export tool (GreptimeTea…
Browse files Browse the repository at this point in the history
…m#4478)

* feat: export all schemas and data at onece

* feat: introduce export all to export schemas and data at once

* feat: default value for target

* feat: refactor export target

* chore: fix unit test
  • Loading branch information
fengjiachun authored Aug 1, 2024
1 parent 6c4b8b6 commit d673147
Showing 1 changed file with 27 additions and 114 deletions.
141 changes: 27 additions & 114 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use base64::Engine;
use clap::{Parser, ValueEnum};
use client::DEFAULT_SCHEMA_NAME;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{debug, error, info};
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
Expand All @@ -42,14 +42,13 @@ type TableReference = (String, String, String);

#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
/// Corresponding to `SHOW CREATE TABLE`
/// Export all table schemas, corresponding to `SHOW CREATE TABLE`.
Schema,
/// Export all table data, corresponding to `COPY DATABASE TO`.
Data,
/// Export all table schemas and data at once.
#[default]
CreateTable,
/// Corresponding to `EXPORT TABLE`
#[deprecated(note = "Please use `DatabaseData` instead.")]
TableData,
/// Corresponding to `EXPORT DATABASE`
DatabaseData,
All,
}

#[derive(Debug, Default, Parser)]
Expand All @@ -75,7 +74,7 @@ pub struct ExportCommand {
max_retry: usize,

/// Things to export
#[clap(long, short = 't', value_enum)]
#[clap(long, short = 't', value_enum, default_value = "all")]
target: ExportTarget,

/// A half-open time range: [start_time, end_time).
Expand Down Expand Up @@ -178,7 +177,7 @@ impl Export {
if let Some(schema) = &self.schema {
Ok(vec![(self.catalog.clone(), schema.clone())])
} else {
let result = self.sql("show databases").await?;
let result = self.sql("SHOW DATABASES").await?;
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
Expand All @@ -205,9 +204,11 @@ impl Export {
) -> Result<(Vec<TableReference>, Vec<TableReference>)> {
// Puts all metric table first
let sql = format!(
"select table_catalog, table_schema, table_name from \
information_schema.columns where column_name = '__tsid' \
and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'"
"SELECT table_catalog, table_schema, table_name \
FROM information_schema.columns \
WHERE column_name = '__tsid' \
and table_catalog = \'{catalog}\' \
and table_schema = \'{schema}\'"
);
let result = self.sql(&sql).await?;
let Some(records) = result else {
Expand All @@ -227,9 +228,11 @@ impl Export {

// TODO: SQL injection hurts
let sql = format!(
"select table_catalog, table_schema, table_name from \
information_schema.tables where table_type = \'BASE TABLE\' \
and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'",
"SELECT table_catalog, table_schema, table_name \
FROM information_schema.tables \
WHERE table_type = \'BASE TABLE\' \
and table_catalog = \'{catalog}\' \
and table_schema = \'{schema}\'",
);
let result = self.sql(&sql).await?;
let Some(records) = result else {
Expand Down Expand Up @@ -266,7 +269,7 @@ impl Export {

async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
let sql = format!(
r#"show create table "{}"."{}"."{}""#,
r#"SHOW CREATE TABLE "{}"."{}"."{}""#,
catalog, schema, table
);
let result = self.sql(&sql).await?;
Expand Down Expand Up @@ -341,99 +344,6 @@ impl Export {
Ok(())
}

async fn export_table_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_names.len());
for (catalog, schema) in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
tokio::fs::create_dir_all(&self.output_dir)
.await
.context(FileIoSnafu)?;
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));
// Ignores metric physical tables
let (metrics_tables, table_list) = self.get_table_list(&catalog, &schema).await?;
for (_, _, table_name) in metrics_tables {
warn!("Ignores metric physical table: {table_name}");
}
for (catalog_name, schema_name, table_name) in table_list {
// copy table to
let sql = format!(
r#"Copy "{}"."{}"."{}" TO '{}{}.parquet' WITH (format='parquet');"#,
catalog_name,
schema_name,
table_name,
output_dir.to_str().unwrap(),
table_name,
);
info!("Executing sql: {sql}");
self.sql(&sql).await?;
}
info!("Finished exporting {catalog}.{schema} data");

// export copy from sql
let dir_filenames = match output_dir.read_dir() {
Ok(dir) => dir,
Err(_) => {
warn!("empty database {catalog}.{schema}");
return Ok(());
}
};

let copy_from_file =
Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql"));
let mut writer =
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);

for table_file in dir_filenames {
let table_file = table_file.unwrap();
let table_name = table_file
.file_name()
.into_string()
.unwrap()
.replace(".parquet", "");

writer
.write(
format!(
"copy {} from '{}' with (format='parquet');\n",
table_name,
table_file.path().to_str().unwrap()
)
.as_bytes(),
)
.await
.context(FileIoSnafu)?;
}
writer.flush().await.context(FileIoSnafu)?;

info!("finished exporting {catalog}.{schema} copy_from.sql");

Ok::<(), Error>(())
});
}

let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);

Ok(())
}

async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
Expand Down Expand Up @@ -530,9 +440,12 @@ impl Export {
impl Tool for Export {
async fn do_work(&self) -> Result<()> {
match self.target {
ExportTarget::CreateTable => self.export_create_table().await,
ExportTarget::TableData => self.export_table_data().await,
ExportTarget::DatabaseData => self.export_database_data().await,
ExportTarget::Schema => self.export_create_table().await,
ExportTarget::Data => self.export_database_data().await,
ExportTarget::All => {
self.export_create_table().await?;
self.export_database_data().await
}
}
}
}
Expand Down Expand Up @@ -619,7 +532,7 @@ mod tests {
"--output-dir",
&*output_dir.path().to_string_lossy(),
"--target",
"create-table",
"schema",
]);
let mut cli_app = cli.build(LoggingOptions::default()).await?;
cli_app.start().await?;
Expand Down

0 comments on commit d673147

Please sign in to comment.