Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: stream read metric #1203

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 30 additions & 32 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

use std::{
hash::Hash,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};

Expand All @@ -32,10 +34,10 @@ use ceresdbproto::{
storage::{arrow_payload, ArrowPayload},
};
use common_types::record_batch::RecordBatch;
use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt};
use futures::stream::{self, BoxStream, FuturesUnordered, Stream, StreamExt};
use generic_error::BoxError;
use log::{error, info};
use notifier::notifier::{RequestNotifiers, RequestResult};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
use proxy::{
hotspot::{HotspotRecorder, Message},
instance::InstanceRef,
Expand All @@ -49,8 +51,7 @@ use table_engine::{
table::TableRef,
};
use time_ext::InstantExt;
use tokio::sync::mpsc::{self, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tonic::{Request, Response, Status};

use super::metrics::REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM;
Expand Down Expand Up @@ -84,35 +85,36 @@ impl StreamReadReqKey {
}
}

struct ExecutionGuard<F: FnMut()> {
f: F,
cancelled: bool,
pub type StreamReadRequestNotifiers =
Arc<RequestNotifiers<StreamReadReqKey, mpsc::Sender<Result<RecordBatch>>>>;

struct StreamWithMetric {
inner: Receiver<Result<RecordBatch>>,
instant: Instant,
}

impl<F: FnMut()> ExecutionGuard<F> {
fn new(f: F) -> Self {
Self {
f,
cancelled: false,
}
impl StreamWithMetric {
fn new(inner: Receiver<Result<RecordBatch>>, instant: Instant) -> Self {
Self { inner, instant }
}
}

fn cancel(&mut self) {
self.cancelled = true;
impl Stream for StreamWithMetric {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}

impl<F: FnMut()> Drop for ExecutionGuard<F> {
impl Drop for StreamWithMetric {
fn drop(&mut self) {
if !self.cancelled {
(self.f)()
}
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.stream_read
.observe(self.instant.saturating_elapsed().as_secs_f64());
}
}

pub type StreamReadRequestNotifiers =
Arc<RequestNotifiers<StreamReadReqKey, mpsc::Sender<Result<RecordBatch>>>>;

#[derive(Clone)]
pub struct RemoteEngineServiceImpl {
pub instance: InstanceRef,
Expand All @@ -125,7 +127,8 @@ impl RemoteEngineServiceImpl {
async fn stream_read_internal(
&self,
request: Request<ReadRequest>,
) -> Result<ReceiverStream<Result<RecordBatch>>> {
) -> Result<StreamWithMetric> {
let instant = Instant::now();
let ctx = self.handler_ctx();
let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);
let handle = self.runtimes.read_runtime.spawn(async move {
Expand Down Expand Up @@ -162,15 +165,15 @@ impl RemoteEngineServiceImpl {
});
}

// TODO: add metrics to collect the time cost of the reading.
Ok(ReceiverStream::new(rx))
Ok(StreamWithMetric::new(rx, instant))
}

async fn dedup_stream_read_internal(
&self,
request_notifiers: StreamReadRequestNotifiers,
request: Request<ReadRequest>,
) -> Result<ReceiverStream<Result<RecordBatch>>> {
) -> Result<StreamWithMetric> {
let instant = Instant::now();
let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);

let request = request.into_inner();
Expand Down Expand Up @@ -200,7 +203,7 @@ impl RemoteEngineServiceImpl {
// read.
}
}
Ok(ReceiverStream::new(rx))
Ok(StreamWithMetric::new(rx, instant))
}

async fn read_and_send_dedupped_resps(
Expand All @@ -209,7 +212,6 @@ impl RemoteEngineServiceImpl {
request_key: StreamReadReqKey,
request_notifiers: StreamReadRequestNotifiers,
) -> Result<()> {
let instant = Instant::now();
let ctx = self.handler_ctx();

// This is used to remove key when future is cancelled.
Expand Down Expand Up @@ -263,10 +265,6 @@ impl RemoteEngineServiceImpl {
// Do send in background to avoid blocking the rpc procedure.
self.runtimes.read_runtime.spawn(async move {
Self::send_dedupped_resps(resps, notifiers).await;

REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.stream_read
.observe(instant.saturating_elapsed().as_secs_f64());
});

Ok(())
Expand Down