Skip to content

Commit

Permalink
refactor: naming grpc move to common module (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
onewe authored Oct 30, 2022
1 parent 36663cb commit 037dffc
Show file tree
Hide file tree
Showing 54 changed files with 833 additions and 801 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ serde_json = "1"
serde_repr = "0.1"
lazy_static = "1.4"
#crossbeam = "0"
#async-trait = "0"
# async-trait = "0"
#async_once = "0"

#opentelemetry = "0"
Expand Down
4 changes: 2 additions & 2 deletions nacos-macro/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) fn grpc_request(

// add derive GrpcMessageData
let grpc_message_body = quote! {
impl crate::naming::grpc::message::GrpcMessageData for #name {
impl crate::common::remote::grpc::message::GrpcMessageData for #name {
fn identity<'a>() -> std::borrow::Cow<'a, str> {
#identity.into()
}
Expand All @@ -27,7 +27,7 @@ pub(crate) fn grpc_request(
// add derive GrpcRequestMessage
let grpc_message_request = quote! {

impl crate::naming::grpc::message::GrpcRequestMessage for #name {
impl crate::common::remote::grpc::message::GrpcRequestMessage for #name {

fn header(&self, key: &str) -> Option<&String>{
self.headers.get(key)
Expand Down
4 changes: 2 additions & 2 deletions nacos-macro/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) fn grpc_response(

// add derive GrpcMessageBody
let grpc_message_body = quote! {
impl crate::naming::grpc::message::GrpcMessageData for #name {
impl crate::common::remote::grpc::message::GrpcMessageData for #name {
fn identity<'a>() -> std::borrow::Cow<'a, str> {
#identity.into()
}
Expand Down Expand Up @@ -55,7 +55,7 @@ pub(crate) fn grpc_response(

let grpc_message_response = quote! {

impl crate::naming::grpc::message::GrpcResponseMessage for #name {
impl crate::common::remote::grpc::message::GrpcResponseMessage for #name {
fn request_id(&self) -> Option<&String> {
self.request_id.as_ref()
}
Expand Down
17 changes: 13 additions & 4 deletions src/api/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use crate::api::error::Result;
use futures::Future;
use tracing::warn;
pub mod naming;

pub type HandEventFuture = Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>;
Expand All @@ -14,19 +15,27 @@ pub trait NacosEvent: Any + Send + Sync + 'static {
}

fn as_any(&self) -> &dyn Any;

fn event_identity(&self) -> String;
}

pub trait Subscriber: Send + Sync + 'static {
fn on_event(&self, event: Arc<Box<dyn NacosEvent>>) -> Option<HandEventFuture>;
fn on_event(&self, event: Arc<Box<dyn NacosEvent>>);

fn event_type(&self) -> TypeId;

fn subscriber_type(&self) -> TypeId;
}

impl<T: NacosEventSubscriber> Subscriber for T {
fn on_event(&self, event: Arc<Box<dyn NacosEvent>>) -> Option<HandEventFuture> {
let event = event.as_any().downcast_ref::<T::EventType>()?;
fn on_event(&self, event: Arc<Box<dyn NacosEvent>>) {
let event_identity = event.event_identity();
let event = event.as_any().downcast_ref::<T::EventType>();
if event.is_none() {
warn!("event {} cannot cast target object", event_identity);
return;
}
let event = event.unwrap();
self.on_event(event)
}

Expand All @@ -42,5 +51,5 @@ impl<T: NacosEventSubscriber> Subscriber for T {
pub trait NacosEventSubscriber: Send + Sync + 'static {
type EventType;

fn on_event(&self, event: &Self::EventType) -> Option<HandEventFuture>;
fn on_event(&self, event: &Self::EventType);
}
4 changes: 4 additions & 0 deletions src/api/events/naming/instances_change_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ impl NacosEvent for InstancesChangeEvent {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn event_identity(&self) -> String {
"InstancesChangeEvent".to_string()
}
}
18 changes: 9 additions & 9 deletions src/common/event_bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,9 @@ mod __private {
for subscriber in subscribers {
let event = event.clone();
let subscriber = subscriber.clone();
let hand_event_future = subscriber.on_event(event);
if hand_event_future.is_none() {
continue;
}
let hand_event_future = hand_event_future.unwrap();
executor::spawn(hand_event_future);
executor::spawn(async move {
subscriber.on_event(event);
});
}
} else {
warn!("{:?} has not been subscribed by anyone.", key);
Expand Down Expand Up @@ -154,7 +151,7 @@ mod tests {
use core::time;
use std::{any::Any, sync::Arc, thread};

use crate::api::events::{HandEventFuture, NacosEvent, NacosEventSubscriber, Subscriber};
use crate::api::events::{NacosEvent, NacosEventSubscriber, Subscriber};

#[derive(Clone, Debug)]
pub(crate) struct NamingChangeEvent {
Expand All @@ -165,6 +162,10 @@ mod tests {
fn as_any(&self) -> &dyn Any {
self
}

fn event_identity(&self) -> String {
"NamingChangeEvent".to_string()
}
}

#[derive(Hash, PartialEq)]
Expand All @@ -173,9 +174,8 @@ mod tests {
impl NacosEventSubscriber for NamingChangeSubscriber {
type EventType = NamingChangeEvent;

fn on_event(&self, event: &Self::EventType) -> Option<HandEventFuture> {
fn on_event(&self, event: &Self::EventType) {
println!("it has already received an event. {:?}", event);
None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ impl NacosEvent for GrpcConnectHealthCheckEvent {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn event_identity(&self) -> String {
"GrpcConnectHealthCheckEvent".to_string()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ impl NacosEvent for GrpcDisconnectEvent {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn event_identity(&self) -> String {
"GrpcDisconnectEvent".to_string()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ impl NacosEvent for GrpcReconnectedEvent {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn event_identity(&self) -> String {
"GrpcReconnectedEvent".to_string()
}
}
File renamed without changes.
Loading

0 comments on commit 037dffc

Please sign in to comment.