Merge branch 'main' into enhance-all-tables-getting-of-influxql
Rachelint authored Apr 23, 2023
2 parents 8775bbd + 0a28aaf commit 164d9c5
Showing 33 changed files with 1,077 additions and 238 deletions.
23 changes: 21 additions & 2 deletions Cargo.lock

(Generated file; diff not rendered by default.)

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -67,7 +67,7 @@ bytes = "1.1.0"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.2"
+ceresdbproto = "1.0.3"
 chrono = "0.4"
 clap = "3.0"
 clru = "0.6.1"
@@ -78,6 +78,7 @@ common_util = { path = "common_util" }
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" }
 datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b87871fdd1f4ce64201eb1f7c79a0547627f37e9" }
 df_operator = { path = "df_operator" }
+etcd-client = "0.10.3"
 env_logger = "0.6"
 futures = "0.3"
 xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", features = [
@@ -137,6 +138,7 @@ clap = { workspace = true }
 cluster = { workspace = true }
 common_util = { workspace = true }
 df_operator = { workspace = true }
+etcd-client = { workspace = true }
 interpreters = { workspace = true }
 log = { workspace = true }
 logger = { workspace = true }
3 changes: 1 addition & 2 deletions Makefile
@@ -33,8 +33,7 @@ test:
 	cd $(DIR); cargo test --workspace -- --test-threads=4
 
 integration-test:
-	# TODO: restore it as `make run` after we fix the clustering integration test.
-	cd $(DIR)/integration_tests; make run-local
+	cd $(DIR)/integration_tests; make run
 
 # grcov needs build first, then run test
 build-ut:
10 changes: 10 additions & 0 deletions analytic_engine/src/compaction/mod.rs
@@ -432,6 +432,16 @@ pub struct TableCompactionRequest {
 }
 
 impl TableCompactionRequest {
+    pub fn new(table_data: TableDataRef) -> (Self, oneshot::Receiver<WaitResult<()>>) {
+        let (tx, rx) = oneshot::channel::<WaitResult<()>>();
+        let req = Self {
+            table_data,
+            waiter: Some(tx),
+        };
+
+        (req, rx)
+    }
+
     pub fn no_waiter(table_data: TableDataRef) -> Self {
         TableCompactionRequest {
             table_data,
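For orientation, here is a minimal, self-contained sketch of the request/waiter pattern this new constructor enables: the caller keeps the oneshot receiver and awaits it for the operation's outcome. It assumes a tokio runtime (with the "macros" and "rt" features); `WaitResult` and `Request` are simplified stand-ins, not the engine's actual types.

use tokio::sync::oneshot;

// Simplified stand-in for the engine's WaitResult<()>.
type WaitResult<T> = Result<T, String>;

struct Request {
    waiter: Option<oneshot::Sender<WaitResult<()>>>,
}

impl Request {
    // Mirrors TableCompactionRequest::new: the request carries the sender,
    // the caller keeps the receiver.
    fn new() -> (Self, oneshot::Receiver<WaitResult<()>>) {
        let (tx, rx) = oneshot::channel();
        (Self { waiter: Some(tx) }, rx)
    }
}

#[tokio::main]
async fn main() {
    let (mut req, rx) = Request::new();

    // Scheduler side: signal completion through the waiter once work is done.
    if let Some(tx) = req.waiter.take() {
        let _ = tx.send(Ok(()));
    }

    // Caller side: block until the result arrives.
    assert!(rx.await.unwrap().is_ok());
}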
7 changes: 2 additions & 5 deletions analytic_engine/src/compaction/picker.rs
@@ -100,9 +100,7 @@ impl CommonCompactionPicker {
         levels_controller: &LevelsController,
         expire_time: Option<Timestamp>,
     ) -> Option<CompactionInputFiles> {
-        let num_levels = levels_controller.num_levels();
-        //TODO(boyan) level compaction strategy
-        for level in 0..num_levels {
+        for level in levels_controller.levels() {
             if let Some(files) = self.level_picker.pick_candidates_at_level(
                 ctx,
                 levels_controller,
@@ -112,8 +110,7 @@
                 return Some(CompactionInputFiles {
                     level,
                     files,
-                    // Now, we always output to the same level.
-                    output_level: level,
+                    output_level: level.next(),
                 });
             }
         }
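The `Level` type and its `next()` method live in `analytic_engine/src/sst/file.rs`, which is not included in this excerpt. Below is a plausible sketch of that abstraction, assuming a two-level LSM layout; the constant values and the saturating behavior are assumptions, and the real definition may differ.

// Hypothetical sketch of the Level newtype used above.
pub const SST_LEVEL_NUM: usize = 2;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Level(u16);

impl Level {
    pub const MIN: Level = Level(0);
    pub const MAX: Level = Level((SST_LEVEL_NUM - 1) as u16);

    // Compaction output now goes to the next level, saturating at MAX.
    pub fn next(&self) -> Level {
        Level(self.0.saturating_add(1).min(Self::MAX.0))
    }

    pub fn as_usize(&self) -> usize {
        self.0 as usize
    }
}

fn main() {
    assert_eq!(Level::MIN.next(), Level::MAX);
    assert_eq!(Level::MAX.next(), Level::MAX); // saturates at the top level
}

With this in place, the picker's change above means compacted files are promoted to the next level instead of being rewritten in place.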
24 changes: 19 additions & 5 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -46,7 +46,7 @@ use crate::{
     },
     sst::{
         factory::{self, ReadFrequency, ScanOptions, SstReadOptions, SstWriteOptions},
-        file::FileMeta,
+        file::{FileMeta, Level},
         meta_data::SstMetaReader,
         writer::{MetaData, RecordBatchStream},
     },
@@ -387,7 +387,10 @@ impl FlushTask {
             let file = self.dump_normal_memtable(request_id, mem).await?;
             if let Some(file) = file {
                 let sst_size = file.size;
-                files_to_level0.push(AddFile { level: 0, file });
+                files_to_level0.push(AddFile {
+                    level: Level::MIN,
+                    file,
+                });
 
                 // Set flushed sequence to max of the last_sequence of memtables.
                 flushed_sequence = cmp::max(flushed_sequence, mem.last_sequence());
@@ -516,7 +519,12 @@ impl FlushTask {
         let handler = self.runtime.spawn(async move {
             let mut writer = store
                 .sst_factory
-                .create_writer(&sst_write_options, &sst_file_path, store.store_picker())
+                .create_writer(
+                    &sst_write_options,
+                    &sst_file_path,
+                    store.store_picker(),
+                    Level::MIN,
+                )
                 .await
                 .context(CreateSstWriter {
                     storage_format_hint,
@@ -572,7 +580,7 @@ impl FlushTask {
         for (idx, info_and_meta) in info_and_metas.into_iter().enumerate() {
             let (sst_info, sst_meta) = info_and_meta?;
             files_to_level0.push(AddFile {
-                level: 0,
+                level: Level::MIN,
                 file: FileMeta {
                     id: file_ids[idx],
                     size: sst_info.file_size as u64,
@@ -628,6 +636,7 @@ impl FlushTask {
                 &sst_write_options,
                 &sst_file_path,
                 self.space_store.store_picker(),
+                Level::MIN,
             )
             .await
             .context(CreateSstWriter {
@@ -853,7 +862,12 @@ impl SpaceStore {
 
         let mut sst_writer = self
            .sst_factory
-            .create_writer(sst_write_options, &sst_file_path, self.store_picker())
+            .create_writer(
+                sst_write_options,
+                &sst_file_path,
+                self.store_picker(),
+                input.output_level,
+            )
            .await
            .context(CreateSstWriter {
                storage_format_hint: sst_write_options.storage_format_hint,
39 changes: 30 additions & 9 deletions analytic_engine/src/instance/mod.rs
@@ -58,11 +58,19 @@ pub enum Error {
         source: crate::compaction::scheduler::Error,
     },
 
-    #[snafu(display("Failed to flush table manually, table:{}, err:{}", table, source))]
-    ManualFlush { table: String, source: GenericError },
+    #[snafu(display("Failed to {} table manually, table:{}, err:{}", op, table, source))]
+    ManualOp {
+        op: String,
+        table: String,
+        source: GenericError,
+    },
 
-    #[snafu(display("Failed to receive flush result, table:{}, err:{}", table, source))]
-    RecvFlushResult { table: String, source: RecvError },
+    #[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))]
+    RecvManualOpResult {
+        op: String,
+        table: String,
+        source: RecvError,
+    },
 }
 
 define_result!(Error);
@@ -194,25 +202,29 @@ impl Instance {
             .schedule_flush(flush_scheduler, table_data, flush_opts)
             .await
             .box_err()
-            .context(ManualFlush {
+            .context(ManualOp {
+                op: "flush",
                 table: &table_data.name,
             })?;
 
         if let Some(rx) = rx_opt {
             rx.await
-                .context(RecvFlushResult {
+                .context(RecvManualOpResult {
+                    op: "flush",
                     table: &table_data.name,
                 })?
                 .box_err()
-                .context(ManualFlush {
+                .context(ManualOp {
+                    op: "flush",
                     table: &table_data.name,
                 })?;
         }
         Ok(())
     }
 
+    // This method will wait until compaction finished.
     pub async fn manual_compact_table(&self, table_data: &TableDataRef) -> Result<()> {
-        let request = TableCompactionRequest::no_waiter(table_data.clone());
+        let (request, rx) = TableCompactionRequest::new(table_data.clone());
         let succeed = self
             .compaction_scheduler
             .schedule_table_compaction(request)
@@ -221,7 +233,16 @@
             error!("Failed to schedule compaction, table:{}", table_data.name);
         }
 
-        Ok(())
+        rx.await
+            .context(RecvManualOpResult {
+                op: "compact",
+                table: &table_data.name,
+            })?
+            .box_err()
+            .context(ManualOp {
+                op: "compact",
+                table: &table_data.name,
+            })
     }
 }
 
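The double `?`/`context` chain above separates two failure modes: the waiter channel closing before any result is delivered (`RecvManualOpResult`), and the operation running but reporting an error (`ManualOp`). A reduced sketch of that pattern, with the error types simplified to strings and the function name illustrative:

use tokio::sync::oneshot;

// The receiver yields the operation's own Result once it finishes.
async fn wait_manual_op(
    rx: oneshot::Receiver<Result<(), String>>,
    op: &str,
    table: &str,
) -> Result<(), String> {
    // First layer: the sender was dropped, so no result was ever delivered.
    let op_result = rx
        .await
        .map_err(|e| format!("Failed to receive {op} result, table:{table}, err:{e}"))?;
    // Second layer: the operation ran but reported an error.
    op_result.map_err(|e| format!("Failed to {op} table manually, table:{table}, err:{e}"))
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    tx.send(Ok(())).unwrap();
    assert!(wait_manual_op(rx, "compact", "t1").await.is_ok());
}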
9 changes: 4 additions & 5 deletions analytic_engine/src/row_iter/merge.rs
@@ -34,8 +34,7 @@ use crate::{
     space::SpaceId,
     sst::{
         factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReadOptions},
-        file::FileHandle,
-        manager::MAX_LEVEL,
+        file::{FileHandle, Level, SST_LEVEL_NUM},
     },
     table::version::{MemTableVec, SamplingMemTable},
 };
@@ -127,7 +126,7 @@ impl<'a> MergeBuilder<'a> {
             config,
             sampling_mem: None,
             memtables: Vec::new(),
-            ssts: vec![Vec::new(); MAX_LEVEL],
+            ssts: vec![Vec::new(); SST_LEVEL_NUM],
         }
     }
 
@@ -151,8 +150,8 @@
     }
 
     /// Returns file handles in `level`, panic if level >= MAX_LEVEL
-    pub fn mut_ssts_of_level(&mut self, level: u16) -> &mut Vec<FileHandle> {
-        &mut self.ssts[usize::from(level)]
+    pub fn mut_ssts_of_level(&mut self, level: Level) -> &mut Vec<FileHandle> {
+        &mut self.ssts[level.as_usize()]
     }
 
     pub async fn build(self) -> Result<MergeIterator> {
4 changes: 4 additions & 0 deletions analytic_engine/src/sst/factory.rs
@@ -14,6 +14,7 @@ use trace_metric::MetricsCollector;
 
 use crate::{
     sst::{
+        file::Level,
         header,
         header::HeaderParser,
         meta_data::cache::MetaCacheRef,
@@ -74,6 +75,7 @@ pub trait Factory: Send + Sync + Debug {
         options: &SstWriteOptions,
         path: &'a Path,
         store_picker: &'a ObjectStorePickerRef,
+        level: Level,
     ) -> Result<Box<dyn SstWriter + Send + 'a>>;
 }
 
@@ -178,6 +180,7 @@ impl Factory for FactoryImpl {
         options: &SstWriteOptions,
         path: &'a Path,
         store_picker: &'a ObjectStorePickerRef,
+        level: Level,
     ) -> Result<Box<dyn SstWriter + Send + 'a>> {
         let hybrid_encoding = match options.storage_format_hint {
             StorageFormatHint::Specific(format) => matches!(format, StorageFormat::Hybrid),
@@ -187,6 +190,7 @@
 
         Ok(Box::new(ParquetSstWriter::new(
             path,
+            level,
             hybrid_encoding,
             store_picker,
             options,
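A reduced sketch of what this signature change accomplishes: threading a `level` parameter through a writer factory so each writer knows which LSM level it is producing. All types here are simplified, synchronous stand-ins; the real trait is async and returns `Box<dyn SstWriter + Send>`.

#[derive(Clone, Copy, Debug)]
struct Level(u16);

struct SstWriter {
    path: String,
    level: Level,
}

trait Factory {
    fn create_writer(&self, path: &str, level: Level) -> SstWriter;
}

struct FactoryImpl;

impl Factory for FactoryImpl {
    fn create_writer(&self, path: &str, level: Level) -> SstWriter {
        // With the level in hand, the writer can make level-aware choices,
        // e.g. picking a storage format suited to that level.
        SstWriter { path: path.to_string(), level }
    }
}

fn main() {
    let w = FactoryImpl.create_writer("sst/1.sst", Level(0));
    println!("writing {} at level {:?}", w.path, w.level);
}

As the earlier hunks show, the flush paths pass `Level::MIN` (freshly flushed SSTs always land in the lowest level), while the compaction path passes `input.output_level`.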
(Diffs for the remaining 24 of 33 changed files are not loaded here.)
