chore: remove duplicated metrics
jiacai2050 committed Jun 1, 2023
1 parent 7692507 commit 9d8a56d
Showing 2 changed files with 28 additions and 33 deletions.
analytic_engine/src/sst/parquet/async_reader.rs (41 changes: 11 additions & 30 deletions)
@@ -30,7 +30,6 @@ use parquet::{
file::metadata::RowGroupMetaData,
};
use parquet_ext::meta_data::ChunkReader;
use prometheus::local::LocalHistogram;
use snafu::ResultExt;
use table_engine::predicate::PredicateRef;
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -42,7 +41,6 @@ use crate::sst::{
cache::{MetaCacheRef, MetaData},
SstMetaData,
},
metrics,
parquet::{
encoding::ParquetDecoder,
meta_data::{ParquetFilter, ParquetMetaDataRef},
@@ -221,10 +219,6 @@ impl<'a> Reader<'a> {
let chunks_num = parallelism;
let chunk_size = target_row_groups.len() / parallelism;
self.metrics.parallelism = parallelism;
debug!(
"Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
suggested_parallelism, parallelism, chunk_size
);

let mut target_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
@@ -236,6 +230,10 @@ impl<'a> Reader<'a> {
meta_data.parquet().file_metadata().schema_descr(),
row_projector.existed_source_projection().iter().copied(),
);
debug!(
"Reader fetch record batches, parallelism suggest:{}, real:{}, chunk_size:{}, project:{:?}",
suggested_parallelism, parallelism, chunk_size, proj_mask
);

let mut streams = Vec::with_capacity(target_row_group_chunks.len());
for chunk in target_row_group_chunks {
@@ -355,18 +353,12 @@ impl<'a> Reader<'a> {
}
}

#[derive(Clone, Debug)]
struct ObjectReaderMetrics {
bytes_scanned: usize,
sst_get_range_length_histogram: LocalHistogram,
}

#[derive(Clone)]
struct ObjectStoreReader {
storage: ObjectStoreRef,
path: Path,
meta_data: MetaData,
metrics: ObjectReaderMetrics,
begin: Instant,
}

impl ObjectStoreReader {
@@ -375,27 +367,23 @@ impl ObjectStoreReader {
storage,
path,
meta_data,
metrics: ObjectReaderMetrics {
bytes_scanned: 0,
sst_get_range_length_histogram: metrics::SST_GET_RANGE_HISTOGRAM.local(),
},
begin: Instant::now(),
}
}
}

impl Drop for ObjectStoreReader {
fn drop(&mut self) {
debug!("ObjectStoreReader is dropped, metrics:{:?}", self.metrics);
debug!(
"ObjectStoreReader dropped, path:{}, elapsed:{:?}",
&self.path,
self.begin.elapsed()
);
}
}

impl AsyncFileReader for ObjectStoreReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.metrics.bytes_scanned += range.end - range.start;
self.metrics
.sst_get_range_length_histogram
.observe((range.end - range.start) as f64);

self.storage
.get_range(&self.path, range)
.map_err(|e| {
@@ -410,13 +398,6 @@ impl AsyncFileReader for ObjectStoreReader {
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
for range in &ranges {
self.metrics.bytes_scanned += range.end - range.start;
self.metrics
.sst_get_range_length_histogram
.observe((range.end - range.start) as f64);
}

async move {
self.storage
.get_ranges(&self.path, &ranges)
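For context, the removed ObjectReaderMetrics followed prometheus-rs's local-histogram pattern: the reader held a LocalHistogram handle onto a registered global histogram, observed the length of every fetched byte range, and flushed the buffered samples when dropped. Per the commit title these observations were evidently already recorded elsewhere, so keeping a second recorder here double-counted every fetch. A minimal sketch of the removed pattern, with an assumed stand-in registration for metrics::SST_GET_RANGE_HISTOGRAM:

use lazy_static::lazy_static;
use prometheus::{local::LocalHistogram, register_histogram, Histogram};

lazy_static! {
    // Assumed stand-in; the real metrics::SST_GET_RANGE_HISTOGRAM is
    // registered elsewhere in analytic_engine.
    static ref SST_GET_RANGE_HISTOGRAM: Histogram = register_histogram!(
        "sst_get_range_length",
        "Byte length of SST get_range fetches"
    )
    .unwrap();
}

struct RangeReader {
    // Thread-local sample buffer; flushed into the global histogram on drop.
    hist: LocalHistogram,
}

impl RangeReader {
    fn new() -> Self {
        Self {
            hist: SST_GET_RANGE_HISTOGRAM.local(),
        }
    }

    fn record_fetch(&mut self, range: &std::ops::Range<usize>) {
        // One observation per fetched range; a second recorder observing the
        // same requests double-counts, which is what this commit removes.
        self.hist.observe((range.end - range.start) as f64);
    }
}

fn main() {
    let mut reader = RangeReader::new();
    reader.record_fetch(&(0..4096));
    // `reader` drops here, flushing the buffered sample to the registry.
}

What survives in ObjectStoreReader is a plain begin: Instant, so its Drop now logs only the path and the elapsed read time instead of the duplicated counters.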
tools/src/bin/sst-metadata.rs (20 changes: 17 additions & 3 deletions)
@@ -63,14 +63,15 @@ async fn run(args: Args) -> Result<()> {

let mut join_set = JoinSet::new();
let mut ssts = storage.list(None).await?;
let verbose = args.verbose;
while let Some(object_meta) = ssts.next().await {
let object_meta = object_meta?;
let storage = storage.clone();
let location = object_meta.location.clone();
join_set.spawn_on(
async move {
let (metadata, metadata_size, kv_size) =
parse_metadata(storage, location, object_meta.size).await?;
parse_metadata(storage, location, object_meta.size, verbose).await?;
Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size))
},
&handle,
@@ -107,7 +108,7 @@ async fn run(args: Args) -> Result<()> {
.unwrap_or(0);
let file_metadata = parquet_meta.file_metadata();
let row_num = file_metadata.num_rows();
if args.verbose {
if verbose {
println!("object_meta:{object_meta:?}, parquet_meta:{parquet_meta:?}");
} else {
let size_mb = as_mb(*size);
@@ -131,18 +132,31 @@ async fn parse_metadata(
storage: ObjectStoreRef,
path: Path,
size: usize,
verbose: bool,
) -> Result<(MetaData, usize, usize)> {
let reader = ChunkReaderAdapter::new(&path, &storage);
let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?;
let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
let kv_size = kv_metadata
.map(|kvs| {
kvs.iter()
.map(|kv| kv.key.as_bytes().len() + kv.value.as_ref().map(|v| v.len()).unwrap_or(0))
.map(|kv| {
if verbose {
println!(
"kv_metadata_size, key:{}, value:{:?}",
kv.key,
kv.value.as_ref().map(|v| v.len())
);
}

kv.key.as_bytes().len() + kv.value.as_ref().map(|v| v.len()).unwrap_or(0)
})
.sum()
})
.unwrap_or(0);

let md = MetaData::try_new(&parquet_metadata, false)?;
println!("custom:{:?}", md.custom());

Ok((md, metadata_size, kv_size))
}
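One detail worth noting in the tool change: `let verbose = args.verbose;` copies the flag out of Args before the spawn loop. Because bool is Copy, each `async move` future spawned on the JoinSet captures its own copy, instead of moving all of args into the first closure and leaving later iterations (and the post-loop printing code, which also switched from args.verbose to verbose) without it. A minimal sketch of the pattern, with a hypothetical Args and task body; the real tool spawns via JoinSet::spawn_on with an explicit runtime handle:

use tokio::task::JoinSet;

struct Args {
    verbose: bool,
    // ...other fields that may be expensive or impossible to clone
}

async fn run(args: Args) -> anyhow::Result<()> {
    let mut join_set = JoinSet::new();
    // `bool` is `Copy`: each `async move` below captures its own copy,
    // leaving `verbose` usable after the loop as well.
    let verbose = args.verbose;
    for i in 0..4 {
        join_set.spawn(async move {
            if verbose {
                println!("processing sst #{i}");
            }
            Ok::<_, anyhow::Error>(i)
        });
    }
    while let Some(res) = join_set.join_next().await {
        res??;
    }
    if verbose {
        println!("all tasks done");
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run(Args { verbose: true }).await
}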
