Skip to content

Commit

Permalink
Done fn move_one_table
Browse files Browse the repository at this point in the history
  • Loading branch information
TaQuangKhoi committed Aug 19, 2024
1 parent 258fdb9 commit 2dd4b22
Showing 1 changed file with 44 additions and 1 deletion.
45 changes: 44 additions & 1 deletion src/action/move_data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::env::var;
use log::info;
use log::{error, info};
use postgres::{Column, Row};
use crate::action::working_database::{get_cell_value_by_column_name, get_rows};
use crate::core::table::Table;
use crate::database::connect;

pub fn move_one_table(table_name: String) {
let source_database_name = var("POSTGRES_DB_SOURCE").unwrap_or(String::from(""));
Expand All @@ -22,6 +23,30 @@ pub fn move_one_table(table_name: String) {
return;
}

// Check if the table is existed in the target database
let mut pg_client = connect(target_database_name.clone()).unwrap();
let query_check_table_existed = format!("
SELECT EXISTS (
SELECT 1
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = '{}'
);", table_name);
let rows = match pg_client.query(&query_check_table_existed, &[]) {
Ok(rows) => rows,
Err(err) => {
info!("Error querying : {:?}", err);
return;
}
};
let row = rows.get(0).unwrap();
let is_table_existed: bool = row.get(0);
if !is_table_existed {
info!("Table: {} does not exist in the target database", table_name);
return;
}


let mut queries: Vec<String> = Vec::new();
// STEP 2: Insert data into target database
for source_row in source_rows.clone() {
Expand All @@ -32,6 +57,24 @@ pub fn move_one_table(table_name: String) {

// len
info!("Queries len: {:?}", queries.len());
let mut failed_queries: Vec<String> = Vec::new();
for query in queries {
let mut pg_client = connect(target_database_name.clone()).unwrap();
info!("Query: {:?}", query);
match pg_client.query(&query, &[]) {
Ok(_) => {
info!("Query executed successfully");
}
Err(err) => {
failed_queries.push(query);
error!("Error querying : {:?}", err);
}
};
}

if failed_queries.len() > 0 {
info!("Failed queries: {:?}", failed_queries);
}
}

fn build_insert_query(table_name: &String, columns: &[Column], row: &Row) -> String {
Expand Down

0 comments on commit 2dd4b22

Please sign in to comment.