Skip to content

Commit

Permalink
feat(config-api #29): remove_config() for Remove a Config. (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
CherishCai authored Oct 19, 2022
1 parent 2033706 commit 35fade0
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 30 deletions.
20 changes: 20 additions & 0 deletions src/api/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub trait ConfigService {
/// Get config, return the content.
fn get_config(&mut self, data_id: String, group: String) -> error::Result<ConfigResponse>;

/// Remove config, return true/false.
fn remove_config(&mut self, data_id: String, group: String) -> error::Result<bool>;

/// Listen the config change.
fn add_listener(
&mut self,
Expand Down Expand Up @@ -211,6 +214,23 @@ mod tests {
sleep(Duration::from_secs(30)).await;
}

// #[tokio::test]
async fn test_api_config_service_remove_config() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();

let mut config_service = ConfigServiceBuilder::default().build().await;

// remove a config not exit
let remove_resp =
config_service.remove_config("todo-data-id".to_string(), "todo-group".to_string());
match remove_resp {
Ok(result) => tracing::info!("remove a config not exit: {}", result),
Err(err) => tracing::error!("remove a config not exit: {:?}", err),
}
}

struct TestConfigChangeListener;

impl ConfigChangeListener for TestConfigChangeListener {
Expand Down
3 changes: 3 additions & 0 deletions src/common/remote/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ lazy_static! {
/// com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest
pub static ref TYPE_CONFIG_QUERY_CLIENT_REQUEST: String = String::from("ConfigQueryRequest");

/// com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest
pub static ref TYPE_CONFIG_REMOVE_CLIENT_REQUEST: String = String::from("ConfigRemoveRequest");

}

// odd by client request id.
Expand Down
6 changes: 6 additions & 0 deletions src/common/remote/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ lazy_static! {
/// com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse
pub static ref TYPE_CONFIG_CHANGE_BATCH_LISTEN_RESPONSE: String = String::from("ConfigChangeBatchListenResponse");

/// com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse
pub static ref TYPE_CONFIG_QUERY_SERVER_RESPONSE: String = String::from("ConfigQueryResponse");

/// com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse
pub static ref TYPE_CONFIG_REMOVE_SERVER_RESPONSE: String = String::from("ConfigRemoveResponse");

}
40 changes: 40 additions & 0 deletions src/config/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,43 @@ impl ConfigQueryClientRequest {
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct ConfigRemoveClientRequest {
requestId: String,
/// could be empty.
headers: HashMap<String, String>,
/// DataId
dataId: String,
/// Group
group: String,
/// tenant
tenant: String,
/// tag
tag: Option<String>,
}

impl Request for ConfigRemoveClientRequest {
fn request_id(&self) -> &String {
&self.requestId
}
fn headers(&self) -> &HashMap<String, String> {
&self.headers
}
fn type_url(&self) -> &String {
&TYPE_CONFIG_REMOVE_CLIENT_REQUEST
}
}

impl ConfigRemoveClientRequest {
pub fn new(data_id: String, group: String, tenant: String) -> Self {
ConfigRemoveClientRequest {
requestId: generate_request_id(),
headers: HashMap::new(),
dataId: data_id,
group,
tenant,
tag: None,
}
}
}
26 changes: 8 additions & 18 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,14 @@ use crate::api::props::ClientProps;
use crate::config::worker::ConfigWorker;

pub(crate) struct NacosConfigService {
client_props: ClientProps,
/// config client worker
client_worker: ConfigWorker,
}

impl NacosConfigService {
pub fn new(client_props: ClientProps) -> Self {
let client_worker = ConfigWorker::new(client_props.clone());
Self {
client_props,
client_worker,
}
let client_worker = ConfigWorker::new(client_props);
Self { client_worker }
}

/// start Once
Expand All @@ -39,18 +35,17 @@ impl ConfigService for NacosConfigService {
self.client_worker.get_config(data_id, group)
}

fn remove_config(&mut self, data_id: String, group: String) -> crate::api::error::Result<bool> {
self.client_worker.remove_config(data_id, group)
}

fn add_listener(
&mut self,
data_id: String,
group: String,
listener: std::sync::Arc<dyn crate::api::config::ConfigChangeListener>,
) -> crate::api::error::Result<()> {
self.client_worker.add_listener(
data_id,
group,
self.client_props.namespace.clone(),
listener,
);
self.client_worker.add_listener(data_id, group, listener);
Ok(())
}

Expand All @@ -60,12 +55,7 @@ impl ConfigService for NacosConfigService {
group: String,
listener: std::sync::Arc<dyn crate::api::config::ConfigChangeListener>,
) -> crate::api::error::Result<()> {
self.client_worker.remove_listener(
data_id,
group,
self.client_props.namespace.clone(),
listener,
);
self.client_worker.remove_listener(data_id, group, listener);
Ok(())
}
}
39 changes: 38 additions & 1 deletion src/config/server_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Response for ConfigQueryServerResponse {
}

fn type_url(&self) -> &String {
&TYPE_CONFIG_CHANGE_BATCH_LISTEN_RESPONSE
&TYPE_CONFIG_QUERY_SERVER_RESPONSE
}
}

Expand Down Expand Up @@ -133,3 +133,40 @@ impl From<&str> for ConfigQueryServerResponse {
de.unwrap()
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct ConfigRemoveServerResponse {
requestId: Option<String>,
resultCode: ResponseCode,
errorCode: u32,
message: Option<String>,
}

impl Response for ConfigRemoveServerResponse {
fn is_success(&self) -> bool {
ResponseCode::Ok == self.resultCode
}

fn request_id(&self) -> Option<&String> {
Option::from(&self.requestId)
}

fn message(&self) -> Option<&String> {
Option::from(&self.message)
}

fn error_code(&self) -> u32 {
self.errorCode
}

fn type_url(&self) -> &String {
&TYPE_CONFIG_REMOVE_SERVER_RESPONSE
}
}

impl From<&str> for ConfigRemoveServerResponse {
fn from(json_str: &str) -> Self {
let de: serde_json::Result<Self> = serde_json::from_str(json_str);
de.unwrap()
}
}
54 changes: 43 additions & 11 deletions src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,23 @@ impl ConfigWorker {
))
}

pub(crate) fn remove_config(
&mut self,
data_id: String,
group: String,
) -> crate::api::error::Result<bool> {
let tenant = self.client_props.namespace.clone();
Self::remove_config_inner(&mut self.connection, data_id, group, tenant)
}

/// Add listener.
pub(crate) fn add_listener(
&mut self,
data_id: String,
group: String,
tenant: String,
listener: Arc<dyn crate::api::config::ConfigChangeListener>,
) {
let tenant = self.client_props.namespace.clone();
let group_key = util::group_key(&data_id, &group, &tenant);
if let Ok(mut mutex) = self.cache_data_map.lock() {
if !mutex.contains_key(group_key.as_str()) {
Expand Down Expand Up @@ -196,9 +205,9 @@ impl ConfigWorker {
&mut self,
data_id: String,
group: String,
tenant: String,
listener: Arc<dyn crate::api::config::ConfigChangeListener>,
) {
let tenant = self.client_props.namespace.clone();
let group_key = util::group_key(&data_id, &group, &tenant);
if let Ok(mut mutex) = self.cache_data_map.lock() {
if !mutex.contains_key(group_key.as_str()) {
Expand Down Expand Up @@ -385,6 +394,29 @@ impl ConfigWorker {
)))
}
}

fn remove_config_inner(
connection: &mut Connection,
data_id: String,
group: String,
tenant: String,
) -> crate::api::error::Result<bool> {
let req = ConfigRemoveClientRequest::new(data_id, group, tenant);
let req_payload = payload_helper::build_req_grpc_payload(req);
let resp = connection.get_client()?.request(&req_payload)?;
let payload_inner = payload_helper::covert_payload(resp);
// return Err if get a err_resp
if payload_helper::is_err_resp(&payload_inner.type_url) {
let err_resp = ErrorResponse::from(payload_inner.body_str.as_str());
return Err(crate::api::error::Error::ErrResult(format!(
"error_code={},message={}",
err_resp.error_code(),
err_resp.message().unwrap()
)));
}
let remove_resp = ConfigRemoveServerResponse::from(payload_inner.body_str.as_str());
Ok(remove_resp.is_success())
}
}

/// Cache Data for Config
Expand Down Expand Up @@ -547,19 +579,19 @@ mod tests {

#[test]
fn test_client_worker_add_listener() {
let mut client_worker = ConfigWorker::new(ClientProps::new());

let (d, g, t) = ("D".to_string(), "G".to_string(), "N".to_string());

let mut client_worker = ConfigWorker::new(ClientProps::new().namespace(t.clone()));

// test add listener1
let lis1_arc = Arc::new(TestConfigChangeListener1 {});
let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis1_arc);
let _listen = client_worker.add_listener(d.clone(), g.clone(), lis1_arc);

// test add listener2
let lis2_arc = Arc::new(TestConfigChangeListener2 {});
let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis2_arc.clone());
let _listen = client_worker.add_listener(d.clone(), g.clone(), lis2_arc.clone());
// test add a listener2 again
let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis2_arc);
let _listen = client_worker.add_listener(d.clone(), g.clone(), lis2_arc);

let group_key = util::group_key(&d, &g, &t);
{
Expand All @@ -572,14 +604,14 @@ mod tests {

#[test]
fn test_client_worker_add_listener_then_remove() {
let mut client_worker = ConfigWorker::new(ClientProps::new());

let (d, g, t) = ("D".to_string(), "G".to_string(), "N".to_string());

let mut client_worker = ConfigWorker::new(ClientProps::new().namespace(t.clone()));

// test add listener1
let lis1_arc = Arc::new(TestConfigChangeListener1 {});
let lis1_arc2 = Arc::clone(&lis1_arc);
let _listen = client_worker.add_listener(d.clone(), g.clone(), t.clone(), lis1_arc);
let _listen = client_worker.add_listener(d.clone(), g.clone(), lis1_arc);

let group_key = util::group_key(&d, &g, &t);
{
Expand All @@ -589,7 +621,7 @@ mod tests {
assert_eq!(1, listen_mutex.len());
}

client_worker.remove_listener(d.clone(), g.clone(), t.clone(), lis1_arc2);
client_worker.remove_listener(d.clone(), g.clone(), lis1_arc2);
{
let cache_data_map_mutex = client_worker.cache_data_map.lock().unwrap();
let cache_data = cache_data_map_mutex.get(group_key.as_str()).unwrap();
Expand Down

0 comments on commit 35fade0

Please sign in to comment.