Skip to content

Commit

Permalink
feat: make remote client configurable (#948)
Browse files Browse the repository at this point in the history
## Related Issues
Closes #

## Detailed Changes

* add compression config in remote client config 
* add remote client config in server config 

## Test Plan
  • Loading branch information
MichaelLeeHZ authored Jun 5, 2023
1 parent b08e934 commit b11574c
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/arrow_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ workspace = true

[dependencies]
arrow = { workspace = true }
serde = { workspace = true }
snafu = { workspace = true }
zstd = { workspace = true }
16 changes: 14 additions & 2 deletions components/arrow_ext/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use arrow::{
ipc::{reader::StreamReader, writer::StreamWriter},
record_batch::RecordBatch,
};
use serde::{Deserialize, Serialize};
use snafu::{Backtrace, ResultExt, Snafu};

#[derive(Snafu, Debug)]
Expand All @@ -28,7 +29,9 @@ pub enum Error {

type Result<T> = std::result::Result<T, Error>;

#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;

#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub enum CompressionMethod {
#[default]
None,
Expand All @@ -48,13 +51,22 @@ pub struct RecordBatchesEncoder {
compress_opts: CompressOptions,
}

#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub struct CompressOptions {
/// The minimum length of the payload to be compressed.
pub compress_min_length: usize,
pub method: CompressionMethod,
}

impl Default for CompressOptions {
fn default() -> Self {
Self {
compress_min_length: DEFAULT_COMPRESS_MIN_LENGTH,
method: CompressionMethod::Zstd,
}
}
}

#[derive(Clone, Default, Debug)]
pub struct CompressOutput {
pub method: CompressionMethod,
Expand Down
14 changes: 10 additions & 4 deletions remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use std::{
task::{Context, Poll},
};

use arrow_ext::{ipc, ipc::CompressionMethod};
use arrow_ext::{
ipc,
ipc::{CompressOptions, CompressionMethod},
};
use ceresdbproto::{
remote_engine::{self, read_response::Output::Arrow, remote_engine_service_client::*},
storage::arrow_payload,
Expand Down Expand Up @@ -42,15 +45,18 @@ struct WriteBatchContext {
pub struct Client {
cached_router: Arc<CachedRouter>,
io_runtime: Arc<Runtime>,
pub compression: CompressOptions,
}

impl Client {
pub fn new(config: Config, router: RouterRef, io_runtime: Arc<Runtime>) -> Self {
let compression = config.compression;
let cached_router = CachedRouter::new(router, config);

Self {
cached_router: Arc::new(cached_router),
io_runtime,
compression,
}
}

Expand Down Expand Up @@ -102,8 +108,7 @@ impl Client {

// Write to remote.
let table_ident = request.table.clone();

let request_pb = ceresdbproto::remote_engine::WriteRequest::try_from(request)
let request_pb = WriteRequest::convert_to_pb(request, self.compression)
.box_err()
.context(Convert {
msg: "Failed to convert WriteRequest to pb",
Expand Down Expand Up @@ -166,9 +171,10 @@ impl Client {
request,
channel,
} = context;
let compress_options = self.compression;
let handle = self.io_runtime.spawn(async move {
let batch_request_pb =
match ceresdbproto::remote_engine::WriteBatchRequest::try_from(request)
match WriteBatchRequest::convert_write_batch_to_pb(request, compress_options)
.box_err()
{
Ok(pb) => pb,
Expand Down
3 changes: 3 additions & 0 deletions remote_engine_client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::str::FromStr;

use arrow_ext::ipc::CompressOptions;
use common_util::config::ReadableDuration;
use serde::{Deserialize, Serialize};

Expand All @@ -18,6 +19,7 @@ pub struct Config {
pub channel_keep_alive_interval: ReadableDuration,
pub route_cache_max_size_per_partition: usize,
pub route_cache_partition_num: usize,
pub compression: CompressOptions,
}

impl Default for Config {
Expand All @@ -31,6 +33,7 @@ impl Default for Config {
channel_keep_alive_while_idle: true,
route_cache_max_size_per_partition: 16,
route_cache_partition_num: 16,
compression: CompressOptions::default(),
}
}
}
6 changes: 5 additions & 1 deletion server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ pub struct ServerConfig {
// Config of route
pub route_cache: router::RouteCacheConfig,

/// record hotspot query or write requests
/// Record hotspot query or write requests
pub hotspot: hotspot::Config,

/// Config of remote engine client
pub remote_client: remote_engine_client::Config,
}

impl Default for ServerConfig {
Expand All @@ -135,6 +138,7 @@ impl Default for ServerConfig {
default_schema_config: Default::default(),
route_cache: router::RouteCacheConfig::default(),
hotspot: hotspot::Config::default(),
remote_client: remote_engine_client::Config::default(),
}
}
}
Expand Down
39 changes: 17 additions & 22 deletions table_engine/src/remote/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ pub enum Error {

define_result!(Error);

const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct TableIdentifier {
pub catalog: String,
Expand Down Expand Up @@ -159,15 +157,16 @@ impl TryFrom<ceresdbproto::remote_engine::WriteBatchRequest> for WriteBatchReque
}
}

impl TryFrom<WriteBatchRequest> for ceresdbproto::remote_engine::WriteBatchRequest {
type Error = Error;

fn try_from(batch_request: WriteBatchRequest) -> std::result::Result<Self, Self::Error> {
impl WriteBatchRequest {
pub fn convert_write_batch_to_pb(
batch_request: WriteBatchRequest,
compress_options: CompressOptions,
) -> std::result::Result<ceresdbproto::remote_engine::WriteBatchRequest, Error> {
let batch = batch_request
.batch
.into_iter()
.map(remote_engine::WriteRequest::try_from)
.collect::<std::result::Result<Vec<_>, Self::Error>>()?;
.map(|req| WriteRequest::convert_to_pb(req, compress_options))
.collect::<std::result::Result<Vec<_>, Error>>()?;

Ok(remote_engine::WriteBatchRequest { batch })
}
Expand Down Expand Up @@ -215,10 +214,11 @@ impl TryFrom<ceresdbproto::remote_engine::WriteRequest> for WriteRequest {
}
}

impl TryFrom<WriteRequest> for ceresdbproto::remote_engine::WriteRequest {
type Error = Error;

fn try_from(request: WriteRequest) -> std::result::Result<Self, Self::Error> {
impl WriteRequest {
pub fn convert_to_pb(
request: WriteRequest,
compress_options: CompressOptions,
) -> std::result::Result<ceresdbproto::remote_engine::WriteRequest, Error> {
// Row group to pb.
let row_group = request.write_request.row_group;
let table_schema = row_group.schema();
Expand All @@ -239,15 +239,10 @@ impl TryFrom<WriteRequest> for ceresdbproto::remote_engine::WriteRequest {
.map_err(|e| Box::new(e) as _)
.context(ConvertRowGroup)?;
let record_batch = record_batch_with_key.into_record_batch();
let compress_output = ipc::encode_record_batch(
&record_batch.into_arrow_record_batch(),
CompressOptions {
compress_min_length: DEFAULT_COMPRESS_MIN_LENGTH,
method: CompressionMethod::Zstd,
},
)
.map_err(|e| Box::new(e) as _)
.context(ConvertRowGroup)?;
let compress_output =
ipc::encode_record_batch(&record_batch.into_arrow_record_batch(), compress_options)
.map_err(|e| Box::new(e) as _)
.context(ConvertRowGroup)?;

let compression = match compress_output.method {
CompressionMethod::None => arrow_payload::Compression::None,
Expand All @@ -266,7 +261,7 @@ impl TryFrom<WriteRequest> for ceresdbproto::remote_engine::WriteRequest {
// Table ident to pb.
let table_pb = request.table.into();

Ok(Self {
Ok(ceresdbproto::remote_engine::WriteRequest {
table: Some(table_pb),
row_group: Some(row_group_pb),
})
Expand Down

0 comments on commit b11574c

Please sign in to comment.