feat: support write batch in remote engine #840

Merged: 5 commits, Apr 14, 2023
28 changes: 14 additions & 14 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered here.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -152,7 +152,7 @@ tracing_util = { workspace = true }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "1c3bf4e803ef8b7a1fbc8c3d4a07fdd372e1830b"
rev = "9e0e70f6574e4e1e2b3f7f45c0a055c54327fb4a"

[build-dependencies]
vergen = { version = "7", default-features = false, features = ["build", "git", "rustc"] }
20 changes: 6 additions & 14 deletions partition_table_engine/src/lib.rs
@@ -9,7 +9,7 @@ mod partition;
use std::sync::Arc;

use async_trait::async_trait;
-use common_util::{error::BoxError, runtime::Runtime};
+use common_util::error::BoxError;
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{
@@ -26,15 +26,11 @@ use crate::partition::{PartitionTableImpl, TableData};
/// Partition table engine implementation.
pub struct PartitionTableEngine {
    remote_engine_ref: RemoteEngineRef,
-    io_runtime: Arc<Runtime>,
}

impl PartitionTableEngine {
-    pub fn new(remote_engine_ref: RemoteEngineRef, io_runtime: Arc<Runtime>) -> Self {
-        Self {
-            remote_engine_ref,
-            io_runtime,
-        }
+    pub fn new(remote_engine_ref: RemoteEngineRef) -> Self {
+        Self { remote_engine_ref }
    }
}

@@ -62,13 +58,9 @@ impl TableEngine for PartitionTableEngine {
            engine_type: request.engine,
        };
        Ok(Arc::new(
-            PartitionTableImpl::new(
-                table_data,
-                self.remote_engine_ref.clone(),
-                self.io_runtime.clone(),
-            )
-            .box_err()
-            .context(Unexpected)?,
+            PartitionTableImpl::new(table_data, self.remote_engine_ref.clone())
+                .box_err()
+                .context(Unexpected)?,
        ))
    }

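For context, a minimal sketch of a call site after this change, assuming a `RemoteEngineRef` is built elsewhere (the helper below is hypothetical; the real wiring is outside this diff):

use std::sync::Arc;

use partition_table_engine::PartitionTableEngine;
use table_engine::remote::RemoteEngineRef;

// Hypothetical helper: the io_runtime parameter is gone because per-partition
// writes are no longer spawned onto a local runtime; they are batched and sent
// through the remote engine instead.
fn build_partition_engine(remote_engine: RemoteEngineRef) -> Arc<PartitionTableEngine> {
    Arc::new(PartitionTableEngine::new(remote_engine))
}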
74 changes: 34 additions & 40 deletions partition_table_engine/src/partition.rs
@@ -2,14 +2,14 @@

//! Distributed Table implementation

-use std::{collections::HashMap, fmt, sync::Arc};
+use std::{collections::HashMap, fmt};

use async_trait::async_trait;
use common_types::{
row::{Row, RowGroupBuilder},
schema::Schema,
};
-use common_util::{error::BoxError, runtime::Runtime};
+use common_util::error::BoxError;
use futures::future::try_join_all;
use snafu::ResultExt;
use table_engine::{
@@ -18,15 +18,16 @@ use table_engine::{
    },
    remote::{
        model::{
-            ReadRequest as RemoteReadRequest, TableIdentifier, WriteRequest as RemoteWriteRequest,
+            ReadRequest as RemoteReadRequest, TableIdentifier, WriteBatchResult,
+            WriteRequest as RemoteWriteRequest,
        },
        RemoteEngineRef,
    },
    stream::{PartitionedStreams, SendableRecordBatchStream},
    table::{
        AlterSchemaRequest, CreatePartitionRule, FlushRequest, GetRequest, LocatePartitions,
        ReadRequest, Result, Scan, Table, TableId, TableStats, UnexpectedWithMsg,
-        UnsupportedMethod, Write, WriteRequest,
+        UnsupportedMethod, Write, WriteBatch, WriteRequest,
    },
};

@@ -50,19 +51,13 @@ pub struct TableData {
pub struct PartitionTableImpl {
    table_data: TableData,
    remote_engine: RemoteEngineRef,
-    io_runtime: Arc<Runtime>,
}

impl PartitionTableImpl {
-    pub fn new(
-        table_data: TableData,
-        remote_engine: RemoteEngineRef,
-        io_runtime: Arc<Runtime>,
-    ) -> Result<Self> {
+    pub fn new(table_data: TableData, remote_engine: RemoteEngineRef) -> Result<Self> {
        Ok(Self {
            table_data,
            remote_engine,
-            io_runtime,
        })
    }

@@ -156,50 +151,49 @@ impl Table for PartitionTableImpl {
}

        // Insert split write request through remote engine.
-        let mut futures = Vec::with_capacity(split_rows.len());
+        let mut request_batch = Vec::with_capacity(split_rows.len());
        for (partition, rows) in split_rows {
+            let sub_table_ident = self.get_sub_table_ident(partition);
            let row_group = RowGroupBuilder::with_rows(schema.clone(), rows)
                .box_err()
-                .context(Write {
-                    table: self.get_sub_table_ident(partition).table,
+                .with_context(|| Write {
+                    table: sub_table_ident.table.clone(),
                })?
                .build();

            let request = RemoteWriteRequest {
-                table: self.get_sub_table_ident(partition),
+                table: sub_table_ident,
                write_request: WriteRequest { row_group },
            };
-            let remote_engine = self.remote_engine.clone();
-            let write_handle = self
-                .io_runtime
-                .spawn(async move { remote_engine.write(request).await });
-            futures.push(write_handle);
+            request_batch.push(request);
        }

-        let write_results = {
        // TODO: make it as local timer
        let _remote_timer = PARTITION_TABLE_WRITE_DURATION_HISTOGRAM
            .with_label_values(&["remote_write"])
            .start_timer();

-            let handle = self.io_runtime.spawn(try_join_all(futures));
-            handle
-                .await
-                .box_err()
-                .context(Write { table: self.name() })?
-                .box_err()
-                .context(Write { table: self.name() })?
-        };
+        let batch_results = self
+            .remote_engine
+            .write_batch(request_batch)
+            .await
+            .box_err()
+            .context(WriteBatch {
+                tables: vec![self.table_data.table_name.clone()],
+            })?;

        let mut total_rows = 0;
-        for write_result in write_results {
-            let written_rows = write_result
-                .box_err()
-                .context(Write { table: self.name() })?;
+        for batch_result in batch_results {
+            let WriteBatchResult {
+                table_idents,
+                result,
+            } = batch_result;
+
+            let written_rows = result.with_context(|| {
+                let tables = table_idents
+                    .into_iter()
+                    .map(|ident| ident.table)
+                    .collect::<Vec<_>>();
+                WriteBatch { tables }
+            })?;
            total_rows += written_rows;
        }

-        Ok(total_rows)
+        Ok(total_rows as usize)
    }

async fn read(&self, _request: ReadRequest) -> Result<SendableRecordBatchStream> {
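The new write path leans on two additions to the remote engine abstraction: a `write_batch` method and the `WriteBatchResult` model. Their definitions live in `table_engine::remote` and are not shown in this diff; the sketch below reconstructs their shape purely from the usage above, so field types and signatures are assumptions (`WriteRequest` here means the remote model's request, aliased as `RemoteWriteRequest` in partition.rs):

// Assumed shape, inferred from how partition.rs consumes it; not verbatim source.
pub struct WriteBatchResult {
    // Tables whose requests were grouped into one batch (e.g. per endpoint).
    pub table_idents: Vec<TableIdentifier>,
    // Rows written by the whole batch, or the error shared by all its tables.
    pub result: Result<u64>,
}

#[async_trait]
pub trait RemoteEngine: Send + Sync {
    // Existing single-request write.
    async fn write(&self, request: WriteRequest) -> Result<usize>;

    // New in this PR: accept the whole batch so the client can group requests
    // by destination and issue one RPC per group instead of one per partition.
    async fn write_batch(&self, requests: Vec<WriteRequest>) -> Result<Vec<WriteBatchResult>>;
}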
24 changes: 16 additions & 8 deletions remote_engine_client/src/cached_router.rs
@@ -6,10 +6,10 @@ use std::{collections::HashMap, sync::RwLock};

use ceresdbproto::storage::{self, RequestContext};
use log::debug;
-use router::RouterRef;
+use router::{endpoint::Endpoint, RouterRef};
use snafu::{OptionExt, ResultExt};
use table_engine::remote::model::TableIdentifier;
-use tonic::transport::Channel;
+use tonic::transport::Channel as TonicChannel;

use crate::{channel::ChannelPool, config::Config, error::*};

@@ -19,12 +19,18 @@ pub struct CachedRouter {

    /// Cache mapping table to channel of its endpoint
    // TODO: we should add gc for the cache
-    cache: RwLock<HashMap<TableIdentifier, Channel>>,
+    cache: RwLock<HashMap<TableIdentifier, RouteContext>>,

    /// Channel pool
    channel_pool: ChannelPool,
}

+#[derive(Clone)]
+pub struct RouteContext {
+    pub channel: TonicChannel,
+    pub endpoint: Endpoint,
+}

impl CachedRouter {
pub fn new(router: RouterRef, config: Config) -> Self {
let cache = RwLock::new(HashMap::new());
@@ -36,7 +42,7 @@ impl CachedRouter {
}
}

-    pub async fn route(&self, table_ident: &TableIdentifier) -> Result<Channel> {
+    pub async fn route(&self, table_ident: &TableIdentifier) -> Result<RouteContext> {
        // Find in cache first.
        let channel_opt = {
            let cache = self.cache.read().unwrap();
@@ -77,12 +83,14 @@ impl CachedRouter {
}
}

-    pub async fn evict(&self, table_ident: &TableIdentifier) {
+    pub async fn evict(&self, table_idents: &[TableIdentifier]) {
        let mut cache = self.cache.write().unwrap();
-        let _ = cache.remove(table_ident);
+        for table_ident in table_idents {
+            let _ = cache.remove(table_ident);
+        }
    }

-    async fn do_route(&self, table_ident: &TableIdentifier) -> Result<Channel> {
+    async fn do_route(&self, table_ident: &TableIdentifier) -> Result<RouteContext> {
        let schema = &table_ident.schema;
        let table = table_ident.table.clone();
        let route_request = storage::RouteRequest {
@@ -121,6 +129,6 @@
        let endpoint = endpoint.into();
        let channel = self.channel_pool.get(&endpoint).await?;

-        Ok(channel)
+        Ok(RouteContext { channel, endpoint })
    }
}
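Storing the endpoint next to the channel, plus slice-based eviction, fits the batching flow: group requests by endpoint, send one RPC per endpoint, and drop every affected route if that RPC fails. A sketch of that client-side grouping (hypothetical; the actual batching client lives in other files of this PR, and this assumes `Endpoint: Hash + Eq`):

use std::collections::HashMap;

use table_engine::remote::model::WriteRequest as RemoteWriteRequest;

async fn group_by_endpoint(
    router: &CachedRouter,
    requests: Vec<RemoteWriteRequest>,
) -> Result<HashMap<Endpoint, Vec<RemoteWriteRequest>>> {
    let mut grouped: HashMap<Endpoint, Vec<RemoteWriteRequest>> = HashMap::new();
    for request in requests {
        // Route each sub-table; the cached RouteContext now carries the endpoint.
        let route_ctx = router.route(&request.table).await?;
        grouped
            .entry(route_ctx.endpoint)
            .or_default()
            .push(request);
    }
    Ok(grouped)
}

// If the batched RPC to an endpoint fails, invalidate all of its routes at once:
// router.evict(&failed_table_idents).await;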