Skip to content

Commit

Permalink
⚗️ (tracing-msg): 添加PushMsg::bulk_push
Browse files Browse the repository at this point in the history
  • Loading branch information
czy-29 committed Jan 6, 2025
1 parent 9a38c97 commit c250f48
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
39 changes: 22 additions & 17 deletions tracing-surreal/src/stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,35 +270,40 @@ impl<C: Connection> CloseTransport for Stop<C> {
impl<C: Connection> PushMsg for Stop<C> {
type Error = StopError;

async fn push_msg(&mut self, msg: TracingMsg) -> Result<(), Self::Error> {
async fn bulk_push(&mut self, msgs: Vec<TracingMsg>) -> Result<(), Self::Error> {
if !self.can_push {
return Err(StopError::ObserverCannotPush);
}

#[derive(Serialize)]
struct MsgRecord {
id: RecordId,
session_id: RecordId,
client_id: RecordId,
#[serde(flatten)]
msg: TracingMsg,
}

let timestamp = msg.timestamp;
let session_id = self.session_id.clone();
let client_id = self.client_id.clone();
let record = MsgRecord {
session_id,
client_id,
msg,
};
let _rid: Option<RID> = self
.db
.create((
format!("{}-msg", self.formatted_timestamp),
Ulid::from_datetime(timestamp.into()).to_string(),
))
.content(record)
.await?;
let table_name = format!("{}-msg", self.formatted_timestamp);
let mut records = Vec::new();

for msg in msgs {
let id = RecordId::from_table_key(
&table_name,
Ulid::from_datetime(msg.timestamp.into()).to_string(),
);
let session_id = self.session_id.clone();
let client_id = self.client_id.clone();

records.push(MsgRecord {
id,
session_id,
client_id,
msg,
});
}

let _rids: Vec<RID> = self.db.insert(table_name).content(records).await?;

Ok(())
}
Expand Down
18 changes: 14 additions & 4 deletions tracing-surreal/src/tracing_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use derive_more::Display;
use est::{task::TaskId, thread::ThreadId};
use indexmap::{map::Entry, IndexMap};
use serde::{Deserialize, Serialize};
use std::{num::NonZeroU64, ops::Deref, thread};
use std::{future::Future, num::NonZeroU64, ops::Deref, thread};
use tokio::task;

pub(crate) mod handshake;
Expand Down Expand Up @@ -507,8 +507,18 @@ pub trait CloseTransport: 'static {
async fn close_transport(&mut self, msg: Option<CloseMsg>);
}

#[trait_variant::make(Send)]
pub trait PushMsg: 'static {
pub trait PushMsg: Send + 'static {
type Error: std::error::Error + Send + 'static;
async fn push_msg(&mut self, msg: TracingMsg) -> Result<(), Self::Error>;

fn bulk_push(
&mut self,
msgs: Vec<TracingMsg>,
) -> impl Future<Output = Result<(), Self::Error>> + Send;

fn push_msg(
&mut self,
msg: TracingMsg,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
self.bulk_push(vec![msg])
}
}

0 comments on commit c250f48

Please sign in to comment.