Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support query opened shards info #1070

Merged
merged 4 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ impl Inner {
msg: format!("close non-existent shard, shard_id:{shard_id}"),
})
}

fn list_shards(&self) -> Vec<ShardInfo> {
let shards = self.shard_set.all_shards();
let mut shard_infos = Vec::with_capacity(shards.len());
for shard in shards {
let shard_info = shard.shard_info();
shard_infos.push(shard_info);
}
shard_infos
}
}

#[async_trait]
Expand Down Expand Up @@ -345,6 +355,10 @@ impl Cluster for ClusterImpl {
self.inner.close_shard(shard_id)
}

fn list_shards(&self) -> Vec<ShardInfo> {
self.inner.list_shards()
}

async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse> {
self.inner.route_tables(req).await
}
Expand Down
3 changes: 3 additions & 0 deletions cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ pub trait Cluster {
/// Return error if the shard is not found.
async fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef>;

/// list shards
fn list_shards(&self) -> Vec<ShardInfo>;

async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse>;
async fn fetch_nodes(&self) -> Result<ClusterNodesResp>;
fn shard_lock_manager(&self) -> ShardLockManagerRef;
Expand Down
4 changes: 4 additions & 0 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ mod tests {
unimplemented!();
}

fn list_shards(&self) -> Vec<ShardInfo> {
unimplemented!();
}

async fn route_tables(
&self,
req: &RouteTablesRequest,
Expand Down
59 changes: 59 additions & 0 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use std::{
};

use analytic_engine::setup::OpenedWals;
use cluster::ClusterRef;
use common_types::bytes::Bytes;
use common_util::{
error::{BoxError, GenericError},
runtime::Runtime,
};
use log::{error, info};
use logger::RuntimeLevel;
use meta_client::types::ShardRole;
use profile::Profiler;
use prom_remote_api::web;
use proxy::{
Expand Down Expand Up @@ -118,6 +120,9 @@ pub enum Error {

#[snafu(display("{msg}"))]
QueryMaybeExceedTTL { msg: String },

#[snafu(display("query shards only supported in cluster mode"))]
NoCluster {},
baojinri marked this conversation as resolved.
Show resolved Hide resolved
}

define_result!(Error);
Expand All @@ -129,6 +134,7 @@ impl reject::Reject for Error {}
/// Endpoints beginning with /debug are for internal use, and may subject to
/// breaking changes.
pub struct Service<Q> {
cluster: Option<ClusterRef>,
proxy: Arc<Proxy<Q>>,
engine_runtimes: Arc<EngineRuntimes>,
log_runtime: Arc<RuntimeLevel>,
Expand Down Expand Up @@ -198,6 +204,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.or(self.profile_cpu())
.or(self.profile_heap())
.or(self.server_config())
.or(self.shards())
.or(self.stats())
.with(warp::log("http_requests"))
.with(warp::log::custom(|info| {
Expand Down Expand Up @@ -492,6 +499,41 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.map(move || server_config_content.clone())
}

// GET /debug/shards
fn shards(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("debug" / "shards")
.and(warp::get())
.and(self.with_cluster())
.and_then(|cluster: Option<ClusterRef>| async move {
let cluster = match cluster {
Some(cluster) => cluster,
None => return Err(reject::custom(Error::NoCluster {})),
};
let shard_infos = cluster.list_shards();

let mut shards = Vec::new();
for shard_info in shard_infos {
let mut shard = HashMap::new();
shard.insert("shard id", shard_info.id.to_string());
shard.insert(
"shard role",
match shard_info.role {
ShardRole::Leader => "Leader",
ShardRole::Follower => "Follower",
ShardRole::PendingFollower => "PendingFollower",
ShardRole::PendingLeader => "PendingLeader",
}
.to_string(),
);
shard.insert("shard version", shard_info.version.to_string());
shards.push(shard);
}
baojinri marked this conversation as resolved.
Show resolved Hide resolved
Ok(reply::json(&shards))
})
}

// GET /debug/stats
fn stats(&self) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
let opened_wals = self.opened_wals.clone();
Expand Down Expand Up @@ -605,6 +647,13 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
warp::any().map(move || proxy.clone())
}

fn with_cluster(
&self,
) -> impl Filter<Extract = (Option<ClusterRef>,), Error = Infallible> + Clone {
let cluster = self.cluster.clone();
warp::any().map(move || cluster.clone())
}

fn with_runtime(&self) -> impl Filter<Extract = (Arc<Runtime>,), Error = Infallible> + Clone {
let runtime = self.engine_runtimes.default_runtime.clone();
warp::any().map(move || runtime.clone())
Expand All @@ -631,6 +680,7 @@ pub struct Builder<Q> {
engine_runtimes: Option<Arc<EngineRuntimes>>,
log_runtime: Option<Arc<RuntimeLevel>>,
config_content: Option<String>,
cluster: Option<ClusterRef>,
proxy: Option<Arc<Proxy<Q>>>,
opened_wals: Option<OpenedWals>,
}
Expand All @@ -642,6 +692,7 @@ impl<Q> Builder<Q> {
engine_runtimes: None,
log_runtime: None,
config_content: None,
cluster: None,
proxy: None,
opened_wals: None,
}
Expand All @@ -662,6 +713,11 @@ impl<Q> Builder<Q> {
self
}

pub fn cluster(mut self, cluster: Option<ClusterRef>) -> Self {
self.cluster = cluster;
self
}

pub fn proxy(mut self, proxy: Arc<Proxy<Q>>) -> Self {
self.proxy = Some(proxy);
self
Expand All @@ -680,11 +736,13 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
let config_content = self.config_content.context(MissingInstance)?;
let proxy = self.proxy.context(MissingProxy)?;
let cluster = self.cluster;
let opened_wals = self.opened_wals.context(MissingWal)?;

let (tx, rx) = oneshot::channel();

let service = Service {
cluster,
proxy,
engine_runtimes,
log_runtime,
Expand Down Expand Up @@ -732,6 +790,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
| Error::AlreadyStarted { .. }
| Error::MissingRouter { .. }
| Error::MissingWal { .. }
| Error::NoCluster { .. }
baojinri marked this conversation as resolved.
Show resolved Hide resolved
| Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
Error::QueryMaybeExceedTTL { .. } => StatusCode::OK,
}
Expand Down
1 change: 1 addition & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.engine_runtimes(engine_runtimes.clone())
.log_runtime(log_runtime)
.config_content(config_content)
.cluster(self.cluster.clone())
.proxy(proxy.clone())
.opened_wals(opened_wals.clone())
.build()
Expand Down