diff --git a/src/api/config.rs b/src/api/config.rs index 327cee6..5aa6029 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -20,6 +20,9 @@ pub trait ConfigService { /// Get config, return the content. fn get_config(&mut self, data_id: String, group: String) -> error::Result; + /// Remove config, return true/false. + fn remove_config(&mut self, data_id: String, group: String) -> error::Result; + /// Listen the config change. fn add_listener( &mut self, @@ -211,6 +214,23 @@ mod tests { sleep(Duration::from_secs(30)).await; } + // #[tokio::test] + async fn test_api_config_service_remove_config() { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .init(); + + let mut config_service = ConfigServiceBuilder::default().build().await; + + // remove a config not exit + let remove_resp = + config_service.remove_config("todo-data-id".to_string(), "todo-group".to_string()); + match remove_resp { + Ok(result) => tracing::info!("remove a config not exit: {}", result), + Err(err) => tracing::error!("remove a config not exit: {:?}", err), + } + } + struct TestConfigChangeListener; impl ConfigChangeListener for TestConfigChangeListener { diff --git a/src/common/remote/request/mod.rs b/src/common/remote/request/mod.rs index f646097..8e0e0a0 100644 --- a/src/common/remote/request/mod.rs +++ b/src/common/remote/request/mod.rs @@ -40,6 +40,9 @@ lazy_static! { /// com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest pub static ref TYPE_CONFIG_QUERY_CLIENT_REQUEST: String = String::from("ConfigQueryRequest"); + /// com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest + pub static ref TYPE_CONFIG_REMOVE_CLIENT_REQUEST: String = String::from("ConfigRemoveRequest"); + } // odd by client request id. diff --git a/src/common/remote/response/mod.rs b/src/common/remote/response/mod.rs index 798dab0..dde7b7e 100644 --- a/src/common/remote/response/mod.rs +++ b/src/common/remote/response/mod.rs @@ -51,4 +51,10 @@ lazy_static! { /// com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse pub static ref TYPE_CONFIG_CHANGE_BATCH_LISTEN_RESPONSE: String = String::from("ConfigChangeBatchListenResponse"); + /// com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse + pub static ref TYPE_CONFIG_QUERY_SERVER_RESPONSE: String = String::from("ConfigQueryResponse"); + + /// com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse + pub static ref TYPE_CONFIG_REMOVE_SERVER_RESPONSE: String = String::from("ConfigRemoveResponse"); + } diff --git a/src/config/client_request.rs b/src/config/client_request.rs index e899b5a..746ae65 100644 --- a/src/config/client_request.rs +++ b/src/config/client_request.rs @@ -109,3 +109,43 @@ impl ConfigQueryClientRequest { } } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(crate) struct ConfigRemoveClientRequest { + requestId: String, + /// could be empty. + headers: HashMap, + /// DataId + dataId: String, + /// Group + group: String, + /// tenant + tenant: String, + /// tag + tag: Option, +} + +impl Request for ConfigRemoveClientRequest { + fn request_id(&self) -> &String { + &self.requestId + } + fn headers(&self) -> &HashMap { + &self.headers + } + fn type_url(&self) -> &String { + &TYPE_CONFIG_REMOVE_CLIENT_REQUEST + } +} + +impl ConfigRemoveClientRequest { + pub fn new(data_id: String, group: String, tenant: String) -> Self { + ConfigRemoveClientRequest { + requestId: generate_request_id(), + headers: HashMap::new(), + dataId: data_id, + group, + tenant, + tag: None, + } + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs index 8564a8c..f6f3eaf 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -10,18 +10,14 @@ use crate::api::props::ClientProps; use crate::config::worker::ConfigWorker; pub(crate) struct NacosConfigService { - client_props: ClientProps, /// config client worker client_worker: ConfigWorker, } impl NacosConfigService { pub fn new(client_props: ClientProps) -> Self { - let client_worker = ConfigWorker::new(client_props.clone()); - Self { - client_props, - client_worker, - } + let client_worker = ConfigWorker::new(client_props); + Self { client_worker } } /// start Once @@ -39,18 +35,17 @@ impl ConfigService for NacosConfigService { self.client_worker.get_config(data_id, group) } + fn remove_config(&mut self, data_id: String, group: String) -> crate::api::error::Result { + self.client_worker.remove_config(data_id, group) + } + fn add_listener( &mut self, data_id: String, group: String, listener: std::sync::Arc, ) -> crate::api::error::Result<()> { - self.client_worker.add_listener( - data_id, - group, - self.client_props.namespace.clone(), - listener, - ); + self.client_worker.add_listener(data_id, group, listener); Ok(()) } @@ -60,12 +55,7 @@ impl ConfigService for NacosConfigService { group: String, listener: std::sync::Arc, ) -> crate::api::error::Result<()> { - self.client_worker.remove_listener( - data_id, - group, - self.client_props.namespace.clone(), - listener, - ); + self.client_worker.remove_listener(data_id, group, listener); Ok(()) } } diff --git a/src/config/server_response.rs b/src/config/server_response.rs index 43d5430..6833d5a 100644 --- a/src/config/server_response.rs +++ b/src/config/server_response.rs @@ -99,7 +99,7 @@ impl Response for ConfigQueryServerResponse { } fn type_url(&self) -> &String { - &TYPE_CONFIG_CHANGE_BATCH_LISTEN_RESPONSE + &TYPE_CONFIG_QUERY_SERVER_RESPONSE } } @@ -133,3 +133,40 @@ impl From<&str> for ConfigQueryServerResponse { de.unwrap() } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(crate) struct ConfigRemoveServerResponse { + requestId: Option, + resultCode: ResponseCode, + errorCode: u32, + message: Option, +} + +impl Response for ConfigRemoveServerResponse { + fn is_success(&self) -> bool { + ResponseCode::Ok == self.resultCode + } + + fn request_id(&self) -> Option<&String> { + Option::from(&self.requestId) + } + + fn message(&self) -> Option<&String> { + Option::from(&self.message) + } + + fn error_code(&self) -> u32 { + self.errorCode + } + + fn type_url(&self) -> &String { + &TYPE_CONFIG_REMOVE_SERVER_RESPONSE + } +} + +impl From<&str> for ConfigRemoveServerResponse { + fn from(json_str: &str) -> Self { + let de: serde_json::Result = serde_json::from_str(json_str); + de.unwrap() + } +} diff --git a/src/config/worker.rs b/src/config/worker.rs index aca72bf..f19750a 100644 --- a/src/config/worker.rs +++ b/src/config/worker.rs @@ -149,14 +149,23 @@ impl ConfigWorker { )) } + pub(crate) fn remove_config( + &mut self, + data_id: String, + group: String, + ) -> crate::api::error::Result { + let tenant = self.client_props.namespace.clone(); + Self::remove_config_inner(&mut self.connection, data_id, group, tenant) + } + /// Add listener. pub(crate) fn add_listener( &mut self, data_id: String, group: String, - tenant: String, listener: Arc, ) { + let tenant = self.client_props.namespace.clone(); let group_key = util::group_key(&data_id, &group, &tenant); if let Ok(mut mutex) = self.cache_data_map.lock() { if !mutex.contains_key(group_key.as_str()) { @@ -196,9 +205,9 @@ impl ConfigWorker { &mut self, data_id: String, group: String, - tenant: String, listener: Arc, ) { + let tenant = self.client_props.namespace.clone(); let group_key = util::group_key(&data_id, &group, &tenant); if let Ok(mut mutex) = self.cache_data_map.lock() { if !mutex.contains_key(group_key.as_str()) { @@ -385,6 +394,29 @@ impl ConfigWorker { ))) } } + + fn remove_config_inner( + connection: &mut Connection, + data_id: String, + group: String, + tenant: String, + ) -> crate::api::error::Result { + let req = ConfigRemoveClientRequest::new(data_id, group, tenant); + let req_payload = payload_helper::build_req_grpc_payload(req); + let resp = connection.get_client()?.request(&req_payload)?; + let payload_inner = payload_helper::covert_payload(resp); + // return Err if get a err_resp + if payload_helper::is_err_resp(&payload_inner.type_url) { + let err_resp = ErrorResponse::from(payload_inner.body_str.as_str()); + return Err(crate::api::error::Error::ErrResult(format!( + "error_code={},message={}", + err_resp.error_code(), + err_resp.message().unwrap() + ))); + } + let remove_resp = ConfigRemoveServerResponse::from(payload_inner.body_str.as_str()); + Ok(remove_resp.is_success()) + } } /// Cache Data for Config @@ -547,19 +579,19 @@ mod tests { #[test] fn test_client_worker_add_listener() { - let mut client_worker = ConfigWorker::new(ClientProps::new()); - let (d, g, t) = ("D".to_string(), "G".to_string(), "N".to_string()); + let mut client_worker = ConfigWorker::new(ClientProps::new().namespace(t.clone())); + // test add listener1 let lis1_arc = Arc::new(TestConfigChangeListener1 {}); - let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis1_arc); + let _listen = client_worker.add_listener(d.clone(), g.clone(), lis1_arc); // test add listener2 let lis2_arc = Arc::new(TestConfigChangeListener2 {}); - let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis2_arc.clone()); + let _listen = client_worker.add_listener(d.clone(), g.clone(), lis2_arc.clone()); // test add a listener2 again - let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis2_arc); + let _listen = client_worker.add_listener(d.clone(), g.clone(), lis2_arc); let group_key = util::group_key(&d, &g, &t); { @@ -572,14 +604,14 @@ mod tests { #[test] fn test_client_worker_add_listener_then_remove() { - let mut client_worker = ConfigWorker::new(ClientProps::new()); - let (d, g, t) = ("D".to_string(), "G".to_string(), "N".to_string()); + let mut client_worker = ConfigWorker::new(ClientProps::new().namespace(t.clone())); + // test add listener1 let lis1_arc = Arc::new(TestConfigChangeListener1 {}); let lis1_arc2 = Arc::clone(&lis1_arc); - let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis1_arc); + let _listen = client_worker.add_listener(d.clone(), g.clone(), lis1_arc); let group_key = util::group_key(&d, &g, &t); { @@ -589,7 +621,7 @@ mod tests { assert_eq!(1, listen_mutex.len()); } - client_worker.remove_listener(d.clone(), g.clone(), t.clone(), lis1_arc2); + client_worker.remove_listener(d.clone(), g.clone(), lis1_arc2); { let cache_data_map_mutex = client_worker.cache_data_map.lock().unwrap(); let cache_data = cache_data_map_mutex.get(group_key.as_str()).unwrap();