diff --git a/Cargo.lock b/Cargo.lock
index 3235f2a230..ef8b1d4b6d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1303,9 +1303,9 @@ dependencies = [
 
 [[package]]
 name = "ceresdbproto"
-version = "1.0.10"
+version = "1.0.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ea8a61d72d30452b689e761344d9502bcc5feb2dbd06f08b507b2e164b549aee"
+checksum = "963a7021095c631e843df0afa20de53cd33cb1ac08f99e633825e1ca58496ac1"
 dependencies = [
  "prost",
  "protoc-bin-vendored",
diff --git a/Cargo.toml b/Cargo.toml
index 048bdcc72b..4280f9ea1f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -98,7 +98,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.10"
+ceresdbproto = "1.0.11"
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs
index 923014394d..ad662bdffb 100644
--- a/analytic_engine/src/compaction/picker.rs
+++ b/analytic_engine/src/compaction/picker.rs
@@ -873,6 +873,7 @@ mod tests {
             row_num: 0,
             max_seq: 0,
             storage_format: StorageFormat::default(),
+            associated_files: Vec::new(),
         };
         let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
         FileHandle::new(file_meta, queue)
@@ -893,6 +894,7 @@ mod tests {
             row_num: 0,
             max_seq,
             storage_format: StorageFormat::default(),
+            associated_files: Vec::new(),
         };
         let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
         FileHandle::new(file_meta, queue)
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 9110b4d392..11fd2d2374 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -496,7 +496,6 @@ impl FlushTask {
             .context(AllocFileId)?;
         let sst_file_path = self.table_data.set_sst_file_path(file_id);
 
-        // TODO: `min_key` & `max_key` should be figured out when writing sst.
         let sst_meta = MetaData {
             min_key: min_key.clone(),
@@ -583,6 +582,7 @@ impl FlushTask {
                 time_range: sst_meta.time_range,
                 max_seq: sst_meta.max_sequence,
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             },
         })
     }
@@ -621,7 +621,6 @@ impl FlushTask {
             .context(AllocFileId)?;
         let sst_file_path = self.table_data.set_sst_file_path(file_id);
-
         let storage_format_hint = self.table_data.table_options().storage_format_hint;
         let sst_write_options = SstWriteOptions {
             storage_format_hint,
@@ -665,6 +664,7 @@ impl FlushTask {
             time_range: memtable_state.time_range,
             max_seq: memtable_state.last_sequence(),
             storage_format: sst_info.storage_format,
+            associated_files: vec![sst_info.meta_path],
         }))
     }
 }
@@ -851,6 +851,7 @@ impl SpaceStore {
                 .fetch_metas(&input.files)
                 .await
                 .context(ReadSstMeta)?;
+
             MetaData::merge(sst_metas.into_iter().map(MetaData::from), schema)
         };
@@ -861,7 +862,6 @@ impl SpaceStore {
             .context(AllocFileId)?;
         let sst_file_path = table_data.set_sst_file_path(file_id);
-
         let mut sst_writer = self
             .sst_factory
             .create_writer(
@@ -926,6 +926,7 @@ impl SpaceStore {
                 max_seq: sst_meta.max_sequence,
                 time_range: sst_meta.time_range,
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             },
         });
diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs
index 784d628285..52e7f7b969 100644
--- a/analytic_engine/src/sst/file.rs
+++ b/analytic_engine/src/sst/file.rs
@@ -24,16 +24,18 @@ use std::{
         atomic::{AtomicBool, Ordering},
         Arc,
     },
+    time::Duration,
 };
 
 use common_types::{
     time::{TimeRange, Timestamp},
     SequenceNumber,
 };
+use future_ext::{retry_async, RetryConfig};
 use log::{error, info, warn};
 use macros::define_result;
 use metric_ext::Meter;
-use object_store::ObjectStoreRef;
+use object_store::{ObjectStoreRef, Path};
 use runtime::{JoinHandle, Runtime};
 use snafu::{ResultExt, Snafu};
 use table_engine::table::TableId;
@@ -303,7 +305,7 @@ impl Drop for FileHandleInner {
         info!("FileHandle is dropped, meta:{:?}", self.meta);
 
         // Push file cannot block or be async because we are in drop().
-        self.purge_queue.push_file(self.meta.id);
+        self.purge_queue.push_file(&self.meta);
     }
 }
@@ -441,6 +443,8 @@ pub struct FileMeta {
     pub max_seq: u64,
     /// The format of the file.
     pub storage_format: StorageFormat,
+    /// Associated files, such as the custom metadata file (`meta_path`).
+    pub associated_files: Vec<String>,
 }
 
 impl FileMeta {
@@ -475,9 +479,9 @@ impl FilePurgeQueue {
         self.inner.closed.store(true, Ordering::SeqCst);
     }
 
-    fn push_file(&self, file_id: FileId) {
+    fn push_file(&self, file_meta: &FileMeta) {
         if self.inner.closed.load(Ordering::SeqCst) {
-            warn!("Purger closed, ignore file_id:{file_id}");
+            warn!("Purger closed, ignore file_id:{}", file_meta.id);
             return;
         }
@@ -486,7 +490,8 @@ impl FilePurgeQueue {
         let request = FilePurgeRequest {
             space_id: self.inner.space_id,
             table_id: self.inner.table_id,
-            file_id,
+            file_id: file_meta.id,
+            associated_files: file_meta.associated_files.clone(),
         };
 
         if let Err(send_res) = self.inner.sender.send(Request::Purge(request)) {
@@ -510,6 +515,7 @@ pub struct FilePurgeRequest {
     space_id: SpaceId,
     table_id: TableId,
     file_id: FileId,
+    associated_files: Vec<String>,
 }
 
 #[derive(Debug)]
@@ -525,6 +531,11 @@ pub struct FilePurger {
 }
 
 impl FilePurger {
+    const RETRY_CONFIG: RetryConfig = RetryConfig {
+        max_retries: 3,
+        interval: Duration::from_millis(500),
+    };
+
     pub fn start(runtime: &Runtime, store: ObjectStoreRef) -> Self {
         // We must use unbound channel, so the sender wont block when the handle is
         // dropped.
@@ -561,6 +572,13 @@ impl FilePurger {
         FilePurgeQueue::new(space_id, table_id, self.sender.clone())
     }
 
+    // TODO: currently we ignore errors when deleting.
+    async fn delete_file(store: &ObjectStoreRef, path: &Path) {
+        if let Err(e) = retry_async(|| store.delete(path), &Self::RETRY_CONFIG).await {
+            error!("File purger failed to delete file, path:{path}, err:{e}");
+        }
+    }
+
     async fn purge_file_loop(store: ObjectStoreRef, mut receiver: UnboundedReceiver<Request>) {
         info!("File purger start");
 
@@ -579,13 +597,12 @@ impl FilePurger {
                         sst_file_path.to_string()
                     );
 
-                    if let Err(e) = store.delete(&sst_file_path).await {
-                        error!(
-                            "File purger failed to delete file, sst_file_path:{}, err:{}",
-                            sst_file_path.to_string(),
-                            e
-                        );
+                    for path in purge_request.associated_files {
+                        let path = Path::from(path);
+                        Self::delete_file(&store, &path).await;
                     }
+
+                    Self::delete_file(&store, &sst_file_path).await;
                 }
                 Request::Exit => break,
             }
diff --git a/analytic_engine/src/sst/manager.rs b/analytic_engine/src/sst/manager.rs
index ca3b2f6662..a0579db24b 100644
--- a/analytic_engine/src/sst/manager.rs
+++ b/analytic_engine/src/sst/manager.rs
@@ -162,6 +162,7 @@ pub mod tests {
                 time_range: sst_meta.time_range(),
                 max_seq: sst_meta.max_sequence(),
                 storage_format: StorageFormat::Columnar,
+                associated_files: Vec::new(),
             },
         );
     }
diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs
index f319d61dda..0f2cc9d37c 100644
--- a/analytic_engine/src/sst/meta_data/cache.rs
+++ b/analytic_engine/src/sst/meta_data/cache.rs
@@ -18,11 +18,15 @@ use std::{
 };
 
 use lru::LruCache;
-use parquet::file::metadata::FileMetaData;
-use snafu::{ensure, OptionExt, ResultExt};
+use object_store::{ObjectStoreRef, Path};
+use parquet::{file::metadata::FileMetaData, format::KeyValue};
+use snafu::{ensure, OptionExt};
 
 use crate::sst::{
-    meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
+    meta_data::{
+        metadata_reader::parse_metadata, KvMetaDataNotFound, KvMetaVersionEmpty,
+        ParquetMetaDataRef, Result,
+    },
     parquet::encoding,
 };
 
@@ -45,9 +49,10 @@ impl MetaData {
     /// contains no extended custom information.
     // TODO: remove it and use the suggested api.
     #[allow(deprecated)]
-    pub fn try_new(
+    pub async fn try_new(
         parquet_meta_data: &parquet_ext::ParquetMetaData,
         ignore_sst_filter: bool,
+        store: ObjectStoreRef,
     ) -> Result<Self> {
         let file_meta_data = parquet_meta_data.file_metadata();
         let kv_metas = file_meta_data
@@ -55,28 +60,32 @@ impl MetaData {
             .context(KvMetaDataNotFound)?;
         ensure!(!kv_metas.is_empty(), KvMetaDataNotFound);
 
-        let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1);
+
+        let mut meta_path = None;
+        let mut other_kv_metas: Vec<KeyValue> = Vec::with_capacity(kv_metas.len() - 1);
         let mut custom_kv_meta = None;
+        let mut meta_version = encoding::META_VERSION_V1; // default is v1
+
         for kv_meta in kv_metas {
-            // Remove our extended custom meta data from the parquet metadata for small
-            // memory consumption in the cache.
             if kv_meta.key == encoding::META_KEY {
                 custom_kv_meta = Some(kv_meta);
+            } else if kv_meta.key == encoding::META_PATH_KEY {
+                meta_path = kv_meta.value.as_ref().map(|path| Path::from(path.as_str()))
+            } else if kv_meta.key == encoding::META_VERSION_KEY {
+                meta_version = kv_meta.value.as_ref().context(KvMetaVersionEmpty)?;
             } else {
                 other_kv_metas.push(kv_meta.clone());
             }
         }
 
-        let custom = {
-            let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?;
-            let mut sst_meta =
-                encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?;
-            if ignore_sst_filter {
-                sst_meta.parquet_filter = None;
-            }
-
-            Arc::new(sst_meta)
-        };
+        let custom = parse_metadata(
+            meta_version,
+            custom_kv_meta,
+            ignore_sst_filter,
+            meta_path.clone(),
+            store,
+        )
+        .await?;
 
         // let's build a new parquet metadata without the extended key value
         // metadata.
@@ -103,7 +112,6 @@ impl MetaData {
             Arc::new(thin_parquet_meta_data)
         };
-
         Ok(Self { parquet, custom })
     }
 
@@ -155,6 +163,7 @@ mod tests {
         schema::Builder as CustomSchemaBuilder,
         time::{TimeRange, Timestamp},
     };
+    use object_store::LocalFileSystem;
     use parquet::{arrow::ArrowWriter, file::footer};
     use parquet_ext::ParquetMetaData;
 
@@ -238,15 +247,16 @@ mod tests {
             .unwrap();
 
         let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
-        let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
+        let encoded_meta_data =
+            encoding::encode_sst_meta_data_v1(custom_meta_data.clone()).unwrap();
         writer.append_key_value_metadata(encoded_meta_data);
 
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
 
-    #[test]
-    fn test_arrow_meta_data() {
+    #[tokio::test]
+    async fn test_arrow_meta_data() {
         let temp_dir = tempfile::tempdir().unwrap();
         let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par");
         let schema = {
@@ -284,8 +294,11 @@ mod tests {
 
         let parquet_file = File::open(parquet_file_path.as_path()).unwrap();
         let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap();
-
-        let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap();
+        let store =
+            Arc::new(LocalFileSystem::new_with_prefix(parquet_file_path.as_path()).unwrap());
+        let meta_data = MetaData::try_new(&parquet_meta_data, false, store)
+            .await
+            .unwrap();
 
         assert_eq!(**meta_data.custom(), custom_meta_data);
         check_parquet_meta_data(&parquet_meta_data, meta_data.parquet());
diff --git a/analytic_engine/src/sst/meta_data/metadata_reader.rs b/analytic_engine/src/sst/meta_data/metadata_reader.rs
new file mode 100644
index 0000000000..017fae6c7d
--- /dev/null
+++ b/analytic_engine/src/sst/meta_data/metadata_reader.rs
@@ -0,0 +1,126 @@
+// Copyright 2023 The CeresDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use macros::define_result;
+use object_store::{ObjectStoreRef, Path};
+use parquet::{data_type::AsBytes, file::metadata::KeyValue};
+use snafu::{ensure, OptionExt, ResultExt};
+
+use super::UnknownMetaVersion;
+use crate::sst::{
+    meta_data::{
+        DecodeCustomMetaData, FetchAndDecodeSstMeta, FetchFromStore, KvMetaDataNotFound,
+        KvMetaPathEmpty,
+    },
+    parquet::{
+        encoding::{self, decode_sst_meta_data_v2, META_VERSION_CURRENT, META_VERSION_V1},
+        meta_data::{ParquetMetaData, ParquetMetaDataRef},
+    },
+};
+
+define_result!(super::Error);
+
+#[async_trait]
+pub trait CustomMetadataReader {
+    async fn get_metadata(&self) -> Result<ParquetMetaData>;
+}
+
+pub struct MetaV1Reader<'a> {
+    custom_kv_meta: Option<&'a KeyValue>,
+}
+
+impl<'a> MetaV1Reader<'a> {
+    fn new(custom_kv_meta: Option<&'a KeyValue>) -> Self {
+        Self { custom_kv_meta }
+    }
+}
+
+#[async_trait]
+impl CustomMetadataReader for MetaV1Reader<'_> {
+    async fn get_metadata(&self) -> Result<ParquetMetaData> {
+        let custom_kv_meta = self.custom_kv_meta.context(KvMetaDataNotFound)?;
+
+        encoding::decode_sst_meta_data_v1(custom_kv_meta).context(DecodeCustomMetaData)
+    }
+}
+
+pub struct MetaV2Reader {
+    meta_path: Option<Path>,
+    store: ObjectStoreRef,
+}
+
+impl MetaV2Reader {
+    fn new(meta_path: Option<Path>, store: ObjectStoreRef) -> Self {
+        Self { meta_path, store }
+    }
+}
+
+#[async_trait]
+impl CustomMetadataReader for MetaV2Reader {
+    async fn get_metadata(&self) -> Result<ParquetMetaData> {
+        match &self.meta_path {
+            None => KvMetaPathEmpty {}.fail(),
+            Some(meta_path) => {
+                let metadata = self
+                    .store
+                    .get(meta_path)
+                    .await
+                    .with_context(|| FetchFromStore {
+                        file_path: meta_path.to_string(),
+                    })?
+                    .bytes()
+                    .await
+                    .with_context(|| FetchAndDecodeSstMeta {
+                        file_path: meta_path.to_string(),
+                    })?;
+
+                decode_sst_meta_data_v2(metadata.as_bytes()).context(DecodeCustomMetaData)
+            }
+        }
+    }
+}
+
+pub async fn parse_metadata(
+    meta_version: &str,
+    custom_kv_meta: Option<&KeyValue>,
+    ignore_sst_filter: bool,
+    meta_path: Option<Path>,
+    store: ObjectStoreRef,
+) -> Result<ParquetMetaDataRef> {
+    // Must ensure the custom metadata is stored in only one place.
+    ensure!(
+        custom_kv_meta.is_none() || meta_path.is_none(),
+        KvMetaDataNotFound
+    );
+
+    let reader: Box<dyn CustomMetadataReader + Send + '_> = match meta_version {
+        META_VERSION_V1 => Box::new(MetaV1Reader::new(custom_kv_meta)),
+        META_VERSION_CURRENT => Box::new(MetaV2Reader::new(meta_path, store)),
+        _ => {
+            return UnknownMetaVersion {
+                version: meta_version,
+            }
+            .fail()
+        }
+    };
+    let mut metadata = reader.get_metadata().await?;
+    if ignore_sst_filter {
+        metadata.parquet_filter = None;
+    }
+
+    Ok(Arc::new(metadata))
+}
diff --git a/analytic_engine/src/sst/meta_data/mod.rs b/analytic_engine/src/sst/meta_data/mod.rs
index 010eb033f2..ca98c622eb 100644
--- a/analytic_engine/src/sst/meta_data/mod.rs
+++ b/analytic_engine/src/sst/meta_data/mod.rs
@@ -13,8 +13,9 @@
 // limitations under the License.
 
 pub mod cache;
+mod metadata_reader;
 
-use std::sync::Arc;
+use std::{str::Utf8Error, sync::Arc};
 
 use ceresdbproto::sst as sst_pb;
 use common_types::{schema::Schema, time::TimeRange, SequenceNumber};
@@ -47,6 +48,21 @@ pub enum Error {
     ))]
     KvMetaDataNotFound { backtrace: Backtrace },
 
+    #[snafu(display(
+        "Key value meta version in parquet is empty.\nBacktrace:\n{}",
+        backtrace
+    ))]
+    KvMetaVersionEmpty { backtrace: Backtrace },
+
+    #[snafu(display("Key value meta path in parquet is empty.\nBacktrace:\n{}", backtrace))]
+    KvMetaPathEmpty { backtrace: Backtrace },
+
+    #[snafu(display("Unknown meta version, value:{}.\nBacktrace:\n{}", version, backtrace))]
+    UnknownMetaVersion {
+        version: String,
+        backtrace: Backtrace,
+    },
+
     #[snafu(display("Metadata in proto struct is not found.\nBacktrace\n:{}", backtrace))]
     MetaDataNotFound { backtrace: Backtrace },
 
@@ -64,6 +80,36 @@ pub enum Error {
 
     #[snafu(display("Failed to convert parquet meta data, err:{}", source))]
     ConvertParquetMetaData { source: parquet::meta_data::Error },
+
+    #[snafu(display("Meet an object store error, err:{source}\nBacktrace:\n{backtrace}"))]
+    ObjectStoreError {
+        source: object_store::ObjectStoreError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display(
+        "Failed to fetch and decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
+    ))]
+    FetchAndDecodeSstMeta {
+        file_path: String,
+        source: object_store::ObjectStoreError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display(
+        "Failed to fetch sst meta data from store, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
+    ))]
+    FetchFromStore {
+        file_path: String,
+        source: object_store::ObjectStoreError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Meet a utf8 error, err:{source}\nBacktrace:\n{backtrace}"))]
+    Utf8ErrorWrapper {
+        source: Utf8Error,
+        backtrace: Backtrace,
+    },
 }
 
 define_result!(Error);
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index 909daa070b..00549ff2fa 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -377,7 +377,8 @@ impl<'a> Reader<'a> {
         // TODO: Support page index until https://github.com/CeresDB/ceresdb/issues/1040 is fixed.
-        MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
+        MetaData::try_new(&parquet_meta_data, ignore_sst_filter, self.store.clone())
+            .await
             .box_err()
             .context(DecodeSstMeta)
     }
diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 9c6055bd1f..e6c55a7c09 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -22,6 +22,7 @@ use arrow::{
     util::bit_util,
 };
 use async_trait::async_trait;
+use bytes::Bytes;
 use bytes_ext::{BytesMut, SafeBufMut};
 use ceresdbproto::sst as sst_pb;
 use common_types::{
@@ -36,7 +37,7 @@ use parquet::{
     basic::Compression,
     file::{metadata::KeyValue, properties::WriterProperties},
 };
-use prost::Message;
+use prost::{bytes, Message};
 use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
 use tokio::io::AsyncWrite;
 
@@ -72,6 +73,18 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
+    #[snafu(display(
+        "Failed to decode sst meta data, bytes:{:?}, err:{}.\nBacktrace:\n{}",
+        bytes,
+        source,
+        backtrace,
+    ))]
+    DecodeFromBytes {
+        bytes: Vec<u8>,
+        source: prost::DecodeError,
+        backtrace: Backtrace,
+    },
+
     #[snafu(display(
         "Invalid meta key, expect:{}, given:{}.\nBacktrace:\n{}",
         expect,
@@ -127,6 +140,16 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
+    #[snafu(display(
+        "Invalid meta value header, bytes:{:?}.\nBacktrace:\n{}",
+        bytes,
+        backtrace
+    ))]
+    InvalidMetaBytesHeader {
+        bytes: Vec<u8>,
+        backtrace: Backtrace,
+    },
+
     #[snafu(display("Failed to convert sst meta data from protobuf, err:{}", source))]
     ConvertSstMetaData {
         source: crate::sst::parquet::meta_data::Error,
@@ -174,19 +197,52 @@ pub enum Error {
 }
 
 define_result!(Error);
 
-pub const META_KEY: &str = "meta";
+// In the v1 format, our customized meta is encoded in the parquet file itself.
+// This may incur storage overhead, since parquet KV pairs only accept strings,
+// so we have to base64-encode our meta.
+// In v2, the meta is saved in a separate file on the object store, and its
+// path is stored in a parquet KV pair identified by `meta_path`.
+pub const META_VERSION_V1: &str = "1";
+pub const META_VERSION_CURRENT: &str = "2";
+pub const META_KEY: &str = "meta"; // used in v1
+pub const META_PATH_KEY: &str = "meta_path"; // used in v2
+pub const META_VERSION_KEY: &str = "meta_version";
 pub const META_VALUE_HEADER: u8 = 0;
 
-/// Encode the sst meta data into binary key value pair.
-pub fn encode_sst_meta_data(meta_data: ParquetMetaData) -> Result<KeyValue> {
+/// Encode the sst custom meta data into binary key value pair.
+pub fn encode_sst_meta_data_v2(meta_data: ParquetMetaData) -> Result<Bytes> {
     let meta_data_pb = sst_pb::ParquetMetaData::from(meta_data);
 
     let mut buf = BytesMut::with_capacity(meta_data_pb.encoded_len() + 1);
     buf.try_put_u8(META_VALUE_HEADER)
         .expect("Should write header into the buffer successfully");
 
-    // encode the sst meta data into protobuf binary
+    // encode the sst custom meta data into protobuf binary
     meta_data_pb.encode(&mut buf).context(EncodeIntoPb)?;
+    Ok(buf.into())
+}
+
+/// Decode the sst custom meta data from the binary key value pair.
+pub fn decode_sst_meta_data_v2(bytes: &[u8]) -> Result<ParquetMetaData> {
+    ensure!(
+        bytes[0] == META_VALUE_HEADER,
+        InvalidMetaBytesHeader {
+            bytes: bytes.to_vec()
+        }
+    );
+    let meta_data_pb: sst_pb::ParquetMetaData =
+        Message::decode(&bytes[1..]).context(DecodeFromBytes {
+            bytes: bytes.to_vec(),
+        })?;
+
+    ParquetMetaData::try_from(meta_data_pb).context(ConvertSstMetaData)
+}
+
+/// Encode the sst meta data into binary key value pair.
+// TODO: remove this function once the hybrid format is no longer supported.
+pub fn encode_sst_meta_data_v1(meta_data: ParquetMetaData) -> Result<KeyValue> {
+    let buf = encode_sst_meta_data_v2(meta_data)?;
+
     Ok(KeyValue {
         key: META_KEY.to_string(),
         value: Some(base64::encode(buf.as_ref())),
@@ -194,7 +250,7 @@
 }
 
 /// Decode the sst meta data from the binary key value pair.
-pub fn decode_sst_meta_data(kv: &KeyValue) -> Result<ParquetMetaData> {
+pub fn decode_sst_meta_data_v1(kv: &KeyValue) -> Result<ParquetMetaData> {
     ensure!(
         kv.key == META_KEY,
         InvalidMetaKey {
@@ -211,17 +267,7 @@ pub fn decode_sst_meta_data(kv: &KeyValue) -> Result<ParquetMetaData> {
 
     let raw_bytes = base64::decode(meta_value).context(DecodeBase64MetaValue { meta_value })?;
 
-    ensure!(!raw_bytes.is_empty(), InvalidMetaValueLen { meta_value });
-
-    ensure!(
-        raw_bytes[0] == META_VALUE_HEADER,
-        InvalidMetaValueHeader { meta_value }
-    );
-
-    let meta_data_pb: sst_pb::ParquetMetaData =
-        Message::decode(&raw_bytes[1..]).context(DecodeFromPb { meta_value })?;
-
-    ParquetMetaData::try_from(meta_data_pb).context(ConvertSstMetaData)
+    decode_sst_meta_data_v2(&raw_bytes)
 }
 
 /// RecordEncoder is used for encoding ArrowBatch.
@@ -234,6 +280,8 @@ trait RecordEncoder {
 
     fn set_meta_data(&mut self, meta_data: ParquetMetaData) -> Result<()>;
 
+    fn set_meta_data_path(&mut self, metadata_path: Option<String>) -> Result<()>;
+
     /// Return encoded bytes
     /// Note: trait method cannot receive `self`, so take a &mut self here to
     /// indicate this encoder is already consumed
@@ -297,12 +345,24 @@ impl RecordEncoder for ColumnarRecordEncoder {
         Ok(record_batch.num_rows())
     }
 
-    fn set_meta_data(&mut self, meta_data: ParquetMetaData) -> Result<()> {
-        let key_value = encode_sst_meta_data(meta_data)?;
-        self.arrow_writer
-            .as_mut()
-            .unwrap()
-            .append_key_value_metadata(key_value);
+    // TODO: this function is not needed any more in the meta v2 format,
+    // remove it in the future.
+    fn set_meta_data(&mut self, _meta_data: ParquetMetaData) -> Result<()> {
+        Ok(())
+    }
+
+    fn set_meta_data_path(&mut self, metadata_path: Option<String>) -> Result<()> {
+        let path_kv = KeyValue {
+            key: META_PATH_KEY.to_string(),
+            value: metadata_path,
+        };
+        let version_kv = KeyValue {
+            key: META_VERSION_KEY.to_string(),
+            value: Some(META_VERSION_CURRENT.to_string()),
+        };
+        let writer = self.arrow_writer.as_mut().unwrap();
+        writer.append_key_value_metadata(path_kv);
+        writer.append_key_value_metadata(version_kv);
 
         Ok(())
     }
 
@@ -431,7 +491,7 @@ impl RecordEncoder for HybridRecordEncoder {
     fn set_meta_data(&mut self, mut meta_data: ParquetMetaData) -> Result<()> {
         meta_data.collapsible_cols_idx = mem::take(&mut self.collapsible_col_idx);
 
-        let key_value = encode_sst_meta_data(meta_data)?;
+        let key_value = encode_sst_meta_data_v1(meta_data)?;
         self.arrow_writer
             .as_mut()
             .unwrap()
@@ -440,6 +500,10 @@ impl RecordEncoder for HybridRecordEncoder {
         Ok(())
     }
 
+    fn set_meta_data_path(&mut self, _metadata_path: Option<String>) -> Result<()> {
+        Ok(())
+    }
+
     async fn close(&mut self) -> Result<()> {
         assert!(self.arrow_writer.is_some());
 
@@ -505,6 +569,10 @@ impl ParquetEncoder {
         self.record_encoder.set_meta_data(meta_data)
     }
 
+    pub fn set_meta_data_path(&mut self, meta_data_path: Option<String>) -> Result<()> {
+        self.record_encoder.set_meta_data_path(meta_data_path)
+    }
+
     pub async fn close(mut self) -> Result<()> {
         self.record_encoder.close().await
     }
diff --git a/analytic_engine/src/sst/parquet/meta_data.rs b/analytic_engine/src/sst/parquet/meta_data.rs
index 85a58b6b3c..2396ae2639 100644
--- a/analytic_engine/src/sst/parquet/meta_data.rs
+++ b/analytic_engine/src/sst/parquet/meta_data.rs
@@ -379,6 +379,18 @@ impl From<ParquetMetaData> for MetaData {
     }
 }
 
+impl From<Arc<ParquetMetaData>> for MetaData {
+    fn from(meta: Arc<ParquetMetaData>) -> Self {
+        Self {
+            min_key: meta.min_key.clone(),
+            max_key: meta.max_key.clone(),
+            time_range: meta.time_range,
+            max_sequence: meta.max_sequence,
+            schema: meta.schema.clone(),
+        }
+    }
+}
+
 impl fmt::Debug for ParquetMetaData {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("ParquetMetaData")
diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs
index f5efc17fc7..1227fce2e6 100644
--- a/analytic_engine/src/sst/parquet/writer.rs
+++ b/analytic_engine/src/sst/parquet/writer.rs
@@ -21,8 +21,9 @@ use futures::StreamExt;
 use generic_error::BoxError;
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
+use parquet::data_type::AsBytes;
 use snafu::ResultExt;
-use tokio::io::AsyncWrite;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
 
 use super::meta_data::RowGroupFilter;
 use crate::{
@@ -30,14 +31,15 @@ use crate::{
         factory::{ObjectStorePickerRef, SstWriteOptions},
         file::Level,
         parquet::{
-            encoding::ParquetEncoder,
+            encoding::{encode_sst_meta_data_v2, ParquetEncoder},
             meta_data::{ParquetFilter, ParquetMetaData, RowGroupFilterBuilder},
         },
         writer::{
-            self, BuildParquetFilter, EncodeRecordBatch, MetaData, PollRecordBatch,
-            RecordBatchStream, Result, SstInfo, SstWriter, Storage,
+            self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, Io, MetaData,
+            PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
         },
     },
+    table::sst_util,
     table_options::StorageFormat,
 };
 
@@ -180,7 +182,11 @@ impl RecordBatchGroupWriter {
         !self.hybrid_encoding && !self.level.is_min()
     }
 
-    async fn write_all<W: AsyncWrite + Send + Unpin + 'static>(mut self, sink: W) -> Result<usize> {
+    async fn write_all<W: AsyncWrite + Send + Unpin + 'static>(
+        mut self,
+        sink: W,
+        meta_path: &Path,
+    ) -> Result<(usize, ParquetMetaData)> {
         let mut prev_record_batch: Option<RecordBatchWithKey> = None;
         let mut arrow_row_group = Vec::new();
         let mut total_num_rows = 0;
@@ -232,8 +238,9 @@ impl RecordBatchGroupWriter {
             parquet_meta_data.parquet_filter = parquet_filter;
             parquet_meta_data
         };
+
         parquet_encoder
-            .set_meta_data(parquet_meta_data)
+            .set_meta_data_path(Some(meta_path.to_string()))
             .box_err()
             .context(EncodeRecordBatch)?;
 
@@ -243,7 +250,7 @@ impl RecordBatchGroupWriter {
             .box_err()
             .context(EncodeRecordBatch)?;
 
-        Ok(total_num_rows)
+        Ok((total_num_rows, parquet_meta_data))
     }
 }
 
@@ -281,6 +288,37 @@ impl<'a> ObjectStoreMultiUploadAborter<'a> {
     }
 }
 
+async fn write_metadata<W>(
+    mut meta_sink: W,
+    parquet_metadata: ParquetMetaData,
+    meta_path: &object_store::Path,
+) -> writer::Result<()>
+where
+    W: AsyncWrite + Send + Unpin,
+{
+    let buf = encode_sst_meta_data_v2(parquet_metadata).context(EncodePbData)?;
+    meta_sink
+        .write_all(buf.as_bytes())
+        .await
+        .with_context(|| Io {
+            file: meta_path.clone(),
+        })?;
+
+    meta_sink.shutdown().await.with_context(|| Io {
+        file: meta_path.clone(),
+    })?;
+
+    Ok(())
+}
+
+async fn multi_upload_abort(path: &Path, aborter: ObjectStoreMultiUploadAborter<'_>) {
+    // The uploading file will be leaked if we fail to abort. A repair command
+    // will be provided to clean up the leaked files.
+    if let Err(e) = aborter.abort().await {
+        error!("Failed to abort multi-upload for sst:{}, err:{}", path, e);
+    }
+}
+
 #[async_trait]
 impl<'a> SstWriter for ParquetSstWriter<'a> {
     async fn write(
@@ -308,20 +346,28 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
         let (aborter, sink) =
             ObjectStoreMultiUploadAborter::initialize_upload(self.store, self.path).await?;
 
-        let total_num_rows = match group_writer.write_all(sink).await {
+        let meta_path = Path::from(sst_util::new_metadata_path(self.path.as_ref()));
+
+        let (total_num_rows, parquet_metadata) =
+            match group_writer.write_all(sink, &meta_path).await {
+                Ok(v) => v,
+                Err(e) => {
+                    multi_upload_abort(self.path, aborter).await;
+                    return Err(e);
+                }
+            };
+
+        let (meta_aborter, meta_sink) =
+            ObjectStoreMultiUploadAborter::initialize_upload(self.store, &meta_path).await?;
+        match write_metadata(meta_sink, parquet_metadata, &meta_path).await {
             Ok(v) => v,
             Err(e) => {
-                if let Err(e) = aborter.abort().await {
-                    // The uploading file will be leaked if failed to abort. A repair command will
-                    // be provided to clean up the leaked files.
-                    error!(
-                        "Failed to abort multi-upload for sst:{}, err:{}",
-                        self.path, e
-                    );
-                }
+                multi_upload_abort(self.path, aborter).await;
+                multi_upload_abort(&meta_path, meta_aborter).await;
                 return Err(e);
             }
-        };
+        }
 
         let file_head = self.store.head(self.path).await.context(Storage)?;
         let storage_format = if self.hybrid_encoding {
@@ -333,6 +379,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
             file_size: file_head.size,
             row_num: total_num_rows,
             storage_format,
+            meta_path: meta_path.to_string(),
         })
     }
 }
diff --git a/analytic_engine/src/sst/writer.rs b/analytic_engine/src/sst/writer.rs
index 0346c6d06d..24ec9a97ab 100644
--- a/analytic_engine/src/sst/writer.rs
+++ b/analytic_engine/src/sst/writer.rs
@@ -48,6 +48,18 @@ pub mod error {
         #[snafu(display("Failed to encode meta data, err:{}", source))]
         EncodeMetaData { source: GenericError },
 
+        #[snafu(display("Failed to encode pb data, err:{}", source))]
+        EncodePbData {
+            source: crate::sst::parquet::encoding::Error,
+        },
+
+        #[snafu(display("IO failed, file:{file}, source:{source}.\nBacktrace:\n{backtrace}"))]
+        Io {
+            file: String,
+            source: std::io::Error,
+            backtrace: Backtrace,
+        },
+
         #[snafu(display(
             "Failed to encode record batch into sst, err:{}.\nBacktrace:\n{}",
             source,
@@ -80,11 +92,12 @@ pub type RecordBatchStreamItem = std::result::Result<RecordBatchWithKey, GenericError>;
 pub type RecordBatchStream = Box<dyn Stream<Item = RecordBatchStreamItem> + Send + Unpin>;
 
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Clone)]
 pub struct SstInfo {
     pub file_size: usize,
     pub row_num: usize,
     pub storage_format: StorageFormat,
+    pub meta_path: String,
 }
 
 #[derive(Debug, Clone)]
diff --git a/analytic_engine/src/table/sst_util.rs b/analytic_engine/src/table/sst_util.rs
index 1c52882c40..0b8f497157 100644
--- a/analytic_engine/src/table/sst_util.rs
+++ b/analytic_engine/src/table/sst_util.rs
@@ -22,6 +22,7 @@ use table_engine::table::TableId;
 use crate::{space::SpaceId, sst::manager::FileId};
 
 const SST_FILE_SUFFIX: &str = "sst";
+const SST_CUSTOM_METADATA_FILE_SUFFIX: &str = "metadata";
 
 #[inline]
 /// Generate the sst file name.
@@ -36,3 +37,8 @@ pub fn new_sst_file_path(space_id: SpaceId, table_id: TableId, file_id: FileId)
         sst_file_name(file_id),
     ])
 }
+
+/// Convert the sst file path into the custom metadata file path.
+pub fn new_metadata_path(sst_file_path: &str) -> String {
+    format!("{sst_file_path}.{SST_CUSTOM_METADATA_FILE_SUFFIX}")
+}
diff --git a/analytic_engine/src/table/version_edit.rs b/analytic_engine/src/table/version_edit.rs
index 2fd49f73d6..41e5230831 100644
--- a/analytic_engine/src/table/version_edit.rs
+++ b/analytic_engine/src/table/version_edit.rs
@@ -74,6 +74,7 @@ impl From<AddFile> for manifest_pb::AddFileMeta {
             size: v.file.size,
             row_num: v.file.row_num,
             storage_format: manifest_pb::StorageFormat::from(v.file.storage_format) as i32,
+            associated_files: v.file.associated_files,
         }
     }
 }
@@ -97,6 +98,7 @@ impl TryFrom<manifest_pb::AddFileMeta> for AddFile {
                 time_range,
                 max_seq: src.max_seq,
                 storage_format: StorageFormat::from(storage_format),
+                associated_files: src.associated_files,
             },
         };
 
@@ -191,6 +193,7 @@ pub mod tests {
                     time_range: self.time_range,
                     max_seq: self.max_seq,
                     storage_format: StorageFormat::default(),
+                    associated_files: Vec::new(),
                 },
             }
         }
diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs
index 1973e5af24..9bb382d190 100644
--- a/benchmarks/src/sst_tools.rs
+++ b/benchmarks/src/sst_tools.rs
@@ -110,7 +110,8 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc<Runtime>) {
     let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) as _;
 
     let input_path = Path::from(config.input_file_name);
-    let sst_meta = util::meta_from_sst(&store, &input_path, &None).await;
+    let parquet_metadata = util::parquet_metadata(&store, &input_path).await;
+    let sst_meta = util::meta_from_sst(&parquet_metadata, &store, &None).await;
 
     let projected_schema = ProjectedSchema::no_projection(sst_meta.schema.clone());
     let scan_options = ScanOptions {
diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs
index b470b2a1c9..8ca0f96947 100644
--- a/benchmarks/src/util.rs
+++ b/benchmarks/src/util.rs
@@ -26,8 +26,7 @@ use analytic_engine::{
         },
         file::{FileHandle, FileMeta, FilePurgeQueue},
         manager::FileId,
-        meta_data::cache::MetaCacheRef,
-        parquet::encoding,
+        meta_data::cache::{self, MetaCacheRef},
         writer::MetaData,
     },
     table::sst_util,
@@ -66,18 +65,25 @@ pub fn new_runtime(thread_num: usize) -> Runtime {
         .unwrap()
 }
 
-pub async fn meta_from_sst(
+pub async fn parquet_metadata(
     store: &ObjectStoreRef,
     sst_path: &Path,
-    _meta_cache: &Option<MetaCacheRef>,
-) -> MetaData {
+) -> parquet_ext::ParquetMetaData {
     let get_result = store.get(sst_path).await.unwrap();
     let chunk_reader = get_result.bytes().await.unwrap();
-    let metadata = footer::parse_metadata(&chunk_reader).unwrap();
-    let kv_metas = metadata.file_metadata().key_value_metadata().unwrap();
+    footer::parse_metadata(&chunk_reader).unwrap()
+}
+
+pub async fn meta_from_sst(
+    metadata: &parquet_ext::ParquetMetaData,
+    store: &ObjectStoreRef,
+    _meta_cache: &Option<MetaCacheRef>,
+) -> MetaData {
+    let md = cache::MetaData::try_new(metadata, false, store.clone())
+        .await
+        .unwrap();
 
-    let parquet_meta_data = encoding::decode_sst_meta_data(&kv_metas[0]).unwrap();
-    MetaData::from(parquet_meta_data)
+    MetaData::from(md.custom().clone())
 }
 
 pub async fn schema_from_sst(
@@ -85,7 +91,8 @@ pub async fn schema_from_sst(
     sst_path: &Path,
     meta_cache: &Option<MetaCacheRef>,
 ) -> Schema {
-    let sst_meta = meta_from_sst(store, sst_path, meta_cache).await;
+    let parquet_metadata = parquet_metadata(store, sst_path).await;
+    let sst_meta = meta_from_sst(&parquet_metadata, store, meta_cache).await;
     sst_meta.schema
 }
 
@@ -170,8 +177,9 @@ pub async fn file_handles_from_ssts(
     for file_id in sst_file_ids.iter() {
         let path = sst_util::new_sst_file_path(space_id, table_id, *file_id);
+        let parquet_metadata = parquet_metadata(store, &path).await;
+        let sst_meta = meta_from_sst(&parquet_metadata, store, meta_cache).await;
 
-        let sst_meta = meta_from_sst(store, &path, meta_cache).await;
         let file_meta = FileMeta {
             id: *file_id,
             size: 0,
@@ -179,6 +187,7 @@ pub async fn file_handles_from_ssts(
             time_range: sst_meta.time_range,
             max_seq: sst_meta.max_sequence,
             storage_format: StorageFormat::Columnar,
+            associated_files: Vec::new(),
         };
 
         let handle = FileHandle::new(file_meta, purge_queue.clone());
diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs
index 176d8a19f3..06dfb90e7f 100644
--- a/tools/src/bin/sst-metadata.rs
+++ b/tools/src/bin/sst-metadata.rs
@@ -286,12 +286,12 @@ async fn parse_metadata(
     let md = if page_indexes {
         let object_store_reader =
-            ObjectStoreReader::new(storage, path.clone(), Arc::new(parquet_metadata));
+            ObjectStoreReader::new(storage.clone(), path.clone(), Arc::new(parquet_metadata));
         let parquet_metadata =
             parquet_ext::meta_data::meta_with_page_indexes(object_store_reader).await?;
-        MetaData::try_new(&parquet_metadata, false)?
+        MetaData::try_new(&parquet_metadata, false, storage).await?
     } else {
-        MetaData::try_new(&parquet_metadata, false)?
+        MetaData::try_new(&parquet_metadata, false, storage).await?
     };
 
     Ok((md, metadata_size, kv_size))
diff --git a/tools/src/sst_util.rs b/tools/src/sst_util.rs
index f4c76598c9..9d8fceda13 100644
--- a/tools/src/sst_util.rs
+++ b/tools/src/sst_util.rs
@@ -29,6 +29,6 @@ pub async fn meta_from_sst(store: &ObjectStoreRef, sst_path: &Path) -> MetaData
         .find(|kv| kv.key == encoding::META_KEY)
         .unwrap();
 
-    let parquet_meta_data = encoding::decode_sst_meta_data(kv_meta).unwrap();
+    let parquet_meta_data = encoding::decode_sst_meta_data_v1(kv_meta).unwrap();
     MetaData::from(parquet_meta_data)
 }