Skip to content

Commit

Permalink
feat: Add metrics for RocksDB get_batch implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
netrome committed Oct 29, 2024
1 parent c977c19 commit 3c19632
Showing 1 changed file with 51 additions and 31 deletions.
82 changes: 51 additions & 31 deletions crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,41 @@ where
}
}
}

fn register_read<T>(
&self,
result: StorageResult<Option<T>>,
column_id: u32,
) -> StorageResult<Option<T>>
where
T: NumberOfBytes,
{
self.metrics.read_meter.inc();
let column_metrics = self.metrics.columns_read_statistic.get(&column_id);
column_metrics.map(|metric| metric.inc());

if let Ok(Some(value)) = &result {
self.metrics.bytes_read.inc_by(value.number_of_bytes());
};

result
}
}

trait NumberOfBytes {
fn number_of_bytes(&self) -> u64;
}

impl NumberOfBytes for Vec<u8> {
fn number_of_bytes(&self) -> u64 {
self.len() as u64
}
}

impl NumberOfBytes for usize {
fn number_of_bytes(&self) -> u64 {
*self as u64
}
}

pub(crate) struct KeyOnly;
Expand Down Expand Up @@ -713,41 +748,34 @@ where
}

fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
self.metrics.read_meter.inc();
let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
column_metrics.map(|metric| metric.inc());

let value = self
let result = self
.db
.get_cf_opt(&self.cf(column), key, &self.read_options)
.map_err(|e| DatabaseError::Other(e.into()))?;

if let Some(value) = &value {
self.metrics.bytes_read.inc_by(value.len() as u64);
}
.map_err(|e| StorageError::Other(DatabaseError::Other(e.into()).into()));

Ok(value.map(Arc::new))
self.register_read(result, column.id())
.map(|opt| opt.map(Arc::new))
}

fn get_batch<'a>(
&'a self,
keys: BoxedIter<'a, Vec<u8>>,
column: Self::Column,
) -> BoxedIter<'a, StorageResult<Option<Value>>> {
// TODO: Metrics

let column = self.cf(column);
let keys = keys.map(|key| (&column, key));
let column_family = self.cf(column);
let keys = keys.map(|key| (&column_family, key));

self.db
.multi_get_cf_opt(keys, &self.read_options)
.into_iter()
.map(|value_opt| {
value_opt
.map_err(|e| {
.map(move |result| {
self.register_read(
result.map_err(|e| {
StorageError::Other(DatabaseError::Other(e.into()).into())
})
.map(|value| value.map(Arc::new))
}),
column.id(),
)
.map(|opt| opt.map(Arc::new))
})
.into_boxed()
}
Expand All @@ -758,11 +786,7 @@ where
column: Self::Column,
mut buf: &mut [u8],
) -> StorageResult<Option<usize>> {
self.metrics.read_meter.inc();
let column_metrics = self.metrics.columns_read_statistic.get(&column.id());
column_metrics.map(|metric| metric.inc());

let r = self
let result = self
.db
.get_pinned_cf_opt(&self.cf(column), key, &self.read_options)
.map_err(|e| DatabaseError::Other(e.into()))?
Expand All @@ -772,13 +796,9 @@ where
.map_err(|e| DatabaseError::Other(anyhow::anyhow!(e)))?;
StorageResult::Ok(read)
})
.transpose()?;

if let Some(r) = &r {
self.metrics.bytes_read.inc_by(*r as u64);
}
.transpose();

Ok(r)
self.register_read(result, column.id())
}
}

Expand Down

0 comments on commit 3c19632

Please sign in to comment.