From 3bca82ac21fbedfd6489a9ba9b52f1f0d97b4041 Mon Sep 17 00:00:00 2001
From: jiacai2050
Date: Thu, 1 Jun 2023 10:08:39 +0800
Subject: [PATCH] chore: remove duplicated metrics

---
 .../src/sst/parquet/async_reader.rs | 41 +++++--------
 tools/src/bin/sst-metadata.rs       | 20 +++++++--
 2 files changed, 27 insertions(+), 34 deletions(-)

diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index d2007af71b..2764c93ce9 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -30,7 +30,6 @@ use parquet::{
     file::metadata::RowGroupMetaData,
 };
 use parquet_ext::meta_data::ChunkReader;
-use prometheus::local::LocalHistogram;
 use snafu::ResultExt;
 use table_engine::predicate::PredicateRef;
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -42,7 +41,6 @@ use crate::sst::{
         cache::{MetaCacheRef, MetaData},
         SstMetaData,
     },
-    metrics,
     parquet::{
         encoding::ParquetDecoder,
         meta_data::{ParquetFilter, ParquetMetaDataRef},
@@ -221,10 +219,6 @@ impl<'a> Reader<'a> {
         let chunks_num = parallelism;
         let chunk_size = target_row_groups.len() / parallelism;
         self.metrics.parallelism = parallelism;
-        debug!(
-            "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
-            suggested_parallelism, parallelism, chunk_size
-        );
 
         let mut target_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
         for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
@@ -236,6 +230,10 @@
             meta_data.parquet().file_metadata().schema_descr(),
             row_projector.existed_source_projection().iter().copied(),
         );
+        debug!(
+            "Reader fetch record batches, parallelism suggest:{}, real:{}, chunk_size:{}, project:{:?}",
+            suggested_parallelism, parallelism, chunk_size, proj_mask
+        );
 
         let mut streams = Vec::with_capacity(target_row_group_chunks.len());
         for chunk in target_row_group_chunks {
@@ -355,18 +353,12 @@ impl<'a> Reader<'a> {
     }
 }
 
-#[derive(Clone, Debug)]
-struct ObjectReaderMetrics {
-    bytes_scanned: usize,
-    sst_get_range_length_histogram: LocalHistogram,
-}
-
 #[derive(Clone)]
 struct ObjectStoreReader {
     storage: ObjectStoreRef,
     path: Path,
     meta_data: MetaData,
-    metrics: ObjectReaderMetrics,
+    begin: Instant,
 }
 
 impl ObjectStoreReader {
@@ -375,27 +367,23 @@
             storage,
             path,
             meta_data,
-            metrics: ObjectReaderMetrics {
-                bytes_scanned: 0,
-                sst_get_range_length_histogram: metrics::SST_GET_RANGE_HISTOGRAM.local(),
-            },
+            begin: Instant::now(),
         }
     }
 }
 
 impl Drop for ObjectStoreReader {
     fn drop(&mut self) {
-        debug!("ObjectStoreReader is dropped, metrics:{:?}", self.metrics);
+        debug!(
+            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
+            &self.path,
+            self.begin.elapsed()
+        );
     }
 }
 
 impl AsyncFileReader for ObjectStoreReader {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.metrics.bytes_scanned += range.end - range.start;
-        self.metrics
-            .sst_get_range_length_histogram
-            .observe((range.end - range.start) as f64);
-
         self.storage
             .get_range(&self.path, range)
             .map_err(|e| {
@@ -410,13 +398,6 @@
         &mut self,
         ranges: Vec<Range<usize>>,
     ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
-        for range in &ranges {
-            self.metrics.bytes_scanned += range.end - range.start;
-            self.metrics
-                .sst_get_range_length_histogram
-                .observe((range.end - range.start) as f64);
-        }
-
         async move {
             self.storage
                 .get_ranges(&self.path, &ranges)
diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs
index 96625f1002..9eb81422bb 100644
--- a/tools/src/bin/sst-metadata.rs
+++ b/tools/src/bin/sst-metadata.rs
@@ -63,6 +63,7 @@ async fn run(args: Args) -> Result<()> {
 
     let mut join_set = JoinSet::new();
     let mut ssts = storage.list(None).await?;
+    let verbose = args.verbose;
     while let Some(object_meta) = ssts.next().await {
         let object_meta = object_meta?;
         let storage = storage.clone();
@@ -70,7 +71,7 @@
         join_set.spawn_on(
             async move {
                 let (metadata, metadata_size, kv_size) =
-                    parse_metadata(storage, location, object_meta.size).await?;
+                    parse_metadata(storage, location, object_meta.size, verbose).await?;
                 Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size))
             },
             &handle,
@@ -107,8 +108,8 @@
             .unwrap_or(0);
         let file_metadata = parquet_meta.file_metadata();
         let row_num = file_metadata.num_rows();
-        if args.verbose {
-            println!("object_meta:{object_meta:?}, parquet_meta:{parquet_meta:?}");
+        if verbose {
+            println!("object_meta:{object_meta:?}, parquet_meta:{parquet_meta:?}, custom_meta:{custom_meta:?}");
         } else {
             let size_mb = as_mb(*size);
             let metadata_mb = as_mb(metadata_size);
@@ -131,6 +132,7 @@ async fn parse_metadata(
     storage: ObjectStoreRef,
     path: Path,
     size: usize,
+    verbose: bool,
 ) -> Result<(MetaData, usize, usize)> {
     let reader = ChunkReaderAdapter::new(&path, &storage);
     let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?;
@@ -138,7 +140,17 @@
     let kv_size = kv_metadata
         .map(|kvs| {
             kvs.iter()
-                .map(|kv| kv.key.as_bytes().len() + kv.value.as_ref().map(|v| v.len()).unwrap_or(0))
+                .map(|kv| {
+                    if verbose {
+                        println!(
+                            "kv_metadata_size, key:{}, value:{:?}",
+                            kv.key,
+                            kv.value.as_ref().map(|v| v.len())
+                        );
+                    }
+
+                    kv.key.as_bytes().len() + kv.value.as_ref().map(|v| v.len()).unwrap_or(0)
+                })
                 .sum()
         })
         .unwrap_or(0);
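
Note on the pattern above: the patch replaces ObjectStoreReader's per-read byte
counter and histogram with a single `begin: Instant` that is logged when the
reader is dropped. A minimal standalone sketch of that drop-based timing
pattern follows; `FileReader` and `main` are hypothetical stand-ins for the
real `ObjectStoreReader` and its call site, and `println!` stands in for the
`debug!` macro used in the patch:

    use std::time::Instant;

    struct FileReader {
        path: String,
        begin: Instant,
    }

    impl FileReader {
        fn new(path: String) -> Self {
            // Capture the construction time; no per-read bookkeeping needed.
            Self {
                path,
                begin: Instant::now(),
            }
        }
    }

    impl Drop for FileReader {
        fn drop(&mut self) {
            // Runs once when the reader goes out of scope, logging its lifetime.
            println!(
                "FileReader dropped, path:{}, elapsed:{:?}",
                self.path,
                self.begin.elapsed()
            );
        }
    }

    fn main() {
        let reader = FileReader::new("sst/1.parquet".to_string());
        // ... range reads would happen here ...
        drop(reader); // end of scope would trigger the same log
    }

Since the real reader derives Clone, each clone copies `begin` and reports its
own elapsed time on drop, so one logical read may emit several such log lines.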