From 2dd4b22a01ea72e8c393581a327723b75739ee6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?T=E1=BA=A1=20Quang=20Kh=C3=B4i?= Date: Mon, 19 Aug 2024 17:29:51 +0700 Subject: [PATCH] Done fn move_one_table --- src/action/move_data.rs | 45 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/src/action/move_data.rs b/src/action/move_data.rs index b94826f..6112c5c 100644 --- a/src/action/move_data.rs +++ b/src/action/move_data.rs @@ -1,8 +1,9 @@ use std::env::var; -use log::info; +use log::{error, info}; use postgres::{Column, Row}; use crate::action::working_database::{get_cell_value_by_column_name, get_rows}; use crate::core::table::Table; +use crate::database::connect; pub fn move_one_table(table_name: String) { let source_database_name = var("POSTGRES_DB_SOURCE").unwrap_or(String::from("")); @@ -22,6 +23,30 @@ pub fn move_one_table(table_name: String) { return; } + // Check if the table is existed in the target database + let mut pg_client = connect(target_database_name.clone()).unwrap(); + let query_check_table_existed = format!(" + SELECT EXISTS ( + SELECT 1 + FROM pg_tables + WHERE schemaname = 'public' + AND tablename = '{}' + );", table_name); + let rows = match pg_client.query(&query_check_table_existed, &[]) { + Ok(rows) => rows, + Err(err) => { + info!("Error querying : {:?}", err); + return; + } + }; + let row = rows.get(0).unwrap(); + let is_table_existed: bool = row.get(0); + if !is_table_existed { + info!("Table: {} does not exist in the target database", table_name); + return; + } + + let mut queries: Vec = Vec::new(); // STEP 2: Insert data into target database for source_row in source_rows.clone() { @@ -32,6 +57,24 @@ pub fn move_one_table(table_name: String) { // len info!("Queries len: {:?}", queries.len()); + let mut failed_queries: Vec = Vec::new(); + for query in queries { + let mut pg_client = connect(target_database_name.clone()).unwrap(); + info!("Query: {:?}", query); + match pg_client.query(&query, &[]) { + Ok(_) => { + info!("Query executed successfully"); + } + Err(err) => { + failed_queries.push(query); + error!("Error querying : {:?}", err); + } + }; + } + + if failed_queries.len() > 0 { + info!("Failed queries: {:?}", failed_queries); + } } fn build_insert_query(table_name: &String, columns: &[Column], row: &Row) -> String {