-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #879 from 4t145/gateway-notification-support
gateway: add notification support
- Loading branch information
Showing
4 changed files
with
127 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
60 changes: 60 additions & 0 deletions
60
backend/gateways/spacegate-plugins/src/extension/notification.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
use std::{borrow::Cow, collections::HashMap, sync::Arc}; | ||
|
||
use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, Uri}; | ||
use serde::Serialize; | ||
use spacegate_shell::{kernel::backend_service::http_client_service::HttpClient, SgBody, SgRequest}; | ||
use tardis::{log as tracing, serde_json}; | ||
|
||
/// Context to call notification api | ||
/// | ||
/// Extract it from request extensions, and call [`NotificationContext::notify`] to send notification | ||
#[derive(Debug, Clone)] | ||
pub struct NotificationContext { | ||
pub(crate) api: Arc<Uri>, | ||
pub(crate) headers: Arc<HashMap<HeaderName, HeaderValue>>, | ||
pub(crate) client: HttpClient, | ||
} | ||
|
||
impl NotificationContext { | ||
fn build_notification_request(&self, req: &ReachMsgSendReq) -> SgRequest { | ||
let req_bytes = serde_json::to_vec(&req).expect("ReachMsgSendReq is a valid json"); | ||
let body = SgBody::full(req_bytes); | ||
let mut req = SgRequest::new(body); | ||
req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); | ||
*req.uri_mut() = self.api.as_ref().clone(); | ||
for (k, v) in self.headers.iter() { | ||
req.headers_mut().insert(k.clone(), v.clone()); | ||
} | ||
req | ||
} | ||
pub async fn notify(&self, req: &ReachMsgSendReq) { | ||
let notify_response = self.client.clone().request(self.build_notification_request(req)).await; | ||
if !notify_response.status().is_success() { | ||
tracing::warn!(response = ?notify_response, "send notification failed"); | ||
} | ||
|
||
let Ok(response) = notify_response.into_body().dump().await.inspect_err(|e| { | ||
tracing::error!(error = ?e, "failed to read response body"); | ||
}) else { | ||
return; | ||
}; | ||
let response_str = String::from_utf8_lossy(response.get_dumped().expect("just dump body")); | ||
tracing::debug!(response = ?response_str, "receive notification api response"); | ||
} | ||
} | ||
|
||
|
||
#[derive(Debug, Serialize)] | ||
pub struct ReachMsgSendReq { | ||
pub scene_code: String, | ||
pub receives: Vec<ReachMsgReceive>, | ||
pub rel_item_id: String, | ||
pub replace: HashMap<String, String>, | ||
} | ||
|
||
#[derive(Debug, Serialize)] | ||
pub struct ReachMsgReceive { | ||
pub receive_group_code: String, | ||
pub receive_kind: String, | ||
pub receive_ids: Vec<String>, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,3 +5,4 @@ pub mod auth; | |
pub mod ip_time; | ||
pub mod op_redis_publisher; | ||
pub mod rewrite_ns_b_ip; | ||
pub mod notify; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
use std::{collections::HashMap, sync::Arc}; | ||
|
||
use http::{HeaderName, HeaderValue, Uri}; | ||
use serde::{Deserialize, Serialize}; | ||
use spacegate_shell::{ | ||
kernel::backend_service::http_client_service::get_client, | ||
plugin::{plugin_meta, schema, schemars, Inner, Plugin, PluginSchemaExt}, | ||
BoxError, SgRequest, SgRequestExt, SgResponse, | ||
}; | ||
use tardis::serde_json; | ||
|
||
use crate::extension::notification::NotificationContext; | ||
#[derive(Serialize, Deserialize, schemars::JsonSchema)] | ||
pub struct NotifyPluginConfig { | ||
api: String, | ||
headers: HashMap<String, String>, | ||
} | ||
schema!(NotifyPlugin, NotifyPluginConfig); | ||
#[derive(Debug, Clone)] | ||
pub struct NotifyPlugin { | ||
api: Arc<Uri>, | ||
headers: Arc<HashMap<HeaderName, HeaderValue>>, | ||
} | ||
|
||
impl Plugin for NotifyPlugin { | ||
const CODE: &'static str = "notify"; | ||
fn meta() -> spacegate_shell::model::PluginMetaData { | ||
plugin_meta! { | ||
description: "attach a notification api calling context to the request" | ||
} | ||
} | ||
fn create(plugin_config: spacegate_shell::model::PluginConfig) -> Result<Self, BoxError> { | ||
// parse uri | ||
let config: NotifyPluginConfig = serde_json::from_value(plugin_config.spec)?; | ||
let api = config.api.parse::<Uri>()?; | ||
let headers = config | ||
.headers | ||
.into_iter() | ||
.map_while(|(k, v)| { | ||
if let (Ok(k), Ok(v)) = (k.parse::<HeaderName>(), v.parse::<HeaderValue>()) { | ||
Some((k, v)) | ||
} else { | ||
None | ||
} | ||
}) | ||
.collect(); | ||
Ok(Self { | ||
api: Arc::new(api), | ||
headers: Arc::new(headers), | ||
}) | ||
} | ||
async fn call(&self, mut req: SgRequest, inner: Inner) -> Result<SgResponse, BoxError> { | ||
let context = NotificationContext { | ||
api: self.api.clone(), | ||
headers: self.headers.clone(), | ||
client: get_client(), | ||
}; | ||
req.extensions_mut().insert(context.clone()); | ||
req.reflect_mut().insert(context); | ||
Ok(inner.call(req).await) | ||
} | ||
fn schema_opt() -> Option<schemars::schema::RootSchema> { | ||
Some(NotifyPlugin::schema()) | ||
} | ||
} |