Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
hrana: add diagnostics for connections (#729)
Browse files Browse the repository at this point in the history
* hrana: add diagnostics for connections

This commit adds a /v2/diagnostics endpoint (moved to /v1/diagnostics on the
admin API during review) which prints various information about current
hrana-over-http connections.

Draft, because the diagnostics are currently in a very debuggy
format, and I'm figuring out if we can make it more human-readable.
Still, they're enough to determine if something is holding a lock
via an abandoned hrana-over-http stream.

Example:
```
$ curl -s http://localhost:8080/v2/diagnostics | jq
[
  "expired",
  "expired",
  "expired",
  "expired",
  "expired",
  "(conn: Mutex { data: <locked> }, timeout_ms: 872, stolen: false)",
  "(conn: Mutex { data: <locked> }, timeout_ms: 0, stolen: true)"
]
```

* apply review suggestions: no more Debug required in WalHook

* apply review fixes: move everything to admin api

* Update sqld/src/http/admin/mod.rs

Co-authored-by: ad hoc <[email protected]>

* fix Json return type

* revert leftover change - returning from passive checkpoints

---------

Co-authored-by: ad hoc <[email protected]>
  • Loading branch information
psarna and MarinPostma authored Oct 5, 2023
1 parent 44e528d commit 7b5b4f2
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 14 deletions.
1 change: 1 addition & 0 deletions sqld-libsql-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub fn get_orig_wal_methods() -> anyhow::Result<*mut libsql_wal_methods> {
Ok(orig)
}

#[derive(Debug)]
pub struct Connection<W: WalHook> {
conn: rusqlite::Connection,
// Safety: _ctx MUST be dropped after the connection, because the connection has a pointer
Expand Down
1 change: 1 addition & 0 deletions sqld-libsql-bindings/src/wal_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub unsafe trait WalHook {
init_static_wal_method!(TRANSPARENT_METHODS, TransparentMethods);

/// WAL implementation that just proxies calls to the wrapped WAL methods implementation
#[derive(Debug)]
pub enum TransparentMethods {}

unsafe impl WalHook for TransparentMethods {
Expand Down
45 changes: 45 additions & 0 deletions sqld/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ pub struct LibSqlConnection<W: WalHook> {
inner: Arc<Mutex<Connection<W>>>,
}

/// Best-effort `Debug` rendering: prints the inner connection when its mutex can be
/// acquired via `try_lock`, and the literal `<locked>` when it cannot.
impl<W: WalHook> std::fmt::Debug for LibSqlConnection<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `try_lock` is used instead of `lock`, so a held mutex is reported rather
        // than waited on.
        if let Some(conn) = self.inner.try_lock() {
            write!(f, "{conn:?}")
        } else {
            write!(f, "<locked>")
        }
    }
}

pub fn open_conn<W>(
path: &Path,
wal_methods: &'static WalMethodsHook<W>,
Expand Down Expand Up @@ -218,6 +229,14 @@ struct Connection<W: WalHook = TransparentMethods> {
slot: Option<Arc<TxnSlot<W>>>,
}

/// `Debug` rendering that exposes only the transaction `slot` of the connection.
impl<W: WalHook> std::fmt::Debug for Connection<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Connection");
        repr.field("slot", &self.slot);
        repr.finish()
    }
}

/// A slot for holding the state of a transaction lock permit
struct TxnSlot<T: WalHook> {
/// Pointer to the connection holding the lock. Used to rollback the transaction when the lock
Expand All @@ -229,7 +248,23 @@ struct TxnSlot<T: WalHook> {
is_stolen: AtomicBool,
}

/// Compact `Debug` rendering of a transaction slot: the owning connection, the time
/// remaining until `timeout_at` in milliseconds, and whether the slot was stolen.
impl<T: WalHook> std::fmt::Debug for TxnSlot<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stolen = self.is_stolen.load(Ordering::Relaxed);
        // NOTE(review): relies on tokio's `Instant::duration_since` behaviour when the
        // deadline has already passed — confirm it yields zero rather than panicking.
        let now = tokio::time::Instant::now();
        let time_left = self.timeout_at.duration_since(now).as_millis();
        f.write_fmt(format_args!(
            "(conn: {:?}, timeout_ms: {time_left}, stolen: {stolen})",
            self.conn
        ))
    }
}

/// The transaction state shared among all connections to the same database
#[derive(Debug)]
pub struct TxnState<T: WalHook> {
/// Slot for the connection currently holding the transaction lock
slot: RwLock<Option<Arc<TxnSlot<T>>>>,
Expand Down Expand Up @@ -711,6 +746,16 @@ where
.unwrap()?;
Ok(())
}

/// Describes the transaction state of this connection for diagnostics.
///
/// Returns the `Debug` rendering of the transaction slot if one is present,
/// `<no-transaction>` if there is none, and a `[BUG]` marker when the connection
/// mutex could not be acquired with `try_lock`.
fn diagnostics(&self) -> String {
    if let Some(conn) = self.inner.try_lock() {
        conn.slot.as_ref().map_or_else(
            || "<no-transaction>".to_string(),
            |slot| format!("{slot:?}"),
        )
    } else {
        "[BUG] connection busy".to_string()
    }
}
}

#[cfg(test)]
Expand Down
14 changes: 13 additions & 1 deletion sqld/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ pub trait Connection: Send + Sync + 'static {

/// Calls for database checkpoint (if supported).
async fn checkpoint(&self) -> Result<()>;

/// Returns a free-form, human-readable description of this connection's current state.
///
/// The admin diagnostics endpoint (`/v1/diagnostics`) reports this string for every
/// available hrana-over-http stream.
fn diagnostics(&self) -> String;
}

fn make_batch_program(batch: Vec<Query>) -> Vec<Step> {
Expand Down Expand Up @@ -290,7 +292,7 @@ pub struct TrackedConnection<DB> {
atime: AtomicU64,
}

impl<DB> TrackedConnection<DB> {
impl<DB: Connection> TrackedConnection<DB> {
pub fn idle_time(&self) -> Duration {
let now = now_millis();
let atime = self.atime.load(Ordering::Relaxed);
Expand Down Expand Up @@ -335,12 +337,18 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
self.atime.store(now_millis(), Ordering::Relaxed);
self.inner.checkpoint().await
}

/// Forwards to the wrapped connection's `diagnostics`.
// NOTE(review): unlike `checkpoint`, this does not refresh `atime` — presumably so that
// polling diagnostics does not make an idle connection look active; confirm.
#[inline]
fn diagnostics(&self) -> String {
self.inner.diagnostics()
}
}

#[cfg(test)]
mod test {
use super::*;

#[derive(Debug)]
struct DummyDb;

#[async_trait::async_trait]
Expand Down Expand Up @@ -371,6 +379,10 @@ mod test {
async fn checkpoint(&self) -> Result<()> {
unreachable!()
}

/// Test stub: always reports the fixed string `dummy`.
fn diagnostics(&self) -> String {
    String::from("dummy")
}
}

#[tokio::test]
Expand Down
5 changes: 5 additions & 0 deletions sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl MakeConnection for MakeWriteProxyConn {
}
}

#[derive(Debug)]
pub struct WriteProxyConnection {
/// Lazily initialized read connection
read_conn: LibSqlConnection<TransparentMethods>,
Expand Down Expand Up @@ -316,6 +317,10 @@ impl Connection for WriteProxyConnection {
self.wait_replication_sync(None).await?;
self.read_conn.checkpoint().await
}

/// Returns the `Debug` rendering of this connection's `state` field.
fn diagnostics(&self) -> String {
format!("{:?}", self.state)
}
}

impl Drop for WriteProxyConnection {
Expand Down
6 changes: 5 additions & 1 deletion sqld/src/hrana/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::connection::{Connection, MakeConnection};
mod proto;
mod protobuf;
mod request;
mod stream;
pub(crate) mod stream;

pub struct Server<C> {
self_url: Option<String>,
Expand Down Expand Up @@ -69,6 +69,10 @@ impl<C: Connection> Server<C> {
})
.or_else(|err| err.downcast::<ProtocolError>().map(protocol_error_response))
}

/// Exposes the mutex-protected stream state so that other modules in the crate
/// (e.g. the admin diagnostics handler) can inspect the server's streams.
pub(crate) fn stream_state(&self) -> &Mutex<stream::ServerStreamState<C>> {
&self.stream_state
}
}

pub(crate) async fn handle_index() -> hyper::Response<hyper::Body> {
Expand Down
12 changes: 9 additions & 3 deletions sqld/src/hrana/http/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ pub struct ServerStreamState<D> {
}

/// Handle to a stream, owned by the [`ServerStreamState`].
enum Handle<D> {
#[derive(Debug)]
pub(crate) enum Handle<D> {
/// A stream that is open and ready to be used by requests. [`Stream::db`] should always be
/// `Some`.
Available(Box<Stream<D>>),
Expand All @@ -51,10 +52,11 @@ enum Handle<D> {
///
/// The stream is either owned by [`Handle::Available`] (when it's not in use) or by [`Guard`]
/// (when it's being used by a request).
struct Stream<D> {
#[derive(Debug)]
pub(crate) struct Stream<D> {
/// The database connection that corresponds to this stream. This is `None` after the `"close"`
/// request was executed.
db: Option<Arc<D>>,
pub(crate) db: Option<Arc<D>>,
/// The cache of SQL texts stored on the server with `"store_sql"` requests.
sqls: HashMap<i32, String>,
/// Stream id of this stream. The id is generated randomly (it should be unguessable).
Expand Down Expand Up @@ -98,6 +100,10 @@ impl<D> ServerStreamState<D> {
expire_round_base: Instant::now(),
}
}

/// Returns a shared reference to the map of stream handles held by this state.
// NOTE(review): the `u64` key is presumably the stream id — confirm.
pub(crate) fn handles(&self) -> &HashMap<u64, Handle<D>> {
&self.handles
}
}

/// Acquire a guard to a new or existing stream. If baton is `Some`, we try to look up the stream,
Expand Down
45 changes: 43 additions & 2 deletions sqld/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,26 @@ use tokio_util::io::ReaderStream;
use url::Url;

use crate::connection::config::DatabaseConfig;
use crate::database::Database;
use crate::error::LoadDumpError;
use crate::hrana;
use crate::namespace::{DumpStream, MakeNamespace, NamespaceName, NamespaceStore, RestoreOption};

pub mod stats;

/// Shared handle to the user-facing hrana-over-http server, whose connection type is
/// the `Connection` associated with `M`'s database.
type UserHttpServer<M> =
Arc<hrana::http::Server<<<M as MakeNamespace>::Database as Database>::Connection>>;

/// State shared with the admin API route handlers (installed via `with_state`).
struct AppState<M: MakeNamespace> {
// Namespace store handed to `run` by the caller.
namespaces: NamespaceStore<M>,
// User-facing hrana-over-http server; `handle_diagnostics` reads its stream state.
user_http_server: UserHttpServer<M>,
}

pub async fn run<M, A>(acceptor: A, namespaces: NamespaceStore<M>) -> anyhow::Result<()>
pub async fn run<M, A>(
acceptor: A,
user_http_server: UserHttpServer<M>,
namespaces: NamespaceStore<M>,
) -> anyhow::Result<()>
where
A: crate::net::Accept,
M: MakeNamespace,
Expand All @@ -43,7 +53,11 @@ where
)
.route("/v1/namespaces/:namespace", delete(handle_delete_namespace))
.route("/v1/namespaces/:namespace/stats", get(stats::handle_stats))
.with_state(Arc::new(AppState { namespaces }));
.route("/v1/diagnostics", get(handle_diagnostics))
.with_state(Arc::new(AppState {
namespaces,
user_http_server,
}));

hyper::server::Server::builder(acceptor)
.serve(router.into_make_service())
Expand All @@ -67,6 +81,33 @@ async fn handle_get_config<M: MakeNamespace>(
Ok(Json(store.get()))
}

/// Admin handler that reports the state of every hrana-over-http stream handle.
///
/// Responds with a JSON array holding one string per handle: the connection's own
/// diagnostics for an available stream, or `"acquired"` / `"expired"` otherwise.
async fn handle_diagnostics<M: MakeNamespace>(
    State(app_state): State<Arc<AppState<M>>>,
) -> crate::Result<Json<Vec<String>>> {
    use crate::connection::Connection;
    use hrana::http::stream::Handle;

    let stream_state = app_state.user_http_server.stream_state().lock();
    let diagnostics: Vec<String> = stream_state
        .handles()
        .values()
        .map(|handle| match handle {
            Handle::Available(stream) => match stream.db.as_ref() {
                Some(db) => db.diagnostics(),
                // An available stream is expected to still own its connection.
                None => String::from("[BUG] available-but-closed"),
            },
            Handle::Acquired => String::from("acquired"),
            Handle::Expired => String::from("expired"),
        })
        .collect();
    // Release the stream-state lock before logging and building the response.
    drop(stream_state);

    tracing::trace!("diagnostics: {diagnostics:?}");
    Ok(Json(diagnostics))
}

#[derive(Debug, Deserialize)]
struct BlockReq {
block_reads: bool,
Expand Down
8 changes: 6 additions & 2 deletions sqld/src/http/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ where
P: Proxy,
S: ReplicationLog,
{
pub fn configure(self, join_set: &mut JoinSet<anyhow::Result<()>>) {
pub fn configure(
self,
join_set: &mut JoinSet<anyhow::Result<()>>,
) -> Arc<hrana::http::Server<<<M as MakeNamespace>::Database as Database>::Connection>> {
let (hrana_accept_tx, hrana_accept_rx) = mpsc::channel(8);
let (hrana_upgrade_tx, hrana_upgrade_rx) = mpsc::channel(8);
let hrana_http_srv = Arc::new(hrana::http::Server::new(self.self_url.clone()));
Expand Down Expand Up @@ -283,7 +286,7 @@ where
let state = AppState {
auth: self.auth,
upgrade_tx: hrana_upgrade_tx,
hrana_http_srv,
hrana_http_srv: hrana_http_srv.clone(),
enable_console: self.enable_console,
namespaces: self.namespaces,
disable_default_namespace: self.disable_default_namespace,
Expand Down Expand Up @@ -418,6 +421,7 @@ where
Ok(())
});
}
hrana_http_srv
}
}

Expand Down
8 changes: 6 additions & 2 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,14 @@ where
path: self.path.clone(),
};

user_http.configure(join_set);
let user_http_service = user_http.configure(join_set);

if let Some(AdminApiConfig { acceptor }) = self.admin_api_config {
join_set.spawn(http::admin::run(acceptor, self.namespaces));
join_set.spawn(http::admin::run(
acceptor,
user_http_service,
self.namespaces,
));
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ impl Version {
}
}

#[derive(Debug)]
pub enum ReplicationLoggerHook {}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ReplicationLoggerHookCtx {
buffer: Vec<WalPage>,
logger: Arc<ReplicationLogger>,
Expand Down Expand Up @@ -276,7 +277,7 @@ unsafe impl WalHook for ReplicationLoggerHook {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct WalPage {
pub page_no: u32,
/// 0 for non-commit frames
Expand Down Expand Up @@ -731,6 +732,7 @@ impl LogFileHeader {
}
}

#[derive(Debug)]
pub struct Generation {
pub id: Uuid,
pub start_index: u64,
Expand All @@ -745,6 +747,7 @@ impl Generation {
}
}

#[derive(Debug)]
pub struct ReplicationLogger {
pub generation: Generation,
pub log_file: RwLock<LogFile>,
Expand Down
9 changes: 9 additions & 0 deletions sqld/src/replication/replica/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ init_static_wal_method!(INJECTOR_METHODS, InjectorHook);
/// The Caller must first call `set_frames`, passing the frames to be injected, then trigger a call
/// to xFrames from the libsql connection (see dummy write in `injector`), and can then collect the
/// result on the injection with `take_result`
#[derive(Debug)]
pub enum InjectorHook {}

pub struct InjectorHookCtx {
Expand All @@ -83,6 +84,14 @@ pub struct InjectorHookCtx {
post_commit: Box<dyn Fn(FrameNo) -> anyhow::Result<()>>,
}

/// Hand-written `Debug`: only `is_txn` is printed. The `post_commit` field is a boxed
/// closure, which has no `Debug` implementation, so the trait cannot be derived.
impl std::fmt::Debug for InjectorHookCtx {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("InjectorHookCtx");
        repr.field("is_txn", &self.is_txn);
        repr.finish()
    }
}

impl InjectorHookCtx {
pub fn new(
receiver: tokio::sync::mpsc::Receiver<Frames>,
Expand Down
Loading

0 comments on commit 7b5b4f2

Please sign in to comment.