Skip to content

Commit

Permalink
feat: support for disabling router cache
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 18, 2023
1 parent 45c2e89 commit cd03e7c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,7 @@ DROP TABLE IF EXISTS `partition_table_t`;

affected_rows: 0

SHOW CREATE TABLE partition_table_t;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SHOW CREATE TABLE partition_table_t;. Caused by: Failed to create plan, err:Table not found, table:partition_table_t" })

3 changes: 1 addition & 2 deletions integration_tests/cases/env/cluster/ddl/partition_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,4 @@ SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", "ceresdb7

DROP TABLE IF EXISTS `partition_table_t`;

-- The route cache will cause the data table to be queried after it is deleted. Refer to #893.
-- SHOW CREATE TABLE partition_table_t;
SHOW CREATE TABLE partition_table_t;
39 changes: 26 additions & 13 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@ struct RouteData {

pub struct ClusterBasedRouter {
cluster: ClusterRef,
cache: Cache<String, RouteData>,
cache: Option<Cache<String, RouteData>>,
}

impl ClusterBasedRouter {
pub fn new(cluster: ClusterRef, cache_config: RouteCacheConfig) -> Self {
let cache = Cache::builder()
.time_to_live(cache_config.ttl.0)
.time_to_idle(cache_config.tti.0)
.max_capacity(cache_config.capacity)
.build();
let cache = if cache_config.enable {
Some(
Cache::builder()
.time_to_live(cache_config.ttl.0)
.time_to_idle(cache_config.tti.0)
.max_capacity(cache_config.capacity)
.build(),
)
} else {
None
};

Self { cluster, cache }
}
Expand All @@ -42,12 +48,16 @@ impl ClusterBasedRouter {
fn route_from_cache(&self, tables: &[String], routes: &mut Vec<RouteData>) -> Vec<String> {
let mut miss = vec![];

for table in tables {
if let Some(route) = self.cache.get(table) {
routes.push(route.clone());
} else {
miss.push(table.clone());
if let Some(cache) = &self.cache {
for table in tables {
if let Some(route) = cache.get(table) {
routes.push(route.clone());
} else {
miss.push(table.clone());
}
}
} else {
miss = tables.to_vec();
}

miss
Expand Down Expand Up @@ -97,8 +107,10 @@ impl ClusterBasedRouter {
};

if let Some(route) = route {
// There may be data race here, and it is acceptable currently.
self.cache.insert(table_name.clone(), route.clone()).await;
if let Some(cache) = &self.cache {
// There may be data race here, and it is acceptable currently.
cache.insert(table_name.clone(), route.clone()).await;
}
routes.push(route);
}
}
Expand Down Expand Up @@ -257,6 +269,7 @@ mod tests {
let mock_cluster = MockClusterImpl {};

let config = RouteCacheConfig {
enable: true,
ttl: ReadableDuration::from(Duration::from_secs(4)),
tti: ReadableDuration::from(Duration::from_secs(2)),
capacity: 2,
Expand Down
5 changes: 4 additions & 1 deletion router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

pub mod cluster_based;
pub mod endpoint;
pub(crate) mod hash;
mod hash;
pub mod rule_based;
use std::{sync::Arc, time::Duration};

Expand Down Expand Up @@ -68,6 +68,8 @@ pub trait Router {

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RouteCacheConfig {
/// Enable route cache, default false.
enable: bool,
/// Time to live (TTL) in second.
ttl: ReadableDuration,
/// Time to idle (TTI) in second.
Expand All @@ -79,6 +81,7 @@ pub struct RouteCacheConfig {
impl Default for RouteCacheConfig {
fn default() -> Self {
Self {
enable: false,
ttl: ReadableDuration::from(Duration::from_secs(5)),
tti: ReadableDuration::from(Duration::from_secs(5)),
capacity: 10_000,
Expand Down

0 comments on commit cd03e7c

Please sign in to comment.