From 62b2c94973c21de0113b13b49a99fac457c89e49 Mon Sep 17 00:00:00 2001
From: WEI Xikai
Date: Fri, 21 Apr 2023 13:53:15 +0800
Subject: [PATCH 1/4] feat: support domain name as the ceresdb node addr (#852)

* feat: support domain name as the ceresdb node addr

* chore: remove unused imports
---
 src/bin/ceresdb-server.rs | 20 +++++++-------------
 src/config.rs             |  2 ++
 2 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/src/bin/ceresdb-server.rs b/src/bin/ceresdb-server.rs
index ed4a54d123..373ed395b9 100644
--- a/src/bin/ceresdb-server.rs
+++ b/src/bin/ceresdb-server.rs
@@ -1,8 +1,8 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
 //! The main entry point to start the server
 
-use std::{env, net::IpAddr};
+use std::env;
 
 use ceresdb::{
     config::{ClusterDeployment, Config},
@@ -12,8 +12,11 @@ use clap::{App, Arg};
 use common_util::{panic, toml};
 use log::info;
 
-/// The ip address of current node.
+/// This environment variable overrides the address of the current node.
+/// It may be a domain name or an IP address, and must not carry a port.
 const NODE_ADDR: &str = "CERESDB_SERVER_ADDR";
+/// This environment variable overrides the cluster name of the current
+/// node.
 const CLUSTER_NAME: &str = "CLUSTER_NAME";
 
 fn fetch_version() -> String {
@@ -36,14 +39,6 @@ fn fetch_version() -> String {
         .join("\n")
 }
 
-// Parse the raw addr and panic if it is invalid.
-fn parse_node_addr_or_fail(raw_addr: &str) -> IpAddr {
-    let socket_addr: IpAddr = raw_addr
-        .parse()
-        .unwrap_or_else(|_| panic!("invalid node addr, raw_addr:{raw_addr}"));
-    socket_addr
-}
-
 fn main() {
     let version = fetch_version();
     let matches = App::new("CeresDB Server")
@@ -67,8 +62,7 @@ fn main() {
     };
 
     if let Ok(node_addr) = env::var(NODE_ADDR) {
-        let ip = parse_node_addr_or_fail(&node_addr);
-        config.node.addr = ip.to_string();
+        config.node.addr = node_addr;
     }
     if let Ok(cluster) = env::var(CLUSTER_NAME) {
         if let Some(ClusterDeployment::WithMeta(v)) = &mut config.cluster_deployment {
diff --git a/src/config.rs b/src/config.rs
index 2de2eecafa..2f5d8d6252 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -12,6 +12,8 @@ use server::{
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(default)]
 pub struct NodeInfo {
+    /// The address of the ceresdb node. It can be a domain name or an IP
+    /// address, without a port.
pub addr: String, pub zone: String, pub idc: String, From 406e1564f439938a8a5aab46dc24e13028b0d78a Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Fri, 21 Apr 2023 15:15:58 +0800 Subject: [PATCH 2/4] refactor: implement the distributed lock of shard (#706) * refactor: add etcd shard lock * feat: upgrade etcd-client to v0.10.4 and enable tls feature * refactor: new implementation of the shard lock * refactor: clippy compliants * chore: use latest ceresdbproto * chore: fix format error * feat: operate shard lock in meta event server * feat: enable cluster integration tests * feat: use ceresdbproto in crates.io * feat: use ceresdbproto in crates.io * feat: add some basic unit tests * refactor: rename and comment * chore: check the root path and cluster name --------- Co-authored-by: xikai.wxk --- Cargo.lock | 23 +- Cargo.toml | 4 +- Makefile | 3 +- cluster/Cargo.toml | 2 + cluster/src/cluster_impl.rs | 118 +++-- cluster/src/config.rs | 48 +- cluster/src/lib.rs | 65 +-- cluster/src/shard_lock_manager.rs | 505 ++++++++++++++++++++++ docs/example-cluster-0.toml | 4 +- docs/example-cluster-1.toml | 4 +- meta_client/src/types.rs | 10 +- router/src/cluster_based.rs | 15 +- server/src/grpc/meta_event_service/mod.rs | 102 ++++- src/setup.rs | 8 +- 14 files changed, 821 insertions(+), 90 deletions(-) create mode 100644 cluster/src/shard_lock_manager.rs diff --git a/Cargo.lock b/Cargo.lock index 0810a1a0d3..8fe388d9fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1115,6 +1115,7 @@ dependencies = [ "cluster", "common_util", "df_operator", + "etcd-client", "interpreters", "log", "logger", @@ -1167,9 +1168,9 @@ dependencies = [ [[package]] name = "ceresdbproto" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76e0e168ea5b6a7dcef8e0e067d259256daea839fa8d083f132f2b2d8dfb1e69" +checksum = "456d1f090caba236219310b8f921a2b76e06e65f848965f1cf890a982b0e4735" dependencies = [ "prost", "protoc-bin-vendored", @@ -1334,8 +1335,10 @@ dependencies = [ "ceresdbproto", "common_types", "common_util", + "etcd-client", "log", "meta_client", + "prost", "serde", "serde_json", "snafu 0.6.10", @@ -2188,6 +2191,22 @@ dependencies = [ "version_check", ] +[[package]] +name = "etcd-client" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58" +dependencies = [ + "http", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", + "tower-service", +] + [[package]] name = "event-listener" version = "2.5.3" diff --git a/Cargo.toml b/Cargo.toml index 3240e8ee4c..621b5a5cbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,7 @@ bytes = "1.1.0" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = "1.0.2" +ceresdbproto = "1.0.3" chrono = "0.4" clap = "3.0" clru = "0.6.1" @@ -78,6 +78,7 @@ common_util = { path = "common_util" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" } datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" } df_operator = { path = "df_operator" } +etcd-client = "0.10.3" env_logger = "0.6" futures = "0.3" xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", features = [ @@ -137,6 +138,7 @@ clap = { workspace = true } cluster = { workspace = true } common_util = { 
workspace = true } df_operator = { workspace = true } +etcd-client = { workspace = true } interpreters = { workspace = true } log = { workspace = true } logger = { workspace = true } diff --git a/Makefile b/Makefile index 83e3eb34ed..ce68aa73de 100644 --- a/Makefile +++ b/Makefile @@ -33,8 +33,7 @@ test: cd $(DIR); cargo test --workspace -- --test-threads=4 integration-test: - # TODO: restore it as `make run` after we fix the clustering integration test. - cd $(DIR)/integration_tests; make run-local + cd $(DIR)/integration_tests; make run # grcov needs build first, then run test build-ut: diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml index 67023a763e..2acbf9ab5d 100644 --- a/cluster/Cargo.toml +++ b/cluster/Cargo.toml @@ -15,8 +15,10 @@ async-trait = { workspace = true } ceresdbproto = { workspace = true } common_types = { workspace = true } common_util = { workspace = true } +etcd-client = { workspace = true } log = { workspace = true } meta_client = { workspace = true } +prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index 9008aceb7f..76053a49de 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -8,15 +8,17 @@ use std::{ use async_trait::async_trait; use ceresdbproto::{ meta_event::{ - CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest, - DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, UpdateShardInfo, + CloseTableOnShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, + OpenTableOnShardRequest, UpdateShardInfo, }, meta_service::TableInfo as TableInfoPb, }; +use common_types::table::ShardId; use common_util::{ error::BoxError, runtime::{JoinHandle, Runtime}, }; +use etcd_client::ConnectOptions; use log::{error, info, warn}; use meta_client::{ types::{ @@ -32,9 +34,13 @@ use tokio::{ }; use crate::{ - config::ClusterConfig, shard_tables_cache::ShardTablesCache, topology::ClusterTopology, - Cluster, ClusterNodesNotFound, ClusterNodesResp, Internal, MetaClientFailure, OpenShard, - OpenShardWithCause, Result, ShardNotFound, TableNotFound, + config::ClusterConfig, + shard_lock_manager::{ShardLockManager, ShardLockManagerRef}, + shard_tables_cache::ShardTablesCache, + topology::ClusterTopology, + Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, Internal, + InvalidArguments, MetaClientFailure, OpenShard, OpenShardWithCause, Result, ShardNotFound, + TableNotFound, }; /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`]. 
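The hunk below replaces the synchronous `ClusterImpl::new` with an async `ClusterImpl::create`, because the cluster now connects to etcd before it can build the shard lock manager. A rough standalone sketch of just that connection step (not code from this patch; it assumes the `EtcdClientConfig` added in cluster/src/config.rs later in this diff):

use cluster::config::EtcdClientConfig;
use etcd_client::{Client, ConnectOptions, Error};

// Sketch: connect to the configured etcd endpoints, reusing the
// `From<&EtcdClientConfig> for ConnectOptions` conversion to carry the timeouts.
async fn connect_etcd(config: &EtcdClientConfig) -> Result<Client, Error> {
    let options = ConnectOptions::from(config);
    Client::connect(&config.server_addrs, Some(options)).await
}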
@@ -49,23 +55,43 @@ pub struct ClusterImpl { config: ClusterConfig, heartbeat_handle: Mutex>>, stop_heartbeat_tx: Mutex>>, + shard_lock_manager: ShardLockManagerRef, } impl ClusterImpl { - pub fn new( + pub async fn create( + node_name: String, shard_tables_cache: ShardTablesCache, meta_client: MetaClientRef, config: ClusterConfig, runtime: Arc, ) -> Result { - let inner = Inner::new(shard_tables_cache, meta_client)?; - + let inner = Arc::new(Inner::new(shard_tables_cache, meta_client)?); + let connect_options = ConnectOptions::from(&config.etcd_client); + let etcd_client = + etcd_client::Client::connect(&config.etcd_client.server_addrs, Some(connect_options)) + .await + .context(EtcdClientFailureWithCause { + msg: "failed to connect to etcd", + })?; + + let shard_lock_key_prefix = Self::shard_lock_key_prefix( + &config.etcd_client.root_path, + &config.meta_client.cluster_name, + )?; + let shard_lock_manager = ShardLockManager::new( + shard_lock_key_prefix, + node_name, + etcd_client, + runtime.clone(), + ); Ok(Self { - inner: Arc::new(inner), + inner, runtime, config, heartbeat_handle: Mutex::new(None), stop_heartbeat_tx: Mutex::new(None), + shard_lock_manager: Arc::new(shard_lock_manager), }) } @@ -109,8 +135,23 @@ impl ClusterImpl { self.config.meta_client.lease.0 / 2 } - pub fn shard_tables_cache(&self) -> &ShardTablesCache { - &self.inner.shard_tables_cache + fn shard_lock_key_prefix(root_path: &str, cluster_name: &str) -> Result { + ensure!( + root_path.starts_with('/'), + InvalidArguments { + msg: "root_path is required to start with /", + } + ); + + ensure!( + !cluster_name.is_empty(), + InvalidArguments { + msg: "cluster_name is required non-empty", + } + ); + + const SHARD_LOCK_KEY: &str = "shards"; + Ok(format!("{root_path}/{cluster_name}/{SHARD_LOCK_KEY}")) } } @@ -187,12 +228,7 @@ impl Inner { Ok(resp) } - async fn open_shard(&self, req: &OpenShardRequest) -> Result { - let shard_info = req.shard.as_ref().context(OpenShard { - shard_id: 0u32, - msg: "missing shard info in the request", - })?; - + async fn open_shard(&self, shard_info: &ShardInfo) -> Result { if let Some(tables_of_shard) = self.shard_tables_cache.get(shard_info.id) { if tables_of_shard.shard_info.version == shard_info.version { info!( @@ -245,11 +281,11 @@ impl Inner { Ok(tables_of_shard) } - fn close_shard(&self, req: &CloseShardRequest) -> Result { + fn close_shard(&self, shard_id: ShardId) -> Result { self.shard_tables_cache - .remove(req.shard_id) + .remove(shard_id) .with_context(|| ShardNotFound { - msg: format!("close non-existent shard, shard_id:{}", req.shard_id), + msg: format!("close non-existent shard, shard_id:{shard_id}"), }) } @@ -286,7 +322,7 @@ impl Inner { self.shard_tables_cache.try_insert_table_to_shard( update_shard_info.prev_version, - ShardInfo::from(curr_shard_info), + ShardInfo::from(&curr_shard_info), TableInfo::try_from(table_info) .box_err() .context(Internal { @@ -312,7 +348,7 @@ impl Inner { self.shard_tables_cache.try_remove_table_from_shard( update_shard_info.prev_version, - ShardInfo::from(curr_shard_info), + ShardInfo::from(&curr_shard_info), TableInfo::try_from(table_info) .box_err() .context(Internal { @@ -355,12 +391,12 @@ impl Cluster for ClusterImpl { Ok(()) } - async fn open_shard(&self, req: &OpenShardRequest) -> Result { - self.inner.open_shard(req).await + async fn open_shard(&self, shard_info: &ShardInfo) -> Result { + self.inner.open_shard(shard_info).await } - async fn close_shard(&self, req: &CloseShardRequest) -> Result { - self.inner.close_shard(req) + async fn 
close_shard(&self, shard_id: ShardId) -> Result { + self.inner.close_shard(shard_id) } async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> { @@ -386,4 +422,34 @@ impl Cluster for ClusterImpl { async fn fetch_nodes(&self) -> Result { self.inner.fetch_nodes().await } + + fn shard_lock_manager(&self) -> ShardLockManagerRef { + self.shard_lock_manager.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_shard_lock_key_prefix() { + let cases = vec![ + ( + ("/ceresdb", "defaultCluster"), + Some("/ceresdb/defaultCluster/shards"), + ), + (("", "defaultCluster"), None), + (("vvv", "defaultCluster"), None), + (("/x", ""), None), + ]; + + for ((root_path, cluster_name), expected) in cases { + let actual = ClusterImpl::shard_lock_key_prefix(root_path, cluster_name); + match expected { + Some(expected) => assert_eq!(actual.unwrap(), expected), + None => assert!(actual.is_err()), + } + } + } } diff --git a/cluster/src/config.rs b/cluster/src/config.rs index 4e87d735b7..c58d1ed6bc 100644 --- a/cluster/src/config.rs +++ b/cluster/src/config.rs @@ -1,6 +1,8 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. use common_types::schema::TIMESTAMP_COLUMN; +use common_util::config::ReadableDuration; +use etcd_client::ConnectOptions; use meta_client::meta_impl::MetaClientConfig; use serde::{Deserialize, Serialize}; use table_engine::ANALYTIC_ENGINE_TYPE; @@ -22,9 +24,53 @@ impl Default for SchemaConfig { } } +const DEFAULT_ETCD_ROOT_PATH: &str = "/ceresdb"; + +#[derive(Clone, Deserialize, Debug, Serialize)] +#[serde(default)] +pub struct EtcdClientConfig { + /// The etcd server addresses + pub server_addrs: Vec, + /// Root path in the etcd used by the ceresdb server + pub root_path: String, + + pub connect_timeout: ReadableDuration, + pub rpc_timeout: ReadableDuration, + + /// The lease of the shard lock in seconds. + /// + /// It should be greater than `shard_lock_lease_check_interval`. + pub shard_lock_lease_ttl_sec: u64, + /// The interval of checking whether the shard lock lease is expired + pub shard_lock_lease_check_interval: ReadableDuration, +} + +impl From<&EtcdClientConfig> for ConnectOptions { + fn from(config: &EtcdClientConfig) -> Self { + ConnectOptions::default() + .with_connect_timeout(config.connect_timeout.0) + .with_timeout(config.rpc_timeout.0) + } +} + +impl Default for EtcdClientConfig { + fn default() -> Self { + Self { + server_addrs: vec!["127.0.0.1:2379".to_string()], + root_path: DEFAULT_ETCD_ROOT_PATH.to_string(), + + rpc_timeout: ReadableDuration::secs(5), + connect_timeout: ReadableDuration::secs(5), + shard_lock_lease_ttl_sec: 15, + shard_lock_lease_check_interval: ReadableDuration::millis(200), + } + } +} + #[derive(Default, Clone, Deserialize, Debug, Serialize)] #[serde(default)] pub struct ClusterConfig { pub cmd_channel_buffer_size: usize, pub meta_client: MetaClientConfig, + pub etcd_client: EtcdClientConfig, } diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 5aa2ed09ff..d0b2574fa7 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -8,12 +8,14 @@ //! //! The core types are [Cluster] trait and its implementation [ClusterImpl]. 
+#![feature(trait_alias)] + use std::sync::Arc; use async_trait::async_trait; use ceresdbproto::meta_event::{ - CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest, - DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, + CloseTableOnShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, + OpenTableOnShardRequest, }; use common_types::schema::SchemaName; use common_util::{define_result, error::GenericError}; @@ -21,36 +23,43 @@ use meta_client::types::{ ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId, ShardInfo, ShardVersion, TablesOfShard, }; +use shard_lock_manager::ShardLockManagerRef; use snafu::{Backtrace, Snafu}; pub mod cluster_impl; pub mod config; +pub mod shard_lock_manager; pub mod shard_tables_cache; -// FIXME: Remove this lint ignore derive when topology about schema tables is -// finished. #[allow(dead_code)] pub mod topology; #[derive(Debug, Snafu)] #[snafu(visibility = "pub")] pub enum Error { - #[snafu(display("{msg}, err:{source}"))] + #[snafu(display("Invalid arguments, msg:{msg}.\nBacktrace:{backtrace}"))] + InvalidArguments { msg: String, backtrace: Backtrace }, + + #[snafu(display("Internal error, msg:{msg}, err:{source}"))] Internal { msg: String, source: GenericError }, - #[snafu(display("Build meta client failed, err:{}.", source))] + #[snafu(display("Build meta client failed, err:{source}."))] BuildMetaClient { source: meta_client::Error }, - #[snafu(display("Meta client start failed, err:{}.", source))] + #[snafu(display("Meta client start failed, err:{source}."))] StartMetaClient { source: meta_client::Error }, - #[snafu(display("Meta client execute failed, err:{}.", source))] + #[snafu(display("Meta client execute failed, err:{source}."))] MetaClientFailure { source: meta_client::Error }, + #[snafu(display("Etcd client failure, msg:{msg}, err:{source}.\nBacktrace:\n{backtrace}"))] + EtcdClientFailureWithCause { + msg: String, + source: etcd_client::Error, + backtrace: Backtrace, + }, + #[snafu(display( - "Fail to open shard, shard_id:{}, msg:{}.\nBacktrace:\n{}", - shard_id, - msg, - backtrace + "Fail to open shard, shard_id:{shard_id}, msg:{msg}.\nBacktrace:\n{backtrace}", ))] OpenShard { shard_id: ShardId, @@ -58,25 +67,29 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Fail to open shard, source:{}.", source))] + #[snafu(display("Fail to open shard, shard_id:{shard_id}, source:{source}."))] OpenShardWithCause { shard_id: ShardId, source: GenericError, }, - #[snafu(display("Shard not found, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Fail to close shard, shard_id:{shard_id}, source:{source}."))] + CloseShardWithCause { + shard_id: ShardId, + source: GenericError, + }, + + #[snafu(display("Shard not found, msg:{msg}.\nBacktrace:\n{backtrace}"))] ShardNotFound { msg: String, backtrace: Backtrace }, - #[snafu(display("Table not found, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Table not found, msg:{msg}.\nBacktrace:\n{backtrace}"))] TableNotFound { msg: String, backtrace: Backtrace }, - #[snafu(display("Table already exists, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Table already exists, msg:{msg}.\nBacktrace:\n{backtrace}"))] TableAlreadyExists { msg: String, backtrace: Backtrace }, #[snafu(display( - "Schema not found in current node, schema name:{}.\nBacktrace:\n{}", - schema_name, - backtrace + "Schema not found in current node, schema name:{schema_name}.\nBacktrace:\n{backtrace}", ))] SchemaNotFound { schema_name: 
SchemaName, @@ -84,10 +97,7 @@ pub enum Error { }, #[snafu(display( - "Shard version mismatch, shard_info:{:?}, expect version:{}.\nBacktrace:\n{}", - shard_info, - expect_version, - backtrace + "Shard version mismatch, shard_info:{shard_info:?}, expect version:{expect_version}.\nBacktrace:\n{backtrace}", ))] ShardVersionMismatch { shard_info: ShardInfo, @@ -96,9 +106,7 @@ pub enum Error { }, #[snafu(display( - "Cluster nodes are not found in the topology, version:{}.\nBacktrace:\n{}", - version, - backtrace + "Cluster nodes are not found in the topology, version:{version}.\nBacktrace:\n{backtrace}", ))] ClusterNodesNotFound { version: u64, backtrace: Backtrace }, } @@ -118,12 +126,13 @@ pub struct ClusterNodesResp { pub trait Cluster { async fn start(&self) -> Result<()>; async fn stop(&self) -> Result<()>; - async fn open_shard(&self, req: &OpenShardRequest) -> Result; - async fn close_shard(&self, req: &CloseShardRequest) -> Result; + async fn open_shard(&self, shard_info: &ShardInfo) -> Result; + async fn close_shard(&self, req: ShardId) -> Result; async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>; async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>; async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()>; async fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()>; async fn route_tables(&self, req: &RouteTablesRequest) -> Result; async fn fetch_nodes(&self) -> Result; + fn shard_lock_manager(&self) -> ShardLockManagerRef; } diff --git a/cluster/src/shard_lock_manager.rs b/cluster/src/shard_lock_manager.rs new file mode 100644 index 0000000000..adf2ffac57 --- /dev/null +++ b/cluster/src/shard_lock_manager.rs @@ -0,0 +1,505 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
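The new module below implements shard locks on top of etcd. Each lock is a key of the form {root_path}/{cluster_name}/shards/{zero-padded shard id} (for example /ceresdb/defaultCluster/shards/00000000000000000001, see the unit tests at the end of the file), created transactionally and bound to a lease that the holder keeps alive. The acquisition step in `ShardLock::acquire_lock_with_lease` reduces to the compare-and-create idiom sketched here (a standalone sketch, not part of the patch; `try_acquire_shard_lock` is an illustrative name):

use etcd_client::{Client, Compare, CompareOp, Error, PutOptions, Txn, TxnOp};

// Create the lock key only if it does not exist yet, and bind it to a lease so
// that it disappears automatically once keepalives stop.
async fn try_acquire_shard_lock(
    client: &mut Client,
    key: &str,
    value: &[u8],
    ttl_sec: i64,
) -> Result<bool, Error> {
    // A lease that expires unless refreshed; the real module refreshes it every
    // ttl/3 and checks for expiry every 200ms.
    let lease_id = client.lease_grant(ttl_sec, None).await?.id();

    // In etcd the version of a non-existent key is 0.
    let not_exist = Compare::version(key, CompareOp::Equal, 0);
    let put = TxnOp::put(key, value, Some(PutOptions::new().with_lease(lease_id)));
    let resp = client.txn(Txn::new().when([not_exist]).and_then([put])).await?;

    // succeeded() == false means another node already holds the lock.
    Ok(resp.succeeded())
}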
+ +use std::{ + collections::HashMap, + future::Future, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; + +use ceresdbproto::meta_event::ShardLockValue; +use common_types::{bytes::Bytes, table::ShardId}; +use common_util::{ + define_result, + runtime::{JoinHandle, RuntimeRef}, +}; +use etcd_client::{ + Client, Compare, CompareOp, LeaseKeepAliveStream, LeaseKeeper, PutOptions, Txn, TxnOp, +}; +use log::{debug, error, info, warn}; +use prost::Message; +use snafu::{ensure, Backtrace, ResultExt, Snafu}; +use tokio::sync::{oneshot, RwLock}; + +#[derive(Debug, Snafu)] +#[snafu(visibility = "pub")] +pub enum Error { + #[snafu(display("Failed to keep alive, err:{source}.\nBacktrace:\n{backtrace:?}"))] + KeepAlive { + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to keep alive because of no resp, lease_id:{lease_id}.\nBacktrace:\n{backtrace:?}" + ))] + KeepAliveWithoutResp { lease_id: i64, backtrace: Backtrace }, + + #[snafu(display( + "Failed to grant lease, shard_id:{shard_id}, err:{source}.\nBacktrace:\n{backtrace:?}" + ))] + GrantLease { + shard_id: ShardId, + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to create lock in etcd, shard_id:{shard_id}, err:{source}.\nBacktrace:\n{backtrace:?}"))] + CreateLockInEtcd { + shard_id: ShardId, + source: etcd_client::Error, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to execute txn of create lock, shard_id:{shard_id}.\nBacktrace:\n{backtrace:?}" + ))] + CreateLockTxn { + shard_id: ShardId, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to revoke the lease, lease_id:{lease_id}, shard_id:{shard_id}, err:{source}.\nBacktrace:\n{backtrace:?}"))] + RevokeLease { + lease_id: i64, + shard_id: ShardId, + source: etcd_client::Error, + backtrace: Backtrace, + }, +} + +define_result!(Error); + +const DEFAULT_LEASE_SECS: u64 = 30; +const LOCK_EXPIRE_CHECK_INTERVAL: Duration = Duration::from_millis(200); + +pub type ShardLockManagerRef = Arc; + +/// Shard lock manager is implemented based on etcd. +/// +/// Only with the lock held, the shard can be operated by this node. +pub struct ShardLockManager { + key_prefix: String, + value: Bytes, + lease_ttl_sec: u64, + + etcd_client: Client, + runtime: RuntimeRef, + + // ShardID -> ShardLock + shard_locks: Arc>>, +} + +/// The lease of the shard lock. +struct Lease { + /// The lease id + id: i64, + /// The time to live of the lease + ttl: Duration, + /// Expired time in milliseconds + expired_at: Arc, + + keep_alive_handle: Option>, +} + +impl Lease { + fn new(id: i64, ttl: Duration) -> Self { + Self { + id, + ttl, + expired_at: Arc::new(AtomicU64::new(0)), + keep_alive_handle: None, + } + } + + /// Check whether lease is expired. + fn is_expired(&self) -> bool { + let now = common_util::time::current_time_millis(); + let expired_at_ms = self.expired_at.load(Ordering::Relaxed); + expired_at_ms <= now + } + + /// Keep alive the lease once. + /// + /// Return a new ttl in milliseconds if succeeds. + async fn keep_alive_once( + keeper: &mut LeaseKeeper, + stream: &mut LeaseKeepAliveStream, + ) -> Result { + keeper.keep_alive().await.context(KeepAlive)?; + match stream.message().await.context(KeepAlive)? { + Some(resp) => { + // The ttl in the response is in seconds, let's convert it into milliseconds. 
+ let new_ttl = resp.ttl() as u64 * 1000; + let expired_at = common_util::time::current_time_millis() + new_ttl; + Ok(expired_at) + } + None => { + error!( + "failed to keep lease alive because of no resp, id:{}", + keeper.id() + ); + KeepAliveWithoutResp { + lease_id: keeper.id(), + } + .fail() + } + } + } + + async fn start_keepalive( + &mut self, + mut stop_receiver: oneshot::Receiver<()>, + notifier: oneshot::Sender>, + etcd_client: &mut Client, + runtime: &RuntimeRef, + ) -> Result<()> { + let (mut keeper, mut stream) = etcd_client + .lease_keep_alive(self.id) + .await + .context(KeepAlive)?; + let new_expired_at = Self::keep_alive_once(&mut keeper, &mut stream).await?; + self.expired_at.store(new_expired_at, Ordering::Relaxed); + + // send keepalive request every ttl/3. + let keep_alive_interval = self.ttl / 3; + let lease_id = self.id; + let expired_at = self.expired_at.clone(); + let handle = runtime.spawn(async move { + loop { + match Self::keep_alive_once(&mut keeper, &mut stream).await { + Ok(new_expired_at) => { + debug!("The lease {lease_id} has been kept alive, new expire time in milliseconds:{new_expired_at}"); + expired_at.store(new_expired_at, Ordering::Relaxed); + } + Err(e) => { + error!("Failed to keep lease alive, id:{lease_id}, err:{e}"); + if notifier.send(Err(e)).is_err() { + error!("failed to send keepalive failure, lease_id:{lease_id}"); + } + + return + } + } + let sleeper = tokio::time::sleep(keep_alive_interval); + tokio::select! { + _ = sleeper => { + debug!("Try to keep the lease alive again, id:{lease_id}"); + }, + _ = &mut stop_receiver => { + debug!("Stop keeping lease alive, id:{lease_id}"); + if notifier.send(Ok(())).is_err() { + error!("failed to send keepalive stopping message, lease_id:{lease_id}"); + } + return + } + } + } + }); + + self.keep_alive_handle = Some(handle); + + Ok(()) + } +} + +/// Lock for a shard. +/// +/// The lock is a temporary key in etcd, which is created with a lease. +pub struct ShardLock { + shard_id: ShardId, + /// The temporary key in etcd + key: Bytes, + /// The value of the key in etcd + value: Bytes, + /// The lease of the lock in etcd + ttl_sec: u64, + + lease_id: Option, + lease_check_handle: Option>, + lease_keepalive_stopper: Option>, +} + +impl ShardLock { + fn new(shard_id: ShardId, key_prefix: &str, value: Bytes, ttl_sec: u64) -> Self { + Self { + shard_id, + key: Self::lock_key(key_prefix, shard_id), + value, + ttl_sec, + + lease_id: None, + lease_check_handle: None, + lease_keepalive_stopper: None, + } + } + + fn lock_key(key_prefix: &str, shard_id: ShardId) -> Bytes { + let key = format!("{key_prefix}/{shard_id:0>20}"); + Bytes::from(key) + } + + /// Grant the shard lock. + /// + /// The `on_lock_expired` callback will be called when the lock is expired, + /// but it won't be triggered if the lock is revoked in purpose. + async fn grant( + &mut self, + on_lock_expired: OnExpired, + etcd_client: &mut Client, + runtime: &RuntimeRef, + ) -> Result<()> + where + OnExpired: FnOnce(ShardId) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + // Grant the lease. + let resp = etcd_client + .lease_grant(self.ttl_sec as i64, None) + .await + .context(GrantLease { + shard_id: self.shard_id, + })?; + let lease_id = resp.id(); + + self.acquire_lock_with_lease(lease_id, etcd_client).await?; + + self.keep_lease_alive(lease_id, on_lock_expired, etcd_client, runtime) + .await?; + + Ok(()) + } + + /// Revoke the shard lock. + /// + /// NOTE: the `on_lock_expired` callback set in the `grant` won't be + /// triggered. 
+ async fn revoke(&mut self, etcd_client: &mut Client) -> Result<()> { + // Stop keeping alive the lease. + if let Some(sender) = self.lease_keepalive_stopper.take() { + if sender.send(()).is_err() { + warn!("Failed to stop keeping lease alive, maybe it has been stopped already so ignore it, hard_id:{}", self.shard_id); + } + } + + // Wait for the lease check worker to stop. + if let Some(handle) = self.lease_check_handle.take() { + if let Err(e) = handle.await { + warn!("Failed to wait for the lease check worker to stop, maybe it has exited so ignore it, shard_id:{}, err:{e}", self.shard_id); + } + } + + // Revoke the lease. + if let Some(lease_id) = self.lease_id.take() { + etcd_client + .lease_revoke(lease_id) + .await + .context(RevokeLease { + lease_id, + shard_id: self.shard_id, + })?; + } + + Ok(()) + } + + async fn acquire_lock_with_lease(&self, lease_id: i64, etcd_client: &mut Client) -> Result<()> { + // In etcd, the version is 0 if the key does not exist. + let not_exist = Compare::version(self.key.clone(), CompareOp::Equal, 0); + let create_key = { + let options = PutOptions::new().with_lease(lease_id); + TxnOp::put(self.key.clone(), self.value.clone(), Some(options)) + }; + + let create_if_not_exist = Txn::new().when([not_exist]).and_then([create_key]); + + let resp = etcd_client + .txn(create_if_not_exist) + .await + .context(CreateLockInEtcd { + shard_id: self.shard_id, + })?; + + ensure!( + resp.succeeded(), + CreateLockTxn { + shard_id: self.shard_id + } + ); + + Ok(()) + } + + async fn keep_lease_alive( + &mut self, + lease_id: i64, + on_lock_expired: OnExpired, + etcd_client: &mut Client, + runtime: &RuntimeRef, + ) -> Result<()> + where + OnExpired: FnOnce(ShardId) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + let mut lease = Lease::new(lease_id, Duration::from_secs(self.ttl_sec)); + let (lease_expire_notifier, mut lease_expire_receiver) = oneshot::channel(); + let (keepalive_stop_sender, keepalive_stop_receiver) = oneshot::channel(); + lease + .start_keepalive( + keepalive_stop_receiver, + lease_expire_notifier, + etcd_client, + runtime, + ) + .await?; + + let lock_expire_check_interval = LOCK_EXPIRE_CHECK_INTERVAL; + let shard_id = self.shard_id; + let handle = runtime.spawn(async move { + loop { + let timer = tokio::time::sleep(lock_expire_check_interval); + tokio::select! { + _ = timer => { + if lease.is_expired() { + warn!("The lease of the shard lock is expired, shard_id:{shard_id}"); + on_lock_expired(shard_id); + return + } + } + res = &mut lease_expire_receiver => { + match res { + Ok(_) => { + info!("The lease is revoked in purpose, and no need to do anything, shard_id:{shard_id}"); + } + Err(e) => { + error!("The lease of the shard lock is expired, shard_id:{shard_id}, err:{e:?}"); + on_lock_expired(shard_id); + } + } + return + } + } + } + }); + + self.lease_check_handle = Some(handle); + self.lease_keepalive_stopper = Some(keepalive_stop_sender); + self.lease_id = Some(lease_id); + Ok(()) + } +} + +impl ShardLockManager { + pub fn new( + key_prefix: String, + node_name: String, + etcd_client: Client, + runtime: RuntimeRef, + ) -> ShardLockManager { + let value = Bytes::from(ShardLockValue { node_name }.encode_to_vec()); + + ShardLockManager { + key_prefix, + value, + lease_ttl_sec: DEFAULT_LEASE_SECS, + etcd_client, + runtime, + shard_locks: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Grant lock to the shard. + /// + /// If the lock is already granted, return false. 
The `on_lock_expired` will + /// be called when the lock lease is expired, but it won't be triggered if + /// the lock is revoked. + pub async fn grant_lock( + &self, + shard_id: u32, + on_lock_expired: OnExpired, + ) -> Result + where + OnExpired: FnOnce(ShardId) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + info!("Try to grant lock for shard:{shard_id}"); + { + let shard_locks = self.shard_locks.read().await; + if shard_locks.contains_key(&shard_id) { + warn!("The lock has been granted, shard_id:{shard_id}"); + return Ok(false); + } + } + + // Double check whether the locks has been granted. + let mut shard_locks = self.shard_locks.write().await; + if shard_locks.contains_key(&shard_id) { + warn!("The lock has been granted, shard_id:{shard_id}"); + return Ok(false); + } + + let mut shard_lock = ShardLock::new( + shard_id, + &self.key_prefix, + self.value.clone(), + self.lease_ttl_sec, + ); + let mut etcd_client = self.etcd_client.clone(); + shard_lock + .grant(on_lock_expired, &mut etcd_client, &self.runtime) + .await?; + + shard_locks.insert(shard_id, shard_lock); + + info!("Finish granting lock for shard:{shard_id}"); + Ok(true) + } + + /// Revoke the shard lock. + /// + /// If the lock is not exist, return false. And the `on_lock_expired` won't + /// be triggered. + pub async fn revoke_lock(&self, shard_id: u32) -> Result { + info!("Try to revoke lock for shard:{shard_id}"); + + let mut shard_locks = self.shard_locks.write().await; + let shard_lock = shard_locks.remove(&shard_id); + match shard_lock { + Some(mut v) => { + let mut etcd_client = self.etcd_client.clone(); + v.revoke(&mut etcd_client).await?; + + info!("Finish revoking lock for shard:{shard_id}"); + Ok(true) + } + None => { + warn!("The lock is not exist, shard_id:{shard_id}"); + Ok(false) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_shard_lock_key() { + let key_prefix = "/ceresdb/defaultCluster"; + let cases = vec![ + (0, "/ceresdb/defaultCluster/00000000000000000000"), + (10, "/ceresdb/defaultCluster/00000000000000000010"), + (10000, "/ceresdb/defaultCluster/00000000000000010000"), + (999999999, "/ceresdb/defaultCluster/00000000000999999999"), + ]; + + for (shard_id, expected) in cases { + let key = ShardLock::lock_key(key_prefix, shard_id); + assert_eq!(key, expected); + } + } +} diff --git a/docs/example-cluster-0.toml b/docs/example-cluster-0.toml index da3fbe7631..8e4cae3511 100644 --- a/docs/example-cluster-0.toml +++ b/docs/example-cluster-0.toml @@ -1,5 +1,5 @@ [node] -addr = "ceresdb0" +addr = "127.0.0.1" [logger] level = "info" @@ -12,7 +12,7 @@ mysql_port = 3307 deploy_mode = "Cluster" [server.forward] -enable = true +enable = false [analytic.storage] mem_cache_capacity = '1G' diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml index 6a04569cbd..bedb6ef7f1 100644 --- a/docs/example-cluster-1.toml +++ b/docs/example-cluster-1.toml @@ -1,5 +1,5 @@ [node] -addr = "ceresdb1" +addr = "127.0.0.1" [logger] level = "info" @@ -12,7 +12,7 @@ mysql_port = 13307 deploy_mode = "Cluster" [server.forward] -enable = true +enable = false [analytic.storage] mem_cache_capacity = '1G' diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index c5dfc95ed4..8e1dc76278 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -218,8 +218,8 @@ impl From for meta_service_pb::ShardInfo { } } -impl From for ShardInfo { - fn from(pb_shard_info: meta_service_pb::ShardInfo) -> Self { +impl From<&meta_service_pb::ShardInfo> for ShardInfo 
{ + fn from(pb_shard_info: &meta_service_pb::ShardInfo) -> Self { ShardInfo { id: pb_shard_info.id, role: pb_shard_info.role().into(), @@ -283,7 +283,7 @@ impl TryFrom for TablesOfShard { msg: "in meta_service_pb::TablesOfShard", })?; Ok(Self { - shard_info: ShardInfo::from(shard_info), + shard_info: ShardInfo::from(&shard_info), tables: pb_tables_of_shard .tables .into_iter() @@ -354,7 +354,7 @@ impl TryFrom for CreateTableResponse { Ok(Self { created_table: TableInfo::try_from(pb_table_info)?, - shard_info: ShardInfo::from(pb_shard_info), + shard_info: ShardInfo::from(&pb_shard_info), }) } } @@ -437,7 +437,7 @@ impl TryFrom for NodeShard { })?; Ok(NodeShard { endpoint: pb.endpoint, - shard_info: ShardInfo::from(pb_shard_info), + shard_info: ShardInfo::from(&pb_shard_info), }) } } diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index 149b2671f0..0aa179f54e 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -179,12 +179,13 @@ mod tests { use ceresdbproto::{ meta_event::{ - CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest, - DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, + CloseTableOnShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, + OpenTableOnShardRequest, }, storage::RequestContext, }; - use cluster::{Cluster, ClusterNodesResp}; + use cluster::{shard_lock_manager::ShardLockManagerRef, Cluster, ClusterNodesResp}; + use common_types::table::ShardId; use common_util::config::ReadableDuration; use meta_client::types::{ NodeShard, RouteEntry, RouteTablesResponse, ShardInfo, ShardRole::Leader, TableInfo, @@ -205,11 +206,11 @@ mod tests { unimplemented!(); } - async fn open_shard(&self, _req: &OpenShardRequest) -> cluster::Result { + async fn open_shard(&self, _: &ShardInfo) -> cluster::Result { unimplemented!(); } - async fn close_shard(&self, _req: &CloseShardRequest) -> cluster::Result { + async fn close_shard(&self, _: ShardId) -> cluster::Result { unimplemented!(); } @@ -272,6 +273,10 @@ mod tests { async fn fetch_nodes(&self) -> cluster::Result { unimplemented!(); } + + fn shard_lock_manager(&self) -> ShardLockManagerRef { + unimplemented!(); + } } #[tokio::test] diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index fb15127622..7b011fe5ab 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -22,9 +22,10 @@ use ceresdbproto::meta_event::{ SplitShardResponse, }; use cluster::ClusterRef; -use common_types::schema::SchemaEncoder; +use common_types::{schema::SchemaEncoder, table::ShardId}; use common_util::{error::BoxError, runtime::Runtime, time::InstantExt}; -use log::{error, info}; +use log::{error, info, warn}; +use meta_client::types::ShardInfo; use paste::paste; use query_engine::executor::Executor as QueryExecutor; use snafu::{OptionExt, ResultExt}; @@ -183,6 +184,7 @@ impl MetaServiceImpl { } /// Context for handling all kinds of meta event service. +#[derive(Clone)] struct HandlerContext { cluster: ClusterRef, default_catalog: String, @@ -192,21 +194,79 @@ struct HandlerContext { wal_region_closer: WalRegionCloserRef, } -// TODO: maybe we should encapsulate the logic of handling meta event into a -// trait, so that we don't need to expose the logic to the meta event service -// implementation. 
+impl HandlerContext { + async fn acquire_shard_lock(&self, shard_id: ShardId) -> Result<()> { + let lock_mgr = self.cluster.shard_lock_manager(); + let new_ctx = self.clone(); + let on_lock_expired = |shard_id| async move { + warn!("Shard lock is released, try to close the tables and shard, shard_id:{shard_id}"); + let close_shard_req = CloseShardRequest { shard_id }; + let res = handle_close_shard(new_ctx, close_shard_req).await; + match res { + Ok(_) => info!("Close shard success, shard_id:{shard_id}"), + Err(e) => error!("Close shard failed, shard_id:{shard_id}, err:{e}"), + } + }; -async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> { - let tables_of_shard = - ctx.cluster - .open_shard(&request) + let granted_by_this_call = lock_mgr + .grant_lock(shard_id, on_lock_expired) .await .box_err() .context(ErrWithCause { code: StatusCode::Internal, - msg: "fail to open shards in cluster", + msg: "fail to acquire shard lock", })?; + if !granted_by_this_call { + warn!("Shard lock is already granted, shard_id:{}", shard_id); + } + + Ok(()) + } + + async fn release_shard_lock(&self, shard_id: ShardId) -> Result<()> { + let lock_mgr = self.cluster.shard_lock_manager(); + let revoked_by_this_call = + lock_mgr + .revoke_lock(shard_id) + .await + .box_err() + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to release shard lock", + })?; + + if revoked_by_this_call { + warn!("Shard lock is revoked already, shard_id:{}", shard_id); + } + + Ok(()) + } +} + +// TODO: maybe we should encapsulate the logic of handling meta event into a +// trait, so that we don't need to expose the logic to the meta event service +// implementation. + +async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> { + info!("Handle open shard begins, request:{request:?}"); + let shard_info = ShardInfo::from(&request.shard.context(ErrNoCause { + code: StatusCode::BadRequest, + msg: "shard info is required", + })?); + + ctx.acquire_shard_lock(shard_info.id).await?; + + let tables_of_shard = ctx + .cluster + .open_shard(&shard_info) + .await + .box_err() + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to open shards in cluster", + })?; + let catalog_name = &ctx.default_catalog; let shard_info = tables_of_shard.shard_info; let table_defs = tables_of_shard @@ -236,13 +296,20 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re .context(ErrWithCause { code: StatusCode::Internal, msg: "failed to open shard", - }) + })?; + + info!("Handle open shard success, shard_id:{}", shard_info.id); + + Ok(()) } async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> { + let shard_id = request.shard_id; + info!("Handle close shard begins, shard_id:{shard_id:?}"); + let tables_of_shard = ctx.cluster - .close_shard(&request) + .close_shard(shard_id) .await .box_err() .context(ErrWithCause { @@ -268,7 +335,7 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> engine: ANALYTIC_ENGINE_TYPE.to_string(), }; let opts = CloseOptions { - table_engine: ctx.table_engine, + table_engine: ctx.table_engine.clone(), }; ctx.table_operator @@ -286,8 +353,13 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> .await .with_context(|| ErrWithCause { code: StatusCode::Internal, - msg: format!("fail to close wal region, shard_id:{}", request.shard_id), - }) + msg: format!("fail to close wal region, shard_id:{shard_id}"), + })?; + + 
ctx.release_shard_lock(shard_id).await?; + + info!("Handle close shard succeed, shard_id:{shard_id}"); + Ok(()) } async fn handle_create_table_on_shard( diff --git a/src/setup.rs b/src/setup.rs index b69076bb1d..64e3f5eceb 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -200,6 +200,10 @@ async fn build_with_meta( idc: config.node.idc.clone(), binary_version: config.node.binary_version.clone(), }; + + info!("Build ceresdb with node meta info:{node_meta_info:?}"); + + let endpoint = node_meta_info.endpoint(); let meta_client = meta_impl::build_meta_client(cluster_config.meta_client.clone(), node_meta_info) .await @@ -207,12 +211,14 @@ async fn build_with_meta( let shard_tables_cache = ShardTablesCache::default(); let cluster = { - let cluster_impl = ClusterImpl::new( + let cluster_impl = ClusterImpl::create( + endpoint, shard_tables_cache.clone(), meta_client.clone(), cluster_config.clone(), runtimes.meta_runtime.clone(), ) + .await .unwrap(); Arc::new(cluster_impl) }; From 0a28aafc4c0a6c0058da64e08bdf5cab6e8003c5 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Fri, 21 Apr 2023 16:59:27 +0800 Subject: [PATCH 3/4] feat: compaction support different level (#848) * feat: add level 1 sst file * add more comments for compaction tests * wait manual compaction finish * fix CR --- analytic_engine/src/compaction/mod.rs | 10 +++ analytic_engine/src/compaction/picker.rs | 7 +- .../src/instance/flush_compaction.rs | 24 ++++-- analytic_engine/src/instance/mod.rs | 39 +++++++--- analytic_engine/src/row_iter/merge.rs | 9 +-- analytic_engine/src/sst/factory.rs | 4 + analytic_engine/src/sst/file.rs | 51 +++++++++++-- analytic_engine/src/sst/manager.rs | 44 +++++------ .../src/sst/parquet/row_group_pruner.rs | 2 +- analytic_engine/src/sst/parquet/writer.rs | 73 +++++++++++-------- analytic_engine/src/table/version.rs | 44 ++++++----- analytic_engine/src/table/version_edit.rs | 27 +++---- analytic_engine/src/tests/compaction_test.rs | 5 +- benchmarks/src/merge_sst_bench.rs | 5 +- benchmarks/src/sst_tools.rs | 11 ++- common_util/src/lib.rs | 16 +++- tools/src/bin/sst-convert.rs | 11 ++- 17 files changed, 247 insertions(+), 135 deletions(-) diff --git a/analytic_engine/src/compaction/mod.rs b/analytic_engine/src/compaction/mod.rs index 8d2f495a63..79a9708b40 100644 --- a/analytic_engine/src/compaction/mod.rs +++ b/analytic_engine/src/compaction/mod.rs @@ -432,6 +432,16 @@ pub struct TableCompactionRequest { } impl TableCompactionRequest { + pub fn new(table_data: TableDataRef) -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel::>(); + let req = Self { + table_data, + waiter: Some(tx), + }; + + (req, rx) + } + pub fn no_waiter(table_data: TableDataRef) -> Self { TableCompactionRequest { table_data, diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index d9eacba4a6..c2b5b96002 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -100,9 +100,7 @@ impl CommonCompactionPicker { levels_controller: &LevelsController, expire_time: Option, ) -> Option { - let num_levels = levels_controller.num_levels(); - //TODO(boyan) level compaction strategy - for level in 0..num_levels { + for level in levels_controller.levels() { if let Some(files) = self.level_picker.pick_candidates_at_level( ctx, levels_controller, @@ -112,8 +110,7 @@ impl CommonCompactionPicker { return Some(CompactionInputFiles { level, files, - // Now, we always output to the same level. 
- output_level: level, + output_level: level.next(), }); } } diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 4437a9ff44..3bd9718660 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -46,7 +46,7 @@ use crate::{ }, sst::{ factory::{self, ReadFrequency, ScanOptions, SstReadOptions, SstWriteOptions}, - file::FileMeta, + file::{FileMeta, Level}, meta_data::SstMetaReader, writer::{MetaData, RecordBatchStream}, }, @@ -387,7 +387,10 @@ impl FlushTask { let file = self.dump_normal_memtable(request_id, mem).await?; if let Some(file) = file { let sst_size = file.size; - files_to_level0.push(AddFile { level: 0, file }); + files_to_level0.push(AddFile { + level: Level::MIN, + file, + }); // Set flushed sequence to max of the last_sequence of memtables. flushed_sequence = cmp::max(flushed_sequence, mem.last_sequence()); @@ -516,7 +519,12 @@ impl FlushTask { let handler = self.runtime.spawn(async move { let mut writer = store .sst_factory - .create_writer(&sst_write_options, &sst_file_path, store.store_picker()) + .create_writer( + &sst_write_options, + &sst_file_path, + store.store_picker(), + Level::MIN, + ) .await .context(CreateSstWriter { storage_format_hint, @@ -572,7 +580,7 @@ impl FlushTask { for (idx, info_and_meta) in info_and_metas.into_iter().enumerate() { let (sst_info, sst_meta) = info_and_meta?; files_to_level0.push(AddFile { - level: 0, + level: Level::MIN, file: FileMeta { id: file_ids[idx], size: sst_info.file_size as u64, @@ -628,6 +636,7 @@ impl FlushTask { &sst_write_options, &sst_file_path, self.space_store.store_picker(), + Level::MIN, ) .await .context(CreateSstWriter { @@ -853,7 +862,12 @@ impl SpaceStore { let mut sst_writer = self .sst_factory - .create_writer(sst_write_options, &sst_file_path, self.store_picker()) + .create_writer( + sst_write_options, + &sst_file_path, + self.store_picker(), + input.output_level, + ) .await .context(CreateSstWriter { storage_format_hint: sst_write_options.storage_format_hint, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 5b5c7b3eb6..ca7c42c767 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -58,11 +58,19 @@ pub enum Error { source: crate::compaction::scheduler::Error, }, - #[snafu(display("Failed to flush table manually, table:{}, err:{}", table, source))] - ManualFlush { table: String, source: GenericError }, + #[snafu(display("Failed to {} table manually, table:{}, err:{}", op, table, source))] + ManualOp { + op: String, + table: String, + source: GenericError, + }, - #[snafu(display("Failed to receive flush result, table:{}, err:{}", table, source))] - RecvFlushResult { table: String, source: RecvError }, + #[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))] + RecvManualOpResult { + op: String, + table: String, + source: RecvError, + }, } define_result!(Error); @@ -194,25 +202,29 @@ impl Instance { .schedule_flush(flush_scheduler, table_data, flush_opts) .await .box_err() - .context(ManualFlush { + .context(ManualOp { + op: "flush", table: &table_data.name, })?; if let Some(rx) = rx_opt { rx.await - .context(RecvFlushResult { + .context(RecvManualOpResult { + op: "flush", table: &table_data.name, })? .box_err() - .context(ManualFlush { + .context(ManualOp { + op: "flush", table: &table_data.name, })?; } Ok(()) } + // This method will wait until compaction finished. 
pub async fn manual_compact_table(&self, table_data: &TableDataRef) -> Result<()> { - let request = TableCompactionRequest::no_waiter(table_data.clone()); + let (request, rx) = TableCompactionRequest::new(table_data.clone()); let succeed = self .compaction_scheduler .schedule_table_compaction(request) @@ -221,7 +233,16 @@ impl Instance { error!("Failed to schedule compaction, table:{}", table_data.name); } - Ok(()) + rx.await + .context(RecvManualOpResult { + op: "compact", + table: &table_data.name, + })? + .box_err() + .context(ManualOp { + op: "compact", + table: &table_data.name, + }) } } diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs index b9dd964af6..da24fe51e1 100644 --- a/analytic_engine/src/row_iter/merge.rs +++ b/analytic_engine/src/row_iter/merge.rs @@ -34,8 +34,7 @@ use crate::{ space::SpaceId, sst::{ factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReadOptions}, - file::FileHandle, - manager::MAX_LEVEL, + file::{FileHandle, Level, SST_LEVEL_NUM}, }, table::version::{MemTableVec, SamplingMemTable}, }; @@ -127,7 +126,7 @@ impl<'a> MergeBuilder<'a> { config, sampling_mem: None, memtables: Vec::new(), - ssts: vec![Vec::new(); MAX_LEVEL], + ssts: vec![Vec::new(); SST_LEVEL_NUM], } } @@ -151,8 +150,8 @@ impl<'a> MergeBuilder<'a> { } /// Returns file handles in `level`, panic if level >= MAX_LEVEL - pub fn mut_ssts_of_level(&mut self, level: u16) -> &mut Vec { - &mut self.ssts[usize::from(level)] + pub fn mut_ssts_of_level(&mut self, level: Level) -> &mut Vec { + &mut self.ssts[level.as_usize()] } pub async fn build(self) -> Result { diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 891876331d..4100073133 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -14,6 +14,7 @@ use trace_metric::MetricsCollector; use crate::{ sst::{ + file::Level, header, header::HeaderParser, meta_data::cache::MetaCacheRef, @@ -74,6 +75,7 @@ pub trait Factory: Send + Sync + Debug { options: &SstWriteOptions, path: &'a Path, store_picker: &'a ObjectStorePickerRef, + level: Level, ) -> Result>; } @@ -178,6 +180,7 @@ impl Factory for FactoryImpl { options: &SstWriteOptions, path: &'a Path, store_picker: &'a ObjectStorePickerRef, + level: Level, ) -> Result> { let hybrid_encoding = match options.storage_format_hint { StorageFormatHint::Specific(format) => matches!(format, StorageFormat::Hybrid), @@ -187,6 +190,7 @@ impl Factory for FactoryImpl { Ok(Box::new(ParquetSstWriter::new( path, + level, hybrid_encoding, store_picker, options, diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs index 944ea19950..57cabb98f0 100644 --- a/analytic_engine/src/sst/file.rs +++ b/analytic_engine/src/sst/file.rs @@ -43,7 +43,48 @@ pub enum Error { define_result!(Error); -pub type Level = u16; +pub const SST_LEVEL_NUM: usize = 2; + +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] +pub struct Level(u16); + +impl Level { + // Currently there are only two levels: 0, 1. 
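+    // e.g. Level::MIN.next() == Level::MAX (0 -> 1) and Level::MAX.next() == Level::MAX
+    // (it saturates at the top level), which is why the compaction picker above can
+    // always emit `output_level: level.next()`.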
+ pub const MAX: Self = Self(1); + pub const MIN: Self = Self(0); + + pub fn next(&self) -> Self { + Self::MAX.0.min(self.0 + 1).into() + } + + pub fn is_min(&self) -> bool { + self == &Self::MIN + } + + pub fn as_usize(&self) -> usize { + self.0 as usize + } + + pub fn as_u32(&self) -> u32 { + self.0 as u32 + } + + pub fn as_u16(&self) -> u16 { + self.0 + } +} + +impl From for Level { + fn from(value: u16) -> Self { + Self(value) + } +} + +impl fmt::Display for Level { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} // TODO(yingwen): Order or split file by time range to speed up filter (even in // level 0). @@ -55,7 +96,7 @@ pub struct LevelHandler { } impl LevelHandler { - pub fn new(level: u16) -> Self { + pub fn new(level: Level) -> Self { Self { level, files: FileHandleSet::default(), @@ -72,11 +113,7 @@ impl LevelHandler { } pub fn pick_ssts(&self, time_range: TimeRange) -> Vec { - if self.level == 0 { - self.files.files_by_time_range(time_range) - } else { - Vec::new() - } + self.files.files_by_time_range(time_range) } #[inline] diff --git a/analytic_engine/src/sst/manager.rs b/analytic_engine/src/sst/manager.rs index 335b6d361d..52bb71d9fa 100644 --- a/analytic_engine/src/sst/manager.rs +++ b/analytic_engine/src/sst/manager.rs @@ -11,8 +11,6 @@ use crate::{ /// Id for a sst file pub type FileId = u64; -/// We use two level merge tree, the max level should less than u16::MAX -pub const MAX_LEVEL: usize = 2; /// A table level manager that manages all the sst files of the table pub struct LevelsController { @@ -30,13 +28,10 @@ impl Drop for LevelsController { impl LevelsController { /// Create an empty LevelsController pub fn new(purge_queue: FilePurgeQueue) -> Self { - let mut levels = Vec::with_capacity(MAX_LEVEL); - for level in 0..MAX_LEVEL { - levels.push(LevelHandler::new(level as Level)); - } - Self { - levels, + levels: (Level::MIN.as_u16()..=Level::MAX.as_u16()) + .map(|v| LevelHandler::new(v.into())) + .collect::>(), purge_queue, } } @@ -45,14 +40,14 @@ impl LevelsController { /// /// Panic: If the level is greater than the max level pub fn add_sst_to_level(&mut self, level: Level, file_meta: FileMeta) { - let level_handler = &mut self.levels[usize::from(level)]; + let level_handler = &mut self.levels[level.as_usize()]; let file = FileHandle::new(file_meta, self.purge_queue.clone()); level_handler.insert(file); } pub fn latest_sst(&self, level: Level) -> Option { - self.levels[usize::from(level)].latest_sst() + self.levels[level.as_usize()].latest_sst() } /// Pick the ssts and collect it by `append_sst`. @@ -71,20 +66,19 @@ impl LevelsController { /// /// Panic: If the level is greater than the max level pub fn remove_ssts_from_level(&mut self, level: Level, file_ids: &[FileId]) { - let level_handler = &mut self.levels[usize::from(level)]; + let level_handler = &mut self.levels[level.as_usize()]; level_handler.remove_ssts(file_ids); } - /// Total number of levels. - pub fn num_levels(&self) -> Level { - self.levels.len() as Level + pub fn levels(&self) -> impl Iterator + '_ { + self.levels.iter().map(|v| v.level) } /// Iter ssts at given `level`. /// /// Panic if level is out of bound. 
pub fn iter_ssts_at_level(&self, level: Level) -> Iter { - let level_handler = &self.levels[usize::from(level)]; + let level_handler = &self.levels[level.as_usize()]; level_handler.iter_ssts() } @@ -93,7 +87,7 @@ impl LevelsController { level: Level, expire_time: Option, ) -> Vec { - let level_handler = &self.levels[usize::from(level)]; + let level_handler = &self.levels[level.as_usize()]; let mut expired = Vec::new(); level_handler.collect_expired(expire_time, &mut expired); @@ -107,14 +101,12 @@ impl LevelsController { } pub fn expired_ssts(&self, expire_time: Option) -> Vec { - let mut expired = Vec::new(); - let num_levels = self.num_levels(); - for level in 0..num_levels { - let files = self.collect_expired_at_level(level, expire_time); - expired.push(ExpiredFiles { level, files }); - } - - expired + self.levels() + .map(|level| { + let files = self.collect_expired_at_level(level, expire_time); + ExpiredFiles { level, files } + }) + .collect() } } @@ -125,7 +117,7 @@ pub mod tests { use crate::{ sst::{ - file::{FileMeta, FilePurgeQueue}, + file::{FileMeta, FilePurgeQueue, Level}, manager::{FileId, LevelsController}, meta_data::SstMetaData, }, @@ -150,7 +142,7 @@ pub mod tests { let mut levels_controller = LevelsController::new(file_purge_queue); for (id, sst_meta) in self.sst_meta_vec.into_iter().enumerate() { levels_controller.add_sst_to_level( - 0, + Level::MIN, FileMeta { id: id as FileId, size: 0, diff --git a/analytic_engine/src/sst/parquet/row_group_pruner.rs b/analytic_engine/src/sst/parquet/row_group_pruner.rs index 11626c5556..51c270c88b 100644 --- a/analytic_engine/src/sst/parquet/row_group_pruner.rs +++ b/analytic_engine/src/sst/parquet/row_group_pruner.rs @@ -60,7 +60,7 @@ impl<'a> RowGroupPruner<'a> { ) -> Result { if let Some(f) = parquet_filter { ensure!(f.len() == row_groups.len(), OtherNoCause { - msg: format!("expect the same number of ss_filter as the number of row groups, num_parquet_filters:{}, num_row_groups:{}", f.len(), row_groups.len()), + msg: format!("expect sst_filters.len() == row_groups.len(), num_sst_filters:{}, num_row_groups:{}", f.len(), row_groups.len()), }); } diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 898542daaf..b354f89e1b 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -12,9 +12,11 @@ use object_store::{ObjectStoreRef, Path}; use snafu::ResultExt; use tokio::io::AsyncWrite; +use super::meta_data::RowGroupFilter; use crate::{ sst::{ factory::{ObjectStorePickerRef, SstWriteOptions}, + file::Level, parquet::{ encoding::ParquetEncoder, meta_data::{ParquetFilter, ParquetMetaData, RowGroupFilterBuilder}, @@ -32,6 +34,7 @@ use crate::{ pub struct ParquetSstWriter<'a> { /// The path where the data is persisted. path: &'a Path, + level: Level, hybrid_encoding: bool, /// The storage where the data is persist. 
store: &'a ObjectStoreRef, @@ -44,6 +47,7 @@ pub struct ParquetSstWriter<'a> { impl<'a> ParquetSstWriter<'a> { pub fn new( path: &'a Path, + level: Level, hybrid_encoding: bool, store_picker: &'a ObjectStorePickerRef, options: &SstWriteOptions, @@ -51,6 +55,7 @@ impl<'a> ParquetSstWriter<'a> { let store = store_picker.default_store(); Self { path, + level, hybrid_encoding, store, num_rows_per_row_group: options.num_rows_per_row_group, @@ -71,9 +76,7 @@ struct RecordBatchGroupWriter { num_rows_per_row_group: usize, max_buffer_size: usize, compression: Compression, - /// The filter for the parquet file, and it will be updated during - /// generating the parquet file. - parquet_filter: ParquetFilter, + level: Level, } impl RecordBatchGroupWriter { @@ -139,33 +142,29 @@ impl RecordBatchGroupWriter { Ok(curr_row_group) } - /// Build the parquet filter for the given `row_group`, and then update it - /// into `self.parquet_filter`. - fn update_parquet_filter(&mut self, row_group_batch: &[RecordBatchWithKey]) -> Result<()> { - // TODO: support filter in hybrid storage format [#435](https://github.com/CeresDB/ceresdb/issues/435) - if self.hybrid_encoding { - return Ok(()); - } - - let row_group_filter = { - let mut builder = - RowGroupFilterBuilder::with_num_columns(row_group_batch[0].num_columns()); - - for partial_batch in row_group_batch { - for (col_idx, column) in partial_batch.columns().iter().enumerate() { - for row in 0..column.num_rows() { - let datum = column.datum(row); - let bytes = datum.to_bytes(); - builder.add_key(col_idx, &bytes); - } + /// Build the parquet filter for the given `row_group`. + fn build_row_group_filter( + &self, + row_group_batch: &[RecordBatchWithKey], + ) -> Result { + let mut builder = RowGroupFilterBuilder::with_num_columns(row_group_batch[0].num_columns()); + + for partial_batch in row_group_batch { + for (col_idx, column) in partial_batch.columns().iter().enumerate() { + for row in 0..column.num_rows() { + let datum = column.datum(row); + let bytes = datum.to_bytes(); + builder.add_key(col_idx, &bytes); } } + } - builder.build().box_err().context(BuildParquetFilter)? 
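
// [Conceptual sketch, not part of the patch] `build_row_group_filter` feeds
// the encoded bytes of every datum into one filter per column, so a reader
// can skip an entire row group when a predicate value is definitely absent.
// `ColumnFilter` below is a hypothetical exact-set stand-in for the real
// per-column filter, which is a compact probabilistic structure with the
// same contract: false positives are allowed, false negatives are not.
use std::collections::HashSet;

#[derive(Default)]
struct ColumnFilter {
    keys: HashSet<Vec<u8>>,
}

impl ColumnFilter {
    fn add_key(&mut self, key: &[u8]) {
        self.keys.insert(key.to_vec());
    }

    fn may_contain(&self, key: &[u8]) -> bool {
        self.keys.contains(key)
    }
}
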
- }; + builder.build().box_err().context(BuildParquetFilter) + } - self.parquet_filter.push_row_group_filter(row_group_filter); - Ok(()) + fn need_custom_filter(&self) -> bool { + // TODO: support filter in hybrid storage format [#435](https://github.com/CeresDB/ceresdb/issues/435) + !self.hybrid_encoding && !self.level.is_min() } async fn write_all(mut self, sink: W) -> Result { @@ -183,6 +182,11 @@ impl RecordBatchGroupWriter { ) .box_err() .context(EncodeRecordBatch)?; + let mut parquet_filter = if self.need_custom_filter() { + None + } else { + Some(ParquetFilter::default()) + }; loop { let row_group = self.fetch_next_row_group(&mut prev_record_batch).await?; @@ -190,7 +194,9 @@ impl RecordBatchGroupWriter { break; } - self.update_parquet_filter(&row_group)?; + if let Some(filter) = &mut parquet_filter { + filter.push_row_group_filter(self.build_row_group_filter(&row_group)?); + } let num_batches = row_group.len(); for record_batch in row_group { @@ -210,7 +216,7 @@ impl RecordBatchGroupWriter { let parquet_meta_data = { let mut parquet_meta_data = ParquetMetaData::from(self.meta_data); - parquet_meta_data.parquet_filter = Some(self.parquet_filter); + parquet_meta_data.parquet_filter = parquet_filter; parquet_meta_data }; parquet_encoder @@ -284,7 +290,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { max_buffer_size: self.max_buffer_size, compression: self.compression, meta_data: meta.clone(), - parquet_filter: ParquetFilter::default(), + level: self.level, }; let (aborter, sink) = @@ -412,7 +418,12 @@ mod tests { })); let mut writer = sst_factory - .create_writer(&sst_write_options, &sst_file_path, &store_picker) + .create_writer( + &sst_write_options, + &sst_file_path, + &store_picker, + Level::MAX, + ) .await .unwrap(); let sst_info = writer @@ -541,7 +552,7 @@ mod tests { schema, }, max_buffer_size: 0, - parquet_filter: ParquetFilter::default(), + level: Level::default(), }; let mut prev_record_batch = None; diff --git a/analytic_engine/src/table/version.rs b/analytic_engine/src/table/version.rs index bfccae2480..329c09677a 100644 --- a/analytic_engine/src/table/version.rs +++ b/analytic_engine/src/table/version.rs @@ -28,8 +28,8 @@ use crate::{ memtable::{self, key::KeySequence, MemTableRef, PutContext}, sampler::{DefaultSampler, SamplerRef}, sst::{ - file::{FileHandle, FilePurgeQueue}, - manager::{FileId, LevelsController, MAX_LEVEL}, + file::{FileHandle, FilePurgeQueue, SST_LEVEL_NUM}, + manager::{FileId, LevelsController}, }, table::{ data::MemTableId, @@ -487,7 +487,7 @@ impl Default for ReadView { Self { sampling_mem: None, memtables: Vec::new(), - leveled_ssts: vec![Vec::new(); MAX_LEVEL], + leveled_ssts: vec![Vec::new(); SST_LEVEL_NUM], } } } @@ -503,7 +503,7 @@ struct TableVersionInner { /// All memtables memtable_view: MemTableView, /// All ssts - levels: LevelsController, + levels_controller: LevelsController, /// The earliest sequence number of the entries already flushed (inclusive). /// All log entry with sequence <= `flushed_sequence` can be deleted @@ -544,7 +544,7 @@ impl TableVersion { Self { inner: RwLock::new(TableVersionInner { memtable_view: MemTableView::new(), - levels: LevelsController::new(purge_queue), + levels_controller: LevelsController::new(purge_queue), flushed_sequence: 0, }), } @@ -662,13 +662,15 @@ impl TableVersion { // Add sst files to level first. 
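
// [Sketch, not the patch itself; `attach_filter_if_needed` is a hypothetical
// helper that condenses the intended flow of `write_all`] The filter is
// materialized only when `need_custom_filter()` is true (non-hybrid encoding
// on a non-min level), each row group contributes one `RowGroupFilter`, and
// the result (possibly `None`) ends up in `ParquetMetaData::parquet_filter`.
// The real writer streams row groups instead of taking a slice.
fn attach_filter_if_needed(
    need_custom_filter: bool,
    row_groups: &[Vec<RecordBatchWithKey>],
    build: impl Fn(&[RecordBatchWithKey]) -> Result<RowGroupFilter>,
) -> Result<Option<ParquetFilter>> {
    let mut parquet_filter = need_custom_filter.then(ParquetFilter::default);

    for row_group in row_groups {
        if let Some(filter) = parquet_filter.as_mut() {
            filter.push_row_group_filter(build(row_group)?);
        }
    }

    Ok(parquet_filter)
}
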
for add_file in edit.files_to_add { - inner.levels.add_sst_to_level(add_file.level, add_file.file); + inner + .levels_controller + .add_sst_to_level(add_file.level, add_file.file); } // Remove ssts from level. for delete_file in edit.files_to_delete { inner - .levels + .levels_controller .remove_ssts_from_level(delete_file.level, &[delete_file.file_id]); } @@ -685,14 +687,16 @@ impl TableVersion { inner.flushed_sequence = cmp::max(inner.flushed_sequence, meta.flushed_sequence); for add_file in meta.files.into_values() { - inner.levels.add_sst_to_level(add_file.level, add_file.file); + inner + .levels_controller + .add_sst_to_level(add_file.level, add_file.file); } } pub fn pick_read_view(&self, time_range: TimeRange) -> ReadView { let mut sampling_mem = None; let mut memtables = MemTableVec::new(); - let mut leveled_ssts = vec![Vec::new(); MAX_LEVEL]; + let mut leveled_ssts = vec![Vec::new(); SST_LEVEL_NUM]; { // Pick memtables for read. @@ -703,9 +707,11 @@ impl TableVersion { .memtables_for_read(time_range, &mut memtables, &mut sampling_mem); // Pick ssts for read. - inner.levels.pick_ssts(time_range, |level, ssts| { - leveled_ssts[level as usize].extend_from_slice(ssts) - }); + inner + .levels_controller + .pick_ssts(time_range, |level, ssts| { + leveled_ssts[level.as_usize()].extend_from_slice(ssts) + }); } ReadView { @@ -723,19 +729,19 @@ impl TableVersion { ) -> picker::Result { let inner = self.inner.read().unwrap(); - picker.pick_compaction(picker_ctx, &inner.levels) + picker.pick_compaction(picker_ctx, &inner.levels_controller) } pub fn has_expired_sst(&self, expire_time: Option) -> bool { let inner = self.inner.read().unwrap(); - inner.levels.has_expired_sst(expire_time) + inner.levels_controller.has_expired_sst(expire_time) } pub fn expired_ssts(&self, expire_time: Option) -> Vec { let inner = self.inner.read().unwrap(); - inner.levels.expired_ssts(expire_time) + inner.levels_controller.expired_ssts(expire_time) } pub fn flushed_sequence(&self) -> SequenceNumber { @@ -746,11 +752,11 @@ impl TableVersion { pub fn snapshot(&self) -> TableVersionSnapshot { let inner = self.inner.read().unwrap(); - let levels = &inner.levels; - let num_levels = levels.num_levels(); - let files = (0..num_levels) + let controller = &inner.levels_controller; + let files = controller + .levels() .flat_map(|level| { - let ssts = levels.iter_ssts_at_level(level); + let ssts = controller.iter_ssts_at_level(level); ssts.map(move |file| { let add_file = AddFile { level, diff --git a/analytic_engine/src/table/version_edit.rs b/analytic_engine/src/table/version_edit.rs index d593055851..11cf1c4e34 100644 --- a/analytic_engine/src/table/version_edit.rs +++ b/analytic_engine/src/table/version_edit.rs @@ -2,7 +2,7 @@ //! Version edits -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use ceresdbproto::manifest as manifest_pb; use common_types::{time::TimeRange, SequenceNumber}; @@ -10,7 +10,10 @@ use common_util::define_result; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use crate::{ - sst::{file::FileMeta, manager::FileId}, + sst::{ + file::{FileMeta, Level}, + manager::FileId, + }, table::data::MemTableId, table_options::StorageFormat, }; @@ -43,7 +46,7 @@ define_result!(Error); #[derive(Debug, Clone, PartialEq, Eq)] pub struct AddFile { /// The level of the file intended to add. - pub level: u16, + pub level: Level, /// Meta data of the file to add. 
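
// [Sketch, not part of the patch; `bucket_ssts_by_level` is a hypothetical
// helper mirroring what `pick_read_view` does] `SST_LEVEL_NUM` is now
// exported from sst::file rather than `MAX_LEVEL` from the manager; its
// definition is outside this hunk, but the invariant the indexing relies on
// is that it equals the number of levels (Level::MAX.as_usize() + 1, i.e. 2).
fn bucket_ssts_by_level(
    controller: &LevelsController,
    time_range: TimeRange,
) -> Vec<Vec<FileHandle>> {
    debug_assert_eq!(SST_LEVEL_NUM, Level::MAX.as_usize() + 1);

    let mut leveled_ssts = vec![Vec::new(); SST_LEVEL_NUM];
    controller.pick_ssts(time_range, |level, ssts| {
        leveled_ssts[level.as_usize()].extend_from_slice(ssts)
    });
    leveled_ssts
}
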
pub file: FileMeta, } @@ -52,7 +55,7 @@ impl From for manifest_pb::AddFileMeta { /// Convert into protobuf struct fn from(v: AddFile) -> manifest_pb::AddFileMeta { manifest_pb::AddFileMeta { - level: v.level as u32, + level: v.level.as_u32(), file_id: v.file.id, time_range: Some(v.file.time_range.into()), max_seq: v.file.max_seq, @@ -74,10 +77,7 @@ impl TryFrom for AddFile { }; let target = Self { - level: src - .level - .try_into() - .context(InvalidLevel { level: src.level })?, + level: (src.level as u16).into(), file: FileMeta { id: src.file_id, size: src.size, @@ -96,7 +96,7 @@ impl TryFrom for AddFile { #[derive(Debug, Clone, PartialEq, Eq)] pub struct DeleteFile { /// The level of the file intended to delete. - pub level: u16, + pub level: Level, /// Id of the file to delete. pub file_id: FileId, } @@ -104,7 +104,7 @@ pub struct DeleteFile { impl From for manifest_pb::DeleteFileMeta { fn from(v: DeleteFile) -> Self { manifest_pb::DeleteFileMeta { - level: v.level as u32, + level: v.level.as_u32(), file_id: v.file_id, } } @@ -114,10 +114,7 @@ impl TryFrom for DeleteFile { type Error = Error; fn try_from(src: manifest_pb::DeleteFileMeta) -> Result { - let level = src - .level - .try_into() - .context(InvalidLevel { level: src.level })?; + let level = (src.level as u16).into(); Ok(Self { level, @@ -173,7 +170,7 @@ pub mod tests { pub fn build(&self) -> AddFile { AddFile { - level: 0, + level: Level::MIN, file: FileMeta { id: self.file_id, size: 0, diff --git a/analytic_engine/src/tests/compaction_test.rs b/analytic_engine/src/tests/compaction_test.rs index c9025a1254..3f88125f64 100644 --- a/analytic_engine/src/tests/compaction_test.rs +++ b/analytic_engine/src/tests/compaction_test.rs @@ -38,7 +38,8 @@ fn test_table_compact_current_segment(engine_context: T) let mut expect_rows = Vec::new(); let start_ms = test_ctx.start_ms(); - // Write more than ensure compaction will be triggered. + // Write max_threshold*2 sst to ensure level0->level1, level1->level1 compaction + // will be triggered. for offset in 0..default_opts.max_threshold as i64 * 2 { let rows = [ ( @@ -87,6 +88,8 @@ fn test_table_compact_current_segment(engine_context: T) ) .await; + common_util::tests::init_log_for_test(); + // Trigger a compaction. 
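
// [Sketch, not part of the patch; `level_round_trip` is an illustrative
// helper] The manifest protobuf stores the level as a u32 while the engine
// now uses the u16-backed `Level`, so the `From`/`TryFrom` impls in
// version_edit.rs above become a plain widen/narrow pair. The `as u16`
// narrowing truncates silently, which is harmless for the two-level tree and
// is why the old `InvalidLevel` error is no longer produced here.
fn level_round_trip(level: Level) -> Level {
    let stored: u32 = level.as_u32(); // written into manifest_pb::AddFileMeta / DeleteFileMeta
    Level::from(stored as u16)        // read back when replaying the manifest
}
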
test_ctx.compact_table(test_table1).await; diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index 334288c911..be7ff79353 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -18,7 +18,7 @@ use analytic_engine::{ FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadOptions, }, - file::{FileHandle, FilePurgeQueue, Request}, + file::{FileHandle, FilePurgeQueue, Level, Request}, meta_data::cache::MetaCacheRef, }, table::sst_util, @@ -146,7 +146,8 @@ impl MergeSstBench { }); builder - .mut_ssts_of_level(0) + // TODO: make level configurable + .mut_ssts_of_level(Level::MIN) .extend_from_slice(&self.file_handles); self.runtime.block_on(async { diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index f02c9c79a6..8f3a6c1f09 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -17,7 +17,7 @@ use analytic_engine::{ Factory, FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint, SstReadOptions, SstWriteOptions, }, - file::FilePurgeQueue, + file::{FilePurgeQueue, Level}, manager::FileId, meta_data::SstMetaReader, writer::{MetaData, RecordBatchStream}, @@ -65,7 +65,12 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa let sst_file_path = Path::from(config.sst_file_name); let mut writer = sst_factory - .create_writer(&sst_write_options, &sst_file_path, &store_picker) + .create_writer( + &sst_write_options, + &sst_file_path, + &store_picker, + Level::MAX, + ) .await .unwrap(); writer @@ -240,7 +245,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { reverse: false, }); builder - .mut_ssts_of_level(0) + .mut_ssts_of_level(Level::MIN) .extend_from_slice(&file_handles); builder.build().await.unwrap() diff --git a/common_util/src/lib.rs b/common_util/src/lib.rs index 71ea431b7c..b56c2d1223 100644 --- a/common_util/src/lib.rs +++ b/common_util/src/lib.rs @@ -23,13 +23,25 @@ pub mod toml; #[cfg(any(test, feature = "test"))] pub mod tests { - use std::sync::Once; + use std::{io::Write, sync::Once}; static INIT_LOG: Once = Once::new(); pub fn init_log_for_test() { INIT_LOG.call_once(|| { - env_logger::init(); + env_logger::Builder::from_default_env() + .format(|buf, record| { + writeln!( + buf, + "{} {} [{}:{}] {}", + chrono::Local::now().format("%Y-%m-%dT%H:%M:%S.%3f"), + buf.default_styled_level(record.level()), + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.args() + ) + }) + .init(); }); } } diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index 1ef8b0ba2b..37de204e2b 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -5,9 +5,12 @@ use std::{error::Error, sync::Arc}; use analytic_engine::{ - sst::factory::{ - Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint, - SstReadOptions, SstWriteOptions, + sst::{ + factory::{ + Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint, + SstReadOptions, SstWriteOptions, + }, + file::Level, }, table_options::{Compression, StorageFormatHint}, }; @@ -108,7 +111,7 @@ async fn run(args: Args, runtime: Arc) -> Result<()> { }; let output = Path::from(args.output); let mut writer = factory - .create_writer(&builder_opts, &output, &store_picker) + .create_writer(&builder_opts, &output, &store_picker, Level::MAX) .await .expect("no sst writer found"); let sst_stream = reader From 
cdedef9fd66fa9200615b2a38c24150078127c64 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Mon, 24 Apr 2023 13:16:16 +0800 Subject: [PATCH 4/4] fix: avoid panic when convert prom result (#851) --- server/src/proxy/http/prom.rs | 41 ++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/server/src/proxy/http/prom.rs b/server/src/proxy/http/prom.rs index 05e25b3de1..fd1c1c9314 100644 --- a/server/src/proxy/http/prom.rs +++ b/server/src/proxy/http/prom.rs @@ -16,6 +16,7 @@ use common_types::{ schema::{RecordSchema, TIMESTAMP_COLUMN, TSID_COLUMN}, }; use common_util::error::BoxError; +use http::StatusCode; use interpreters::interpreter::Output; use log::debug; use prom_remote_api::types::{ @@ -29,7 +30,7 @@ use warp::reject; use crate::{ context::RequestContext, proxy::{ - error::{Error, Internal, InternalNoCause, Result}, + error::{ErrNoCause, Error, Internal, InternalNoCause, Result}, grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, http::query::{QueryRequest, Request}, Proxy, @@ -231,10 +232,24 @@ impl Converter { .map(|(_, idx)| batch.column(*idx)) .collect::>(); for row_idx in 0..batch.num_rows() { - let tsid = tsid_col - .datum(row_idx) - .as_u64() - .expect("checked in try_new"); + let tsid = tsid_col.datum(row_idx).as_u64().context(ErrNoCause { + msg: "value should be non-nullable i64", + code: StatusCode::BAD_REQUEST, + })?; + let sample = Sample { + timestamp: timestamp_col + .datum(row_idx) + .as_timestamp() + .context(ErrNoCause { + msg: "timestamp should be non-nullable timestamp", + code: StatusCode::BAD_REQUEST, + })? + .as_i64(), + value: value_col.datum(row_idx).as_f64().context(ErrNoCause { + msg: "value should be non-nullable f64", + code: StatusCode::BAD_REQUEST, + })?, + }; series_by_tsid .entry(tsid) .or_insert_with(|| { @@ -244,7 +259,9 @@ impl Converter { .enumerate() .map(|(idx, (col_name, _))| { let col_value = tag_cols[idx].datum(row_idx); - let col_value = col_value.as_str().expect("checked in try_new"); + // for null tag value, use empty string instead + let col_value = col_value.as_str().unwrap_or_default(); + Label { name: col_name.to_string(), value: col_value.to_string(), @@ -262,17 +279,7 @@ impl Converter { } }) .samples - .push(Sample { - timestamp: timestamp_col - .datum(row_idx) - .as_timestamp() - .expect("checked in try_new") - .as_i64(), - value: value_col - .datum(row_idx) - .as_f64() - .expect("checked in try_new"), - }); + .push(sample); } }
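
To make the shape of the fix above easier to follow, here is a condensed sketch of the row loop inside the converter (not the patch itself; the surrounding method, column lookups, and the `sample_at` helper are elided or hypothetical, and the error messages are illustrative). Rows are grouped into one TimeSeries per tsid, labels are built only the first time a tsid is seen, nullable datums surface as BAD_REQUEST errors through `context` instead of panicking with `expect`, and a NULL tag value degrades to an empty label value.

let mut series_by_tsid: HashMap<u64, TimeSeries> = HashMap::new();
for row_idx in 0..batch.num_rows() {
    let tsid = tsid_col.datum(row_idx).as_u64().context(ErrNoCause {
        msg: "tsid should be non-nullable u64",
        code: StatusCode::BAD_REQUEST,
    })?;

    let series = series_by_tsid.entry(tsid).or_insert_with(|| TimeSeries {
        labels: tags
            .iter()
            .enumerate()
            .map(|(idx, (col_name, _))| Label {
                name: col_name.to_string(),
                // a NULL tag value becomes an empty label value
                value: tag_cols[idx]
                    .datum(row_idx)
                    .as_str()
                    .unwrap_or_default()
                    .to_string(),
            })
            .collect(),
        ..Default::default()
    });

    // `sample_at` stands in for building the Sample from the timestamp and
    // value columns with the same `context(...)?` error handling as above.
    series.samples.push(sample_at(row_idx)?);
}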