From a34dccf0c7072ff65a9bb44738b6dc1888eaa777 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=8D=E9=87=91=E6=97=A5?= Date: Mon, 17 Jul 2023 20:04:23 +0800 Subject: [PATCH] fix: avoid write queue full block (#1065) ## Rationale If the first write request is canceled, the write request in the queue will never be executed, until the queue is full, then an error will be returned. ## Detailed Changes Use `CancellationSafeFuture` to execute the write requests, see #1071 ## Test Plan Existing tests. --- analytic_engine/src/instance/mod.rs | 5 ++ analytic_engine/src/table/mod.rs | 107 ++++++++++++++++++++-------- 2 files changed, 83 insertions(+), 29 deletions(-) diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 1faf254f08..6546a3e97f 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -266,6 +266,11 @@ impl Instance { &self.runtimes.read_runtime } + #[inline] + pub fn write_runtime(&self) -> &Arc { + &self.runtimes.write_runtime + } + #[inline] fn make_flusher(&self) -> Flusher { Flusher { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 82b2b54b9c..f1df20a4b8 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -2,7 +2,11 @@ //! Table implementation -use std::{collections::HashMap, fmt, sync::Mutex}; +use std::{ + collections::HashMap, + fmt, + sync::{Arc, Mutex}, +}; use async_trait::async_trait; use common_types::{ @@ -10,7 +14,7 @@ use common_types::{ schema::Schema, time::TimeRange, }; -use common_util::error::BoxError; +use common_util::{error::BoxError, future_cancel::CancellationSafeFuture}; use datafusion::{common::Column, logical_expr::Expr}; use futures::TryStreamExt; use log::{error, warn}; @@ -47,6 +51,29 @@ const GET_METRICS_COLLECTOR_NAME: &str = "get"; // writes. const ADDITIONAL_PENDING_WRITE_CAP_RATIO: usize = 10; +struct WriteRequests { + pub space_table: SpaceAndTable, + pub instance: InstanceRef, + pub table_data: TableDataRef, + pub pending_writes: Arc>, +} + +impl WriteRequests { + pub fn new( + space_table: SpaceAndTable, + instance: InstanceRef, + table_data: TableDataRef, + pending_writes: Arc>, + ) -> Self { + Self { + space_table, + instance, + table_data, + pending_writes, + } + } +} + /// Table trait implementation pub struct TableImpl { space_table: SpaceAndTable, @@ -63,7 +90,7 @@ pub struct TableImpl { table_data: TableDataRef, /// Buffer for written rows. - pending_writes: Mutex, + pending_writes: Arc>, } impl TableImpl { @@ -78,7 +105,7 @@ impl TableImpl { space_id, table_id: table_data.id, table_data, - pending_writes, + pending_writes: Arc::new(pending_writes), } } } @@ -250,25 +277,28 @@ impl TableImpl { let mut pending_queue = self.pending_writes.lock().unwrap(); pending_queue.try_push(request) }; - let (request, mut serial_exec, notifiers) = match queue_res { + + match queue_res { QueueResult::First => { // This is the first request in the queue, and we should // take responsibilities for merging and writing the // requests in the queue. - let serial_exec = self.table_data.serial_exec.lock().await; - // The `serial_exec` is acquired, let's merge the pending requests and write - // them all. - let pending_writes = { - let mut pending_queue = self.pending_writes.lock().unwrap(); - pending_queue.take_pending_writes() - }; - assert!( - !pending_writes.is_empty(), - "The pending writes should contain at least the one just pushed." + let write_requests = WriteRequests::new( + self.space_table.clone(), + self.instance.clone(), + self.table_data.clone(), + self.pending_writes.clone(), ); - let merged_write_request = - merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows); - (merged_write_request, serial_exec, pending_writes.notifiers) + + match CancellationSafeFuture::new( + Self::write_requests(write_requests), + self.instance.write_runtime().clone(), + ) + .await + { + Ok(_) => Ok(num_rows), + Err(e) => Err(e), + } } QueueResult::Waiter(rx) => { // The request is successfully pushed into the queue, and just wait for the @@ -276,9 +306,9 @@ impl TableImpl { match rx.await { Ok(res) => { res.box_err().context(Write { table: self.name() })?; - return Ok(num_rows); + Ok(num_rows) } - Err(_) => return WaitForPendingWrites { table: self.name() }.fail(), + Err(_) => WaitForPendingWrites { table: self.name() }.fail(), } } QueueResult::Reject(_) => { @@ -288,24 +318,43 @@ impl TableImpl { self.instance.max_rows_in_write_queue, self.name(), ); - return TooManyPendingWrites { table: self.name() }.fail(); + TooManyPendingWrites { table: self.name() }.fail() } + } + } + + async fn write_requests(write_requests: WriteRequests) -> Result<()> { + let mut serial_exec = write_requests.table_data.serial_exec.lock().await; + // The `serial_exec` is acquired, let's merge the pending requests and write + // them all. + let pending_writes = { + let mut pending_queue = write_requests.pending_writes.lock().unwrap(); + pending_queue.take_pending_writes() }; + assert!( + !pending_writes.is_empty(), + "The pending writes should contain at least the one just pushed." + ); + let merged_write_request = + merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows); let mut writer = Writer::new( - self.instance.clone(), - self.space_table.clone(), + write_requests.instance, + write_requests.space_table, &mut serial_exec, ); let write_res = writer - .write(request) + .write(merged_write_request) .await .box_err() - .context(Write { table: self.name() }); + .context(Write { + table: write_requests.table_data.name.clone(), + }); // There is no waiter for pending writes, return the write result. + let notifiers = pending_writes.notifiers; if notifiers.is_empty() { - return write_res; + return Ok(()); } // Notify the waiters for the pending writes. @@ -315,11 +364,11 @@ impl TableImpl { if notifier.send(Ok(())).is_err() { warn!( "Failed to notify the ok result of pending writes, table:{}", - self.name() + write_requests.table_data.name ); } } - Ok(num_rows) + Ok(()) } Err(e) => { let err_msg = format!("Failed to do merge write, err:{e}"); @@ -328,7 +377,7 @@ impl TableImpl { if notifier.send(err).is_err() { warn!( "Failed to notify the error result of pending writes, table:{}", - self.name() + write_requests.table_data.name ); } }