feat: support re-acquire shard lock in a fast way (#1251)
## Rationale
Currently, a restarted ceresdb-server must wait for the previous lock
lease to expire before acquiring the new shard locks, leading to a long
failover time.

## Detailed Changes
Support fast reacquisition of the shard lock by reusing the previous lock
lease when it has not yet expired.
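
As a rough illustration (not part of this diff), the reacquisition decision
boils down to inspecting the existing lock key's lease; the types below are
hypothetical stand-ins for the state fetched from etcd:

```rust
/// Hypothetical stand-in for the lock key's state fetched from etcd.
struct LockKv {
    lease_id: i64,
    value: Vec<u8>,
    remaining_ttl_sec: i64,
}

/// Returns true when the previous lease can be reused (the fast path):
/// the lock key must exist, carry a lease, have been written by this
/// same node, and its lease must not have expired yet.
fn can_fast_reacquire(enabled: bool, kv: Option<&LockKv>, my_value: &[u8]) -> bool {
    if !enabled {
        return false;
    }
    match kv {
        Some(kv) => kv.lease_id != 0 && kv.value == my_value && kv.remaining_ttl_sec > 0,
        None => false,
    }
}
```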

## Test Plan
Manually tested this feature.
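
The feature is off by default. A minimal sketch of turning it on through
`EtcdClientConfig` (the struct and its `Default` impl are shown in
cluster/src/config.rs below; the import path is assumed):

```rust
// Assumed import path for the config struct defined in cluster/src/config.rs.
use cluster::config::EtcdClientConfig;

fn etcd_config_with_fast_reacquire() -> EtcdClientConfig {
    // Enable fast reacquisition on top of the defaults; all other fields
    // (lease TTL = 30s, lease check interval = 200ms, ...) keep their defaults.
    EtcdClientConfig {
        enable_shard_lock_fast_reacquire: true,
        ..Default::default()
    }
}
```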
ShiKaiWi authored Oct 10, 2023
1 parent 99ae6e0 commit 4c66a3f
Showing 4 changed files with 193 additions and 49 deletions.
19 changes: 10 additions & 9 deletions cluster/src/cluster_impl.rs
@@ -38,7 +38,7 @@ use tokio::{

use crate::{
config::ClusterConfig,
shard_lock_manager::{ShardLockManager, ShardLockManagerRef},
shard_lock_manager::{self, ShardLockManager, ShardLockManagerRef},
shard_set::{Shard, ShardRef, ShardSet},
topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, InvalidArguments,
@@ -85,15 +85,16 @@ impl ClusterImpl {
&config.etcd_client.root_path,
&config.meta_client.cluster_name,
)?;
let shard_lock_manager = ShardLockManager::new(
shard_lock_key_prefix,
let shard_lock_mgr_config = shard_lock_manager::Config {
node_name,
etcd_client,
config.etcd_client.shard_lock_lease_ttl_sec,
config.etcd_client.shard_lock_lease_check_interval.0,
config.etcd_client.rpc_timeout(),
runtime.clone(),
);
lock_key_prefix: shard_lock_key_prefix,
lock_lease_ttl_sec: config.etcd_client.shard_lock_lease_ttl_sec,
lock_lease_check_interval: config.etcd_client.shard_lock_lease_check_interval.0,
enable_fast_reacquire_lock: config.etcd_client.enable_shard_lock_fast_reacquire,
rpc_timeout: config.etcd_client.rpc_timeout(),
runtime: runtime.clone(),
};
let shard_lock_manager = ShardLockManager::new(shard_lock_mgr_config, etcd_client);
Ok(Self {
inner,
runtime,
3 changes: 3 additions & 0 deletions cluster/src/config.rs
@@ -59,6 +59,8 @@ pub struct EtcdClientConfig {
pub shard_lock_lease_ttl_sec: u64,
/// The interval of checking whether the shard lock lease is expired
pub shard_lock_lease_check_interval: ReadableDuration,
/// The shard lock can be reacquired in a fast way if set.
pub enable_shard_lock_fast_reacquire: bool,
}

impl EtcdClientConfig {
@@ -102,6 +104,7 @@ impl Default for EtcdClientConfig {
connect_timeout: ReadableDuration::secs(5),
shard_lock_lease_ttl_sec: 30,
shard_lock_lease_check_interval: ReadableDuration::millis(200),
enable_shard_lock_fast_reacquire: false,
}
}
}
215 changes: 177 additions & 38 deletions cluster/src/shard_lock_manager.rs
@@ -67,6 +67,25 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Failed to get lease, shard_id:{shard_id}, lease_id:{lease_id}, err:{source}.\nBacktrace:\n{backtrace:?}"
))]
GetLease {
shard_id: ShardId,
lease_id: i64,
source: etcd_client::Error,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to get lock key, shard_id:{shard_id}, err:{source}.\nBacktrace:\n{backtrace:?}"
))]
GetLockKey {
shard_id: ShardId,
source: etcd_client::Error,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to grant lease, shard_id:{shard_id}, err:{source}.\nBacktrace:\n{backtrace:?}"
))]
@@ -117,10 +136,11 @@ pub type ShardLockManagerRef = Arc<ShardLockManager>;
///
/// Only with the lock held, the shard can be operated by this node.
pub struct ShardLockManager {
key_prefix: String,
value: Bytes,
lease_ttl_sec: u64,
lock_key_prefix: String,
lock_value: Bytes,
lock_lease_ttl_sec: u64,
lock_lease_check_interval: Duration,
enable_fast_reacquire_lock: bool,
rpc_timeout: Duration,

etcd_client: Client,
@@ -343,6 +363,8 @@ pub struct ShardLock {
ttl_sec: u64,
/// The interval to check whether the lease is expired
lease_check_interval: Duration,
/// The previous lease will be re-used if set
enable_fast_reacquire: bool,
/// The timeout for etcd rpc
rpc_timeout: Duration,

@@ -351,13 +373,21 @@
lease_keepalive_stopper: Option<oneshot::Sender<()>>,
}

/// The information about an etcd lease.
#[derive(Debug, Clone)]
struct LeaseInfo {
id: i64,
expired_at: Instant,
}

impl ShardLock {
fn new(
shard_id: ShardId,
key_prefix: &str,
value: Bytes,
ttl_sec: u64,
lease_check_interval: Duration,
enable_fast_reacquire: bool,
rpc_timeout: Duration,
) -> Self {
Self {
@@ -366,6 +396,7 @@ impl ShardLock {
value,
ttl_sec,
lease_check_interval,
enable_fast_reacquire,
rpc_timeout,

lease: None,
@@ -380,6 +411,97 @@
Bytes::from(key)
}

/// The shard lock can be acquired in a fast way if the lock was created by
/// this node just before.
///
/// The slow way has to wait for the previous lock lease to expire, while the
/// fast way reuses the lease if it has not expired and the lock's value is
/// the same.
async fn maybe_fast_acquire_lock(&self, etcd_client: &mut Client) -> Result<Option<LeaseInfo>> {
if !self.enable_fast_reacquire {
return Ok(None);
}

let resp = etcd_client
.get(self.key.clone(), None)
.await
.context(GetLockKey {
shard_id: self.shard_id,
})?;

// Zero or one key-value pair will be fetched, and we can continue only if
// exactly one pair is returned.
if resp.kvs().len() != 1 {
warn!(
"Expect exactly one key value pair, but found {} kv pairs, shard_id:{}",
resp.kvs().len(),
self.shard_id
);
return Ok(None);
}
let kv = &resp.kvs()[0];
let lease_id = kv.lease();
if lease_id == 0 {
// There is no lease attached to the lock key.
return Ok(None);
}

// FIXME: A better way is to compare the specific field of the decoded values.
if kv.value() != self.value {
warn!(
"Try to acquire a lock held by others, shard_id:{}",
self.shard_id
);
return Ok(None);
}

let ttl_sec = etcd_client
.lease_time_to_live(lease_id, None)
.await
.context(GetLease {
shard_id: self.shard_id,
lease_id,
})?
.ttl();

if ttl_sec == 0 {
// The lease has expired.
return Ok(None);
}

let lease_expired_at = Instant::now() + Duration::from_secs(ttl_sec as u64);
Ok(Some(LeaseInfo {
id: lease_id,
expired_at: lease_expired_at,
}))
}

async fn slow_acquire_lock(&self, etcd_client: &mut Client) -> Result<LeaseInfo> {
// Grant the lease first.
let resp = etcd_client
.lease_grant(self.ttl_sec as i64, None)
.await
.context(GrantLease {
shard_id: self.shard_id,
})?;
ensure!(
resp.ttl() > 0,
GrantLeaseWithInvalidTTL {
shard_id: self.shard_id,
ttl_sec: resp.ttl()
}
);

let lease_expired_at = Instant::now() + Duration::from_secs(resp.ttl() as u64);
let lease_id = resp.id();
self.create_lock_with_lease(lease_id, etcd_client).await?;

Ok(LeaseInfo {
id: lease_id,
expired_at: lease_expired_at,
})
}

/// Grant the shard lock.
///
/// The `on_lock_expired` callback will be called when the lock is expired,
@@ -400,28 +522,30 @@
return Ok(false);
}

// Grant the lease first.
let resp = etcd_client
.lease_grant(self.ttl_sec as i64, None)
.await
.context(GrantLease {
shard_id: self.shard_id,
})?;
ensure!(
resp.ttl() > 0,
GrantLeaseWithInvalidTTL {
shard_id: self.shard_id,
ttl_sec: resp.ttl()
let lease_info = match self.maybe_fast_acquire_lock(etcd_client).await {
Ok(Some(v)) => {
info!("Shard lock is acquired fast, shard_id:{}", self.shard_id);
v
}
);

let lease_id = resp.id();
self.acquire_lock_with_lease(lease_id, etcd_client).await?;
Ok(None) => {
warn!(
"No lock to reuse, try to slow acquire lock, shard_id:{}",
self.shard_id
);
self.slow_acquire_lock(etcd_client).await?
}
Err(e) => {
warn!(
"Failed to fast acquire lock, try to slow acquire lock, shard_id:{}, err:{e}",
self.shard_id
);
self.slow_acquire_lock(etcd_client).await?
}
};

let lease_expired_at = Instant::now() + Duration::from_secs(resp.ttl() as u64);
self.keep_lease_alive(
lease_id,
lease_expired_at,
lease_info.id,
lease_info.expired_at,
on_lock_expired,
etcd_client,
runtime,
@@ -486,7 +610,7 @@ impl ShardLock {
);
}

async fn acquire_lock_with_lease(&self, lease_id: i64, etcd_client: &mut Client) -> Result<()> {
async fn create_lock_with_lease(&self, lease_id: i64, etcd_client: &mut Client) -> Result<()> {
// In etcd, the version is 0 if the key does not exist.
let not_exist = Compare::version(self.key.clone(), CompareOp::Equal, 0);
let create_key = {
@@ -645,24 +769,38 @@ impl ShardLock {
}
}

#[derive(Clone, Debug)]
pub struct Config {
pub node_name: String,
pub lock_key_prefix: String,
pub lock_lease_ttl_sec: u64,
pub lock_lease_check_interval: Duration,
pub enable_fast_reacquire_lock: bool,
pub rpc_timeout: Duration,
pub runtime: RuntimeRef,
}

impl ShardLockManager {
pub fn new(
key_prefix: String,
node_name: String,
etcd_client: Client,
lease_ttl_sec: u64,
lock_lease_check_interval: Duration,
rpc_timeout: Duration,
runtime: RuntimeRef,
) -> ShardLockManager {
pub fn new(config: Config, etcd_client: Client) -> ShardLockManager {
let Config {
node_name,
lock_key_prefix,
lock_lease_ttl_sec,
lock_lease_check_interval,
enable_fast_reacquire_lock,
rpc_timeout,
runtime,
} = config;

let value = Bytes::from(ShardLockValue { node_name }.encode_to_vec());

ShardLockManager {
key_prefix,
value,
lease_ttl_sec,
lock_key_prefix,
lock_value: value,
lock_lease_ttl_sec,
lock_lease_check_interval,
rpc_timeout,
enable_fast_reacquire_lock,
etcd_client,
runtime,
shard_locks: Arc::new(AsyncRwLock::new(HashMap::new())),
@@ -697,10 +835,11 @@ impl ShardLockManager {

let mut shard_lock = ShardLock::new(
shard_id,
&self.key_prefix,
self.value.clone(),
self.lease_ttl_sec,
&self.lock_key_prefix,
self.lock_value.clone(),
self.lock_lease_ttl_sec,
self.lock_lease_check_interval,
self.enable_fast_reacquire_lock,
self.rpc_timeout,
);

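For readers of the diff above: `create_lock_with_lease` relies on etcd's
transaction API to create the lock key only if it does not exist yet (a key's
version is 0 in etcd when the key is absent). A minimal standalone sketch of
that pattern with the `etcd_client` crate, with function and variable names
of my own choosing:

```rust
use etcd_client::{Client, Compare, CompareOp, Error, PutOptions, Txn, TxnOp};

// Put `value` under `key`, attached to `lease_id`, only if `key` is absent.
// Returns true if the lock key was created by this call.
async fn create_if_absent(
    client: &mut Client,
    key: Vec<u8>,
    value: Vec<u8>,
    lease_id: i64,
) -> Result<bool, Error> {
    let txn = Txn::new()
        // In etcd, version == 0 means the key does not exist.
        .when(vec![Compare::version(key.clone(), CompareOp::Equal, 0)])
        .and_then(vec![TxnOp::put(
            key,
            value,
            Some(PutOptions::new().with_lease(lease_id)),
        )]);
    Ok(client.txn(txn).await?.succeeded())
}
```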
5 changes: 3 additions & 2 deletions components/object_store/src/disk_cache.rs
@@ -27,7 +27,7 @@ use chrono::{DateTime, Utc};
use crc::{Crc, CRC_32_ISCSI};
use futures::stream::BoxStream;
use hash_ext::SeaHasherBuilder;
use logger::{debug, info, warn};
use logger::{debug, warn};
use lru::LruCache;
use notifier::notifier::{ExecutionGuard, RequestNotifiers};
use partitioned_lock::PartitionedMutex;
@@ -643,7 +643,8 @@ impl DiskCacheStore {
continue;
}
};
info!("Disk cache recover_cache, filename:{file_name}, size:{file_size}");

debug!("Disk cache recover_cache, filename:{file_name}, size:{file_size}");
let page_meta = PageMeta { file_size };
cache.insert_page_meta(file_name, page_meta);
}
