diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp
index 0f69f5ffb78..4b8661c7c90 100644
--- a/dbms/src/Common/ErrorCodes.cpp
+++ b/dbms/src/Common/ErrorCodes.cpp
@@ -445,6 +445,7 @@ extern const int S3_ERROR = 11000;
 extern const int CANNOT_SCHEDULE_TASK = 11001;
 extern const int S3_LOCK_CONFLICT = 11002;
 extern const int DT_DELTA_INDEX_ERROR = 11003;
+extern const int FETCH_PAGES_ERROR = 11004;
 } // namespace ErrorCodes
 } // namespace DB
diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h
index e7ff836d7dd..4f5845d0a05 100644
--- a/dbms/src/Common/TiFlashMetrics.h
+++ b/dbms/src/Common/TiFlashMetrics.h
@@ -365,7 +365,8 @@ namespace DB
         F(type_worker_fetch_page, {{"type", "worker_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \
         F(type_worker_prepare_stream, {{"type", "worker_prepare_stream"}}, ExpBuckets{0.01, 2, 20}), \
         F(type_stream_wait_next_task, {{"type", "stream_wait_next_task"}}, ExpBuckets{0.01, 2, 20}), \
-        F(type_stream_read, {{"type", "stream_read"}}, ExpBuckets{0.01, 2, 20})) \
+        F(type_stream_read, {{"type", "stream_read"}}, ExpBuckets{0.01, 2, 20}), \
+        F(type_deserialize_page, {{"type", "deserialize_page"}}, ExpBuckets{0.01, 2, 20})) \
     M(tiflash_disaggregated_details, \
       "", \
       Counter, \
diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
index 798a9dceb7e..8d46e9a2982 100644
--- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
+++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
@@ -105,6 +105,8 @@ class ColumnFilePersistedSet
     /// Thread safe part end

     String detailInfo() const { return columnFilesToString(persisted_files); }

+    const ColumnFilePersisteds & getFiles() const { return persisted_files; }
+
     void saveMeta(WriteBatches & wbs) const;
     void recordRemoveColumnFilesPages(WriteBatches & wbs) const;
diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
index 1d229f975d1..70c720ba2cf 100644
--- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
+++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
@@ -35,10 +35,13 @@
 #include

-namespace DB
+namespace DB::DM
 {
-namespace DM
+namespace tests
 {
+class SegmentReadTaskTest;
+}
+
 using GenPageId = std::function;
 class DeltaValueSpace;
 class DeltaValueSnapshot;
@@ -136,9 +139,10 @@ class DeltaValueSpace
         persisted_file_set->resetLogger(segment_log);
     }

-    /// The following two methods are just for test purposes
+    /// The following 3 methods are just for test purposes
     MemTableSetPtr getMemTableSet() const { return mem_table_set; }
     ColumnFilePersistedSetPtr getPersistedFileSet() const { return persisted_file_set; }
+    UInt64 getDeltaIndexEpoch() const { return delta_index_epoch; }

     String simpleInfo() const { return "getId()) + ">"; }
     String info() const { return fmt::format("{}. {}", mem_table_set->info(), persisted_file_set->info()); }
@@ -395,7 +399,7 @@ class DeltaValueSnapshot
     RowKeyRange getSquashDeleteRange() const;

-    const auto & getSharedDeltaIndex() { return shared_delta_index; }
+    const auto & getSharedDeltaIndex() const { return shared_delta_index; }
     size_t getDeltaIndexEpoch() const { return delta_index_epoch; }

     bool isForUpdate() const { return is_update; }
@@ -548,5 +552,4 @@ class DeltaValueInputStream : public SkippableBlockInputStream
     }
 };

-} // namespace DM
-} // namespace DB
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
index bd8cf9f3ced..2b91f5855c9 100644
--- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
+++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
@@ -34,7 +34,11 @@ class MemTableSet
     : public std::enable_shared_from_this
     , private boost::noncopyable
 {
+#ifndef DBMS_PUBLIC_GTEST
 private:
+#else
+public:
+#endif
     // Note that we must update `column_files_count` for outer thread-safe after `column_files` changed
     ColumnFiles column_files;
     // TODO: check the proper memory_order when use this atomic variable
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
index b8c35479632..a4adb835f6f 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -46,13 +46,13 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
     auto block_slots = pool->getFreeBlockSlots();
     LOG_DEBUG(
         log,
-        "Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={}ns do_add_cost={}ns", //
+        "Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
         pool->pool_id,
         block_slots,
         tasks.size(),
         read_pools.size(),
-        sw_add.elapsed(),
-        sw_do_add.elapsed());
+        sw_add.elapsed() / 1000.0,
+        sw_do_add.elapsed() / 1000.0);
 }

 std::pair SegmentReadTaskScheduler::scheduleMergedTask()
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp
new file mode 100644
index 00000000000..76f4880fe21
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp
@@ -0,0 +1,48 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+namespace DB::DM::Remote
+{
+
+IPreparedDMFileTokenPtr DataStoreMock::prepareDMFileByKey(const String & remote_key)
+{
+    return std::make_shared(file_provider, remote_key);
+}
+
+static std::tuple parseDMFilePath(const String & path)
+{
+    // Path likes /disk1/data/t_100/stable/dmf_2.
+    auto pos = path.find_last_of('_');
+    RUNTIME_CHECK(pos != std::string::npos, path);
+    auto file_id = stoul(path.substr(pos + 1));
+
+    pos = path.rfind("/dmf_");
+    RUNTIME_CHECK(pos != std::string::npos, path);
+    auto parent_path = path.substr(0, pos);
+    return std::tuple{parent_path, file_id};
+}
+
+DMFilePtr MockPreparedDMFileToken::restore(DMFile::ReadMetaMode read_mode)
+{
+    auto [parent_path, file_id] = parseDMFilePath(path);
+    return DMFile::restore(
+        file_provider,
+        file_id,
+        /*page_id*/ 0,
+        parent_path,
+        read_mode);
+}
+} // namespace DB::DM::Remote
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h
new file mode 100644
index 00000000000..38f609102f2
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h
@@ -0,0 +1,78 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+
+namespace DB::DM::Remote
+{
+class DataStoreMock final : public IDataStore
+{
+public:
+    explicit DataStoreMock(FileProviderPtr file_provider_)
+        : file_provider(file_provider_)
+    {}
+
+    ~DataStoreMock() override = default;
+
+    void putDMFile(DMFilePtr, const S3::DMFileOID &, bool) override
+    {
+        throw Exception("DataStoreMock::putDMFile unsupported");
+    }
+
+    IPreparedDMFileTokenPtr prepareDMFile(const S3::DMFileOID &, UInt64) override
+    {
+        throw Exception("DataStoreMock::prepareDMFile unsupported");
+    }
+
+    IPreparedDMFileTokenPtr prepareDMFileByKey(const String & remote_key) override;
+
+    bool putCheckpointFiles(const PS::V3::LocalCheckpointFiles &, StoreID, UInt64) override
+    {
+        throw Exception("DataStoreMock::putCheckpointFiles unsupported");
+    }
+
+    std::unordered_map getDataFilesInfo(const std::unordered_set &) override
+    {
+        throw Exception("DataStoreMock::getDataFilesInfo unsupported");
+    }
+
+    void setTaggingsForKeys(const std::vector &, std::string_view) override
+    {
+        throw Exception("DataStoreMock::setTaggingsForKeys unsupported");
+    }
+
+private:
+    FileProviderPtr file_provider;
+};
+
+class MockPreparedDMFileToken : public IPreparedDMFileToken
+{
+public:
+    MockPreparedDMFileToken(const FileProviderPtr & file_provider_, const String & path_)
+        : IPreparedDMFileToken::IPreparedDMFileToken(file_provider_, {}, 0)
+        , path(path_)
+    {}
+
+    ~MockPreparedDMFileToken() override = default;
+
+    DMFilePtr restore(DMFile::ReadMetaMode read_mode) override;
+
+private:
+    String path;
+};
+
+} // namespace DB::DM::Remote
diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
deleted file mode 100644
index e2bdffd9e64..00000000000
--- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
+++ /dev/null
@@ -1,347 +0,0 @@
-// Copyright 2023 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -using namespace std::chrono_literals; - -namespace DB::DM::Remote -{ - -RNLocalPageCache::OccupySpaceResult blockingOccupySpaceForTask(const SegmentReadTaskPtr & seg_task) -{ - const auto & extra_info = *(seg_task->extra_remote_info); - std::vector cf_tiny_oids; - { - cf_tiny_oids.reserve(extra_info.remote_page_ids.size()); - for (const auto & page_id : extra_info.remote_page_ids) - { - auto page_oid = PageOID{ - .store_id = seg_task->store_id, - .ks_table_id = {seg_task->dm_context->keyspace_id, seg_task->dm_context->physical_table_id}, - .page_id = page_id, - }; - cf_tiny_oids.emplace_back(page_oid); - } - } - - // Note: We must occupySpace segment by segment, because we need to read - // at least the complete data of one segment in order to drive everything forward. - // Currently we call occupySpace for each FetchPagesRequest, which is fine, - // because we send one request each seg_task. If we want to split - // FetchPagesRequest into multiples in future, then we need to change - // the moment of calling `occupySpace`. - const auto & dm_context = seg_task->dm_context; - auto page_cache = dm_context->db_context.getSharedContextDisagg()->rn_page_cache; - auto scan_context = dm_context->scan_context; - - Stopwatch w_occupy; - auto occupy_result = page_cache->occupySpace(cf_tiny_oids, extra_info.remote_page_sizes, scan_context); - // This metric is per-segment. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy).Observe(w_occupy.elapsedSeconds()); - - return occupy_result; -} - -disaggregated::FetchDisaggPagesRequest buildFetchPagesRequest( - const SegmentReadTaskPtr & seg_task, - const std::vector & pages_not_in_cache) -{ - disaggregated::FetchDisaggPagesRequest req; - auto meta = seg_task->extra_remote_info->snapshot_id.toMeta(); - // The keyspace_id here is not vital, as we locate the table and segment by given - // snapshot_id. But it could be helpful for debugging. - auto keyspace_id = seg_task->dm_context->keyspace_id; - meta.set_keyspace_id(keyspace_id); - meta.set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); - *req.mutable_snapshot_id() = meta; - req.set_table_id(seg_task->dm_context->physical_table_id); - req.set_segment_id(seg_task->segment->segmentId()); - - for (auto page_id : pages_not_in_cache) - req.add_page_ids(page_id.page_id); - - return req; -} - -SegmentReadTaskPtr RNWorkerFetchPages::doWork(const SegmentReadTaskPtr & seg_task) -{ - MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get()); - Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; - SCOPE_EXIT({ - // This metric is per-segment. 
- GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_fetch_page) - .Observe(watch_work.elapsedSeconds()); - }); - - auto occupy_result = blockingOccupySpaceForTask(seg_task); - auto req = buildFetchPagesRequest(seg_task, occupy_result.pages_not_in_cache); - { - auto cftiny_total = seg_task->extra_remote_info->remote_page_ids.size(); - auto cftiny_fetch = occupy_result.pages_not_in_cache.size(); - LOG_DEBUG( - log, - "Ready to fetch pages, seg_task={} page_hit_rate={} pages_not_in_cache={}", - seg_task, - cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total), - occupy_result.pages_not_in_cache); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch); - } - - const size_t max_retry_times = 3; - std::exception_ptr last_exception; - - // TODO: Maybe don't need to re-fetch all pages when retry. - for (size_t i = 0; i < max_retry_times; ++i) - { - try - { - doFetchPages(seg_task, req); - seg_task->initColumnFileDataProvider(occupy_result.pages_guard); - - // We finished fetch all pages for this seg task, just return it for downstream - // workers. If we have met any errors, page guard will not be persisted. - return seg_task; - } - catch (const pingcap::Exception & e) - { - last_exception = std::current_exception(); - LOG_WARNING( - log, - "Meet RPC client exception when fetching pages: {}, will be retried. seg_task={}", - e.displayText(), - seg_task); - std::this_thread::sleep_for(1s); - } - catch (...) - { - LOG_ERROR(log, "{}: {}", seg_task, getCurrentExceptionMessage(true)); - throw; - } - } - - // Still failed after retry... - RUNTIME_CHECK(last_exception); - std::rethrow_exception(last_exception); -} - -// In order to make network and disk run parallelly, -// `doFetchPages` will receive data pages from WN, -// package these data pages into several `WritePageTask` objects -// and send them to `RNWritePageCachePool` to write into local page cache. -struct WritePageTask -{ - explicit WritePageTask(RNLocalPageCache * page_cache_) - : page_cache(page_cache_) - {} - RNLocalPageCache * page_cache; - UniversalWriteBatch wb; - std::list remote_pages; // Hold the data of wb. - std::list remote_page_mem_tracker_wrappers; // Hold the memory stat of remote_pages. -}; -using WritePageTaskPtr = std::unique_ptr; - - -void RNWorkerFetchPages::doFetchPages( - const SegmentReadTaskPtr & seg_task, - const disaggregated::FetchDisaggPagesRequest & request) -{ - // No page need to be fetched. - if (request.page_ids_size() == 0) - return; - - Stopwatch sw_total; - Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE}; - bool rpc_is_observed = false; - double total_write_page_cache_sec = 0.0; - - pingcap::kv::RpcCall rpc( - seg_task->dm_context->db_context.getTMTContext().getKVCluster()->rpc_client, - seg_task->extra_remote_info->store_address); - - grpc::ClientContext client_context; - auto stream_resp = rpc.call(&client_context, request); - - SCOPE_EXIT({ - // TODO: Not sure whether we really need this. Maybe RAII is already there? - stream_resp->Finish(); - }); - - // Used to verify all pages are fetched. 
- std::set remaining_pages_to_fetch; - for (auto p : request.page_ids()) - remaining_pages_to_fetch.insert(p); - - UInt64 read_stream_ns = 0; - UInt64 deserialize_page_ns = 0; - UInt64 schedule_write_page_ns = 0; - UInt64 packet_count = 0; - UInt64 task_count = 0; - UInt64 page_count = request.page_ids_size(); - - auto schedule_task = [&task_count, &schedule_write_page_ns](WritePageTaskPtr && write_page_task) { - task_count += 1; - auto task = std::make_shared>([write_page_task = std::move(write_page_task)]() { - write_page_task->page_cache->write(std::move(write_page_task->wb)); - }); - Stopwatch sw; - RNWritePageCachePool::get().scheduleOrThrowOnError([task]() { (*task)(); }); - schedule_write_page_ns += sw.elapsed(); - return task->get_future(); - }; - - WritePageTaskPtr write_page_task; - std::vector> write_page_results; - - // Keep reading packets. - while (true) - { - Stopwatch sw_packet; - auto packet = std::make_shared(); - if (bool more = stream_resp->Read(packet.get()); !more) - break; - - MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get()); - - read_stream_ns += sw_packet.elapsedFromLastTime(); - packet_count += 1; - if (!rpc_is_observed) - { - // Count RPC time as sending request + receive first response packet. - rpc_is_observed = true; - // This metric is per-segment, because we only count once for each task. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_rpc_fetch_page) - .Observe(watch_rpc.elapsedSeconds()); - } - - if (packet->has_error()) - { - throw Exception(fmt::format("{} (from {})", packet->error().msg(), seg_task)); - } - - Stopwatch watch_write_page_cache{CLOCK_MONOTONIC_COARSE}; - SCOPE_EXIT({ total_write_page_cache_sec += watch_write_page_cache.elapsedSeconds(); }); - - std::vector received_page_ids; - for (const String & page : packet->pages()) - { - if (write_page_task == nullptr) - { - write_page_task = std::make_unique( - seg_task->dm_context->db_context.getSharedContextDisagg()->rn_page_cache.get()); - } - auto & remote_page = write_page_task->remote_pages.emplace_back(); // NOLINT(bugprone-use-after-move) - bool parsed = remote_page.ParseFromString(page); - RUNTIME_CHECK_MSG(parsed, "Failed to parse page data (from {})", seg_task); - write_page_task->remote_page_mem_tracker_wrappers.emplace_back( - remote_page.SpaceUsedLong(), - fetch_pages_mem_tracker.get()); - - RUNTIME_CHECK( - remaining_pages_to_fetch.contains(remote_page.page_id()), - remaining_pages_to_fetch, - remote_page.page_id()); - - received_page_ids.emplace_back(remote_page.page_id()); - remaining_pages_to_fetch.erase(remote_page.page_id()); - - // Write page into LocalPageCache. Note that the page must be occupied. 
- auto oid = Remote::PageOID{ - .store_id = seg_task->store_id, - .ks_table_id = {seg_task->dm_context->keyspace_id, seg_task->dm_context->physical_table_id}, - .page_id = remote_page.page_id(), - }; - auto read_buffer - = std::make_shared(remote_page.data().data(), remote_page.data().size()); - PageFieldSizes field_sizes; - field_sizes.reserve(remote_page.field_sizes_size()); - for (const auto & field_sz : remote_page.field_sizes()) - { - field_sizes.emplace_back(field_sz); - } - deserialize_page_ns += sw_packet.elapsedFromLastTime(); - - auto page_id = RNLocalPageCache::buildCacheId(oid); - write_page_task->wb - .putPage(page_id, 0, std::move(read_buffer), remote_page.data().size(), std::move(field_sizes)); - auto write_batch_limit_size - = seg_task->dm_context->db_context.getSettingsRef().dt_write_page_cache_limit_size; - if (write_page_task->wb.getTotalDataSize() >= write_batch_limit_size) - { - write_page_results.push_back( - schedule_task(std::move(write_page_task))); // write_page_task is moved and reset. - } - } - } - - if (write_page_task != nullptr && write_page_task->wb.getTotalDataSize() > 0) - { - write_page_results.push_back(schedule_task(std::move(write_page_task))); - } - - Stopwatch sw_wait_write_page_finished; - for (auto & f : write_page_results) - { - f.get(); - } - auto wait_write_page_finished_ns = sw_wait_write_page_finished.elapsed(); - - // This metric is per-segment. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_write_page_cache) - .Observe(total_write_page_cache_sec); - - // Verify all pending pages are now received. - RUNTIME_CHECK_MSG( - remaining_pages_to_fetch.empty(), - "Failed to fetch all pages (from {}), remaining_pages_to_fetch={}", - seg_task, - remaining_pages_to_fetch); - - LOG_DEBUG( - log, - "Finished fetch pages, seg_task={}, page_count={}, packet_count={}, task_count={}, " - "total_ms={}, read_stream_ms={}, deserialize_page_ms={}, schedule_write_page_ms={}, " - "wait_write_page_finished_ms={}", - seg_task, - page_count, - packet_count, - task_count, - sw_total.elapsed() / 1000000, - read_stream_ns / 1000000, - deserialize_page_ns / 1000000, - schedule_write_page_ns / 1000000, - wait_write_page_finished_ns / 1000000); -} - -} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h index 3ec3f402b1d..6e7f7f77d05 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h @@ -31,13 +31,14 @@ class RNWorkerFetchPages , public ThreadedWorker { protected: - SegmentReadTaskPtr doWork(const SegmentReadTaskPtr & task) override; + SegmentReadTaskPtr doWork(const SegmentReadTaskPtr & task) override + { + task->fetchPages(); + return task; + } String getName() const noexcept override { return "FetchPages"; } -private: - void doFetchPages(const SegmentReadTaskPtr & seg_task, const disaggregated::FetchDisaggPagesRequest & request); - public: struct Options { diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.cpp deleted file mode 100644 index 7eb50c18787..00000000000 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.cpp +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include - -namespace DB::ErrorCodes -{ -extern const int DT_DELTA_INDEX_ERROR; -} - -namespace DB::DM::Remote -{ - -bool RNWorkerPrepareStreams::initInputStream(const SegmentReadTaskPtr & task, bool enable_delta_index_error_fallback) -{ - try - { - task->initInputStream(*columns_to_read, read_tso, push_down_filter, read_mode); - return true; - } - catch (const Exception & e) - { - if (enable_delta_index_error_fallback && e.code() == ErrorCodes::DT_DELTA_INDEX_ERROR) - { - LOG_ERROR(task->read_snapshot->log, "{}", e.message()); - return false; - } - else - { - throw; - } - } -} - -SegmentReadTaskPtr RNWorkerPrepareStreams::doWork(const SegmentReadTaskPtr & task) -{ - Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; - SCOPE_EXIT({ - // This metric is per-segment. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_prepare_stream) - .Observe(watch_work.elapsedSeconds()); - }); - - if (likely( - initInputStream(task, task->dm_context->db_context.getSettingsRef().dt_enable_delta_index_error_fallback))) - { - return task; - } - - // Exception DT_DELTA_INDEX_ERROR raised. Reset delta index and try again. - DeltaIndex empty_delta_index; - task->read_snapshot->delta->getSharedDeltaIndex()->swap(empty_delta_index); - if (auto cache = task->dm_context->db_context.getSharedContextDisagg()->rn_delta_index_cache; cache) - { - cache->setDeltaIndex(task->read_snapshot->delta->getSharedDeltaIndex()); - } - initInputStream(task, false); - return task; -} - -} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h index 961b24977cf..28a58b27301 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h @@ -15,9 +15,9 @@ #pragma once #include +#include #include -#include -#include +#include #include @@ -35,7 +35,18 @@ class RNWorkerPrepareStreams , public ThreadedWorker { protected: - SegmentReadTaskPtr doWork(const SegmentReadTaskPtr & task) override; + SegmentReadTaskPtr doWork(const SegmentReadTaskPtr & task) override + { + const auto & settings = task->dm_context->db_context.getSettingsRef(); + task->initInputStream( + *columns_to_read, + read_tso, + push_down_filter, + read_mode, + settings.max_block_size, + settings.dt_enable_delta_index_error_fallback); + return task; + } String getName() const noexcept override { return "PrepareStreams"; } @@ -76,11 +87,6 @@ class RNWorkerPrepareStreams {} ~RNWorkerPrepareStreams() override { wait(); } - - bool initInputStream(const SegmentReadTaskPtr & task, bool enable_delta_index_error_fallback); - - // Only use in unit-test. 
- SegmentReadTaskPtr testDoWork(const SegmentReadTaskPtr & task) { return doWork(task); } }; } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 5a391078d2d..73143aac892 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -93,7 +93,9 @@ RemotePb::RemoteSegment Serializer::serializeTo( auto * remote_file = remote.add_stable_pages(); remote_file->set_page_id(dt_file->pageId()); auto * checkpoint_info = remote_file->mutable_checkpoint_info(); +#ifndef DBMS_PUBLIC_GTEST // Don't not check path in unittests. RUNTIME_CHECK(startsWith(dt_file->path(), "s3://"), dt_file->path()); +#endif checkpoint_info->set_data_file_id(dt_file->path()); // It should be a key to remote path } remote.mutable_column_files_memtable()->CopyFrom( @@ -163,6 +165,7 @@ SegmentSnapshotPtr Serializer::deserializeSegmentSnapshotFrom( auto remote_key = stable_file.checkpoint_info().data_file_id(); auto prepared = data_store->prepareDMFileByKey(remote_key); auto dmfile = prepared->restore(DMFile::ReadMetaMode::all()); + RUNTIME_CHECK(dmfile != nullptr, remote_key); dmfiles.emplace_back(std::move(dmfile)); } new_stable->setFiles(dmfiles, segment_range, &dm_context); @@ -389,7 +392,6 @@ ColumnFileBigPtr Serializer::deserializeCFBig( { RUNTIME_CHECK(proto.has_checkpoint_info()); LOG_DEBUG(Logger::get(), "Rebuild local ColumnFileBig from remote, key={}", proto.checkpoint_info().data_file_id()); - auto prepared = data_store->prepareDMFileByKey(proto.checkpoint_info().data_file_id()); auto dmfile = prepared->restore(DMFile::ReadMetaMode::all()); auto * cf_big = new ColumnFileBig(dmfile, proto.valid_rows(), proto.valid_bytes(), segment_range); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index 94c1051fa8a..63b96ba3610 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -13,23 +13,32 @@ // limitations under the License. #include +#include #include #include #include #include #include -#include #include #include #include #include #include +#include + +using namespace std::chrono_literals; namespace CurrentMetrics { extern const Metric DT_SegmentReadTasks; } +namespace DB::ErrorCodes +{ +extern const int DT_DELTA_INDEX_ERROR; +extern const int FETCH_PAGES_ERROR; +} // namespace DB::ErrorCodes + namespace DB::DM { SegmentReadTask::SegmentReadTask( @@ -214,13 +223,77 @@ void SegmentReadTask::initColumnFileDataProvider(const Remote::RNLocalPageCacheG KeyspaceTableID{dm_context->keyspace_id, dm_context->physical_table_id}); } + void SegmentReadTask::initInputStream( const ColumnDefines & columns_to_read, UInt64 read_tso, const PushDownFilterPtr & push_down_filter, - ReadMode read_mode) + ReadMode read_mode, + size_t expected_block_size, + bool enable_delta_index_error_fallback) +{ + if (likely(doInitInputStreamWithErrorFallback( + columns_to_read, + read_tso, + push_down_filter, + read_mode, + expected_block_size, + enable_delta_index_error_fallback))) + { + return; + } + + // Exception DT_DELTA_INDEX_ERROR raised. Reset delta index and try again. 
+ DeltaIndex empty_delta_index; + read_snapshot->delta->getSharedDeltaIndex()->swap(empty_delta_index); + if (auto cache = dm_context->db_context.getSharedContextDisagg()->rn_delta_index_cache; cache) + { + cache->setDeltaIndex(read_snapshot->delta->getSharedDeltaIndex()); + } + doInitInputStream(columns_to_read, read_tso, push_down_filter, read_mode, expected_block_size); +} + +bool SegmentReadTask::doInitInputStreamWithErrorFallback( + const ColumnDefines & columns_to_read, + UInt64 read_tso, + const PushDownFilterPtr & push_down_filter, + ReadMode read_mode, + size_t expected_block_size, + bool enable_delta_index_error_fallback) +{ + try + { + doInitInputStream(columns_to_read, read_tso, push_down_filter, read_mode, expected_block_size); + return true; + } + catch (const Exception & e) + { + if (enable_delta_index_error_fallback && e.code() == ErrorCodes::DT_DELTA_INDEX_ERROR) + { + LOG_ERROR(read_snapshot->log, "{}", e.message()); + return false; + } + else + { + throw; + } + } +} + +void SegmentReadTask::doInitInputStream( + const ColumnDefines & columns_to_read, + UInt64 read_tso, + const PushDownFilterPtr & push_down_filter, + ReadMode read_mode, + size_t expected_block_size) { RUNTIME_CHECK(input_stream == nullptr); + Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; + SCOPE_EXIT({ + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_prepare_stream) + .Observe(watch_work.elapsedSeconds()); + }); + input_stream = segment->getInputStream( read_mode, *dm_context, @@ -229,7 +302,298 @@ void SegmentReadTask::initInputStream( ranges, push_down_filter, read_tso, - DEFAULT_BLOCK_SIZE); + expected_block_size); +} + + +void SegmentReadTask::fetchPages() +{ + if (!extra_remote_info.has_value() || extra_remote_info->remote_page_ids.empty()) + { + return; + } + + MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get()); + Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; + SCOPE_EXIT({ + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_fetch_page) + .Observe(watch_work.elapsedSeconds()); + }); + + auto occupy_result = blockingOccupySpaceForTask(); + auto req = buildFetchPagesRequest(occupy_result.pages_not_in_cache); + { + auto cftiny_total = extra_remote_info->remote_page_ids.size(); + auto cftiny_fetch = occupy_result.pages_not_in_cache.size(); + LOG_DEBUG( + read_snapshot->log, + "Ready to fetch pages, seg_task={} page_hit_rate={} pages_not_in_cache={}", + *this, + cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total), + occupy_result.pages_not_in_cache); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch); + } + + const size_t max_retry_times = 3; + std::exception_ptr last_exception; + + // TODO: Maybe don't need to re-fetch all pages when retry. + for (size_t i = 0; i < max_retry_times; ++i) + { + try + { + doFetchPages(req); + initColumnFileDataProvider(occupy_result.pages_guard); + + // We finished fetch all pages for this seg task, just return it for downstream + // workers. If we have met any errors, page guard will not be persisted. + return; + } + catch (const pingcap::Exception & e) + { + last_exception = std::current_exception(); + LOG_WARNING( + read_snapshot->log, + "Meet RPC client exception when fetching pages: {}, will be retried. seg_task={}", + e.displayText(), + *this); + std::this_thread::sleep_for(1s); + } + catch (...) 
+ { + LOG_ERROR(read_snapshot->log, "{}: {}", *this, getCurrentExceptionMessage(true)); + throw; + } + } + + // Still failed after retry... + RUNTIME_CHECK(last_exception); + std::rethrow_exception(last_exception); +} + +std::vector SegmentReadTask::buildRemotePageOID() const +{ + std::vector cf_tiny_oids; + cf_tiny_oids.reserve(extra_remote_info->remote_page_ids.size()); + for (const auto & page_id : extra_remote_info->remote_page_ids) + { + cf_tiny_oids.emplace_back(Remote::PageOID{ + .store_id = store_id, + .ks_table_id = {dm_context->keyspace_id, dm_context->physical_table_id}, + .page_id = page_id, + }); + } + return cf_tiny_oids; +} + +Remote::RNLocalPageCache::OccupySpaceResult SegmentReadTask::blockingOccupySpaceForTask() const +{ + auto cf_tiny_oids = buildRemotePageOID(); + // Note: We must occupySpace segment by segment, because we need to read + // at least the complete data of one segment in order to drive everything forward. + // Currently we call occupySpace for each FetchPagesRequest, which is fine, + // because we send one request each seg_task. If we want to split + // FetchPagesRequest into multiples in future, then we need to change + // the moment of calling `occupySpace`. + Stopwatch w_occupy; + SCOPE_EXIT({ + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy) + .Observe(w_occupy.elapsedSeconds()); + }); + auto page_cache = dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + auto scan_context = dm_context->scan_context; + return page_cache->occupySpace(cf_tiny_oids, extra_remote_info->remote_page_sizes, scan_context); +} + +disaggregated::FetchDisaggPagesRequest SegmentReadTask::buildFetchPagesRequest( + const std::vector & pages_not_in_cache) const +{ + disaggregated::FetchDisaggPagesRequest req; + auto meta = extra_remote_info->snapshot_id.toMeta(); + // The keyspace_id here is not vital, as we locate the table and segment by given + // snapshot_id. But it could be helpful for debugging. + auto keyspace_id = dm_context->keyspace_id; + meta.set_keyspace_id(keyspace_id); + meta.set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); + *req.mutable_snapshot_id() = meta; + req.set_table_id(dm_context->physical_table_id); + req.set_segment_id(segment->segmentId()); + + req.mutable_page_ids()->Reserve(pages_not_in_cache.size()); + for (auto page_id : pages_not_in_cache) + req.add_page_ids(page_id.page_id); + + return req; +} + +// In order to make network and disk run parallelly, +// `doFetchPages` will receive data pages from WN, +// package these data pages into several `WritePageTask` objects +// and send them to `RNWritePageCachePool` to write into local page cache. +struct WritePageTask +{ + explicit WritePageTask(Remote::RNLocalPageCache * page_cache_) + : page_cache(page_cache_) + {} + Remote::RNLocalPageCache * page_cache; + UniversalWriteBatch wb; + std::forward_list remote_pages; // Hold the data of wb. + std::forward_list remote_page_mem_tracker_wrappers; // Hold the memory stat of remote_pages. +}; +using WritePageTaskPtr = std::unique_ptr; + +void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest & request) +{ + // No page need to be fetched. 
+ if (request.page_ids_size() == 0) + return; + + UInt64 read_page_ns = 0; + UInt64 deserialize_page_ns = 0; + UInt64 wait_write_page_ns = 0; + + Stopwatch sw_total; + const auto * cluster = dm_context->db_context.getTMTContext().getKVCluster(); + pingcap::kv::RpcCall rpc( + cluster->rpc_client, + extra_remote_info->store_address); + grpc::ClientContext client_context; + Stopwatch sw_rpc_call; + auto stream_resp = rpc.call(&client_context, request); + read_page_ns += sw_rpc_call.elapsed(); + SCOPE_EXIT({ + // TODO: Not sure whether we really need this. Maybe RAII is already there? + stream_resp->Finish(); + }); + + // Used to verify all pages are fetched. + std::unordered_set remaining_pages_to_fetch(request.page_ids().begin(), request.page_ids().end()); + + UInt64 packet_count = 0; + UInt64 write_page_task_count = 0; + const UInt64 page_count = request.page_ids_size(); + + auto schedule_write_page_task = [&write_page_task_count, &wait_write_page_ns](WritePageTaskPtr && write_page_task) { + write_page_task_count += 1; + auto task = std::make_shared>([write_page_task = std::move(write_page_task)]() { + write_page_task->page_cache->write(std::move(write_page_task->wb)); + }); + Stopwatch sw; + RNWritePageCachePool::get().scheduleOrThrowOnError([task]() { (*task)(); }); + wait_write_page_ns += sw.elapsed(); + return task->get_future(); + }; + + WritePageTaskPtr write_page_task; + std::vector> write_page_results; + + // Keep reading packets. + while (true) + { + Stopwatch sw_read_packet; + auto packet = std::make_shared(); + if (!stream_resp->Read(packet.get())) + break; + if (packet->has_error()) + throw Exception(ErrorCodes::FETCH_PAGES_ERROR, "{} (from {})", packet->error().msg(), *this); + + read_page_ns = sw_read_packet.elapsed(); + packet_count += 1; + MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get()); + + std::vector received_page_ids; + received_page_ids.reserve(packet->pages_size()); + for (const auto & page : packet->pages()) + { + Stopwatch sw; + if (write_page_task == nullptr) + { + write_page_task = std::make_unique( + dm_context->db_context.getSharedContextDisagg()->rn_page_cache.get()); + } + auto & remote_page = write_page_task->remote_pages.emplace_front(); // NOLINT(bugprone-use-after-move) + bool parsed = remote_page.ParseFromString(page); + RUNTIME_CHECK_MSG(parsed, "Failed to parse page data (from {})", *this); + write_page_task->remote_page_mem_tracker_wrappers.emplace_front( + remote_page.SpaceUsedLong(), + fetch_pages_mem_tracker.get()); + + RUNTIME_CHECK( + remaining_pages_to_fetch.contains(remote_page.page_id()), + remaining_pages_to_fetch, + remote_page.page_id()); + + received_page_ids.emplace_back(remote_page.page_id()); + remaining_pages_to_fetch.erase(remote_page.page_id()); + + // Write page into LocalPageCache. Note that the page must be occupied. 
+ auto oid = Remote::PageOID{ + .store_id = store_id, + .ks_table_id = {dm_context->keyspace_id, dm_context->physical_table_id}, + .page_id = remote_page.page_id(), + }; + auto read_buffer + = std::make_shared(remote_page.data().data(), remote_page.data().size()); + PageFieldSizes field_sizes; + field_sizes.reserve(remote_page.field_sizes_size()); + for (const auto & field_sz : remote_page.field_sizes()) + { + field_sizes.emplace_back(field_sz); + } + deserialize_page_ns += sw.elapsed(); + + auto page_id = Remote::RNLocalPageCache::buildCacheId(oid); + write_page_task->wb + .putPage(page_id, 0, std::move(read_buffer), remote_page.data().size(), std::move(field_sizes)); + auto write_batch_limit_size = dm_context->db_context.getSettingsRef().dt_write_page_cache_limit_size; + if (write_page_task->wb.getTotalDataSize() >= write_batch_limit_size) + { + write_page_results.push_back( + schedule_write_page_task(std::move(write_page_task))); // write_page_task is moved and reset. + } + } + } + + if (write_page_task != nullptr && write_page_task->wb.getTotalDataSize() > 0) + { + write_page_results.push_back(schedule_write_page_task(std::move(write_page_task))); + } + + Stopwatch sw_wait_write_page_finished; + for (auto & f : write_page_results) + { + f.get(); + } + wait_write_page_ns += sw_wait_write_page_finished.elapsed(); + + // Verify all pending pages are now received. + RUNTIME_CHECK_MSG( + remaining_pages_to_fetch.empty(), + "Failed to fetch all pages (from {}), remaining_pages_to_fetch={}", + *this, + remaining_pages_to_fetch); + + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_rpc_fetch_page) + .Observe(read_page_ns / 1000000000.0); + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_deserialize_page) + .Observe(deserialize_page_ns / 1000000000.0); + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_write_page_cache) + .Observe(wait_write_page_ns / 1000000000.0); + + LOG_DEBUG( + read_snapshot->log, + "Finished fetch pages, seg_task={}, page_count={}, packet_count={}, write_page_task_count={}, " + "total_ms={}, read_stream_ms={}, deserialize_page_ms={}, schedule_write_page_ms={}", + *this, + page_count, + packet_count, + write_page_task_count, + sw_total.elapsed() / 1000000, + read_page_ns / 1000000, + deserialize_page_ns / 1000000, + wait_write_page_ns / 1000000); } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index dd7a79a5bfd..ed407e462d8 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include #include @@ -51,6 +51,7 @@ struct ExtraRemoteSegmentInfo struct SegmentReadTask { +public: const StoreID store_id; SegmentPtr segment; // Contains segment_id, segment_epoch SegmentSnapshotPtr read_snapshot; @@ -59,11 +60,9 @@ struct SegmentReadTask std::optional extra_remote_info; - BlockInputStreamPtr input_stream; - // Constructor for op-mode. 
SegmentReadTask( - const SegmentPtr & segment_, // + const SegmentPtr & segment_, const SegmentSnapshotPtr & read_snapshot_, const DMContextPtr & dm_context_, const RowKeyRanges & ranges_ = {}); @@ -82,21 +81,32 @@ struct SegmentReadTask ~SegmentReadTask(); + GlobalSegmentID getGlobalSegmentID() const + { + return GlobalSegmentID{ + .store_id = store_id, + .keyspace_id = dm_context->keyspace_id, + .physical_table_id = dm_context->physical_table_id, + .segment_id = segment->segmentId(), + .segment_epoch = segment->segmentEpoch(), + }; + } + void addRange(const RowKeyRange & range); void mergeRanges(); static SegmentReadTasks trySplitReadTasks(const SegmentReadTasks & tasks, size_t expected_size); - /// Called from RNWorkerFetchPages. - void initColumnFileDataProvider(const Remote::RNLocalPageCacheGuardPtr & pages_guard); + void fetchPages(); - /// Called from RNWorkerPrepareStreams. void initInputStream( const ColumnDefines & columns_to_read, UInt64 read_tso, const PushDownFilterPtr & push_down_filter, - ReadMode read_mode); + ReadMode read_mode, + size_t expected_block_size, + bool enable_delta_index_error_fallback); BlockInputStreamPtr getInputStream() const { @@ -104,16 +114,38 @@ struct SegmentReadTask return input_stream; } - GlobalSegmentID getGlobalSegmentID() const - { - return GlobalSegmentID{ - .store_id = store_id, - .keyspace_id = dm_context->keyspace_id, - .physical_table_id = dm_context->physical_table_id, - .segment_id = segment->segmentId(), - .segment_epoch = segment->segmentEpoch(), - }; - } +#ifndef DBMS_PUBLIC_GTEST +private: +#else +public: +#endif + std::vector buildRemotePageOID() const; + + Remote::RNLocalPageCache::OccupySpaceResult blockingOccupySpaceForTask() const; + + disaggregated::FetchDisaggPagesRequest buildFetchPagesRequest( + const std::vector & pages_not_in_cache) const; + + void doFetchPages(const disaggregated::FetchDisaggPagesRequest & request); + + void initColumnFileDataProvider(const Remote::RNLocalPageCacheGuardPtr & pages_guard); + + bool doInitInputStreamWithErrorFallback( + const ColumnDefines & columns_to_read, + UInt64 read_tso, + const PushDownFilterPtr & push_down_filter, + ReadMode read_mode, + size_t expected_block_size, + bool enable_delta_index_error_fallback); + + void doInitInputStream( + const ColumnDefines & columns_to_read, + UInt64 read_tso, + const PushDownFilterPtr & push_down_filter, + ReadMode read_mode, + size_t expected_block_size); + + BlockInputStreamPtr input_stream; }; // Used in SegmentReadTaskScheduler, SegmentReadTaskPool. 
@@ -122,22 +154,34 @@ using MergingSegments = std::unordered_map> } // namespace DB::DM template <> -struct fmt::formatter +struct fmt::formatter { static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } template - auto format(const DB::DM::SegmentReadTaskPtr & t, FormatContext & ctx) const + auto format(const DB::DM::SegmentReadTask & t, FormatContext & ctx) const { return fmt::format_to( ctx.out(), "s{}_ks{}_t{}_{}_{}_{}", - t->store_id, - t->dm_context->keyspace_id, - t->dm_context->physical_table_id, - t->segment->segmentId(), - t->segment->segmentEpoch(), - t->read_snapshot->delta->getDeltaIndexEpoch()); + t.store_id, + t.dm_context->keyspace_id, + t.dm_context->physical_table_id, + t.segment->segmentId(), + t.segment->segmentEpoch(), + t.read_snapshot->delta->getDeltaIndexEpoch()); + } +}; + +template <> +struct fmt::formatter +{ + static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } + + template + auto format(const DB::DM::SegmentReadTaskPtr & t, FormatContext & ctx) const + { + return fmt::formatter().format(*t, ctx); } }; diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index a2039fb788d..57c70cf0161 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -84,30 +84,28 @@ bool SegmentReadTasksWrapper::empty() const BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t) { + // Call fetchPages before MemoryTrackerSetter, because its memory usage is tracked by `fetch_pages_mem_tracker`. + t->fetchPages(); + MemoryTrackerSetter setter(true, mem_tracker.get()); - BlockInputStreamPtr stream; - auto block_size = std::max( - expected_block_size, - static_cast(t->dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); - stream = t->segment->getInputStream( - read_mode, - *(t->dm_context), + + t->initInputStream( columns_to_read, - t->read_snapshot, - t->ranges, - filter, max_version, - block_size); - stream = std::make_shared( - stream, + filter, + read_mode, + expected_block_size, + t->dm_context->db_context.getSettingsRef().dt_enable_delta_index_error_fallback); + BlockInputStreamPtr stream = std::make_shared( + t->getInputStream(), extra_table_id_index, t->dm_context->physical_table_id); LOG_DEBUG( log, - "getInputStream succ, read_mode={}, pool_id={} segment_id={}", + "buildInputStream: read_mode={}, pool_id={} segment={}", magic_enum::enum_name(read_mode), pool_id, - t->segment->segmentId()); + t); return stream; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 013e161b0f3..b3815a3945b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -89,6 +89,30 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic return path_delegate.listPaths(); } + std::pair> genDMFile(DMContext & context, const Block & block) + { + auto input_stream = std::make_shared(block); + auto [store_path, file_id] = store->preAllocateIngestFile(); + + auto dmfile = writeIntoNewDMFile( + context, + std::make_shared(store->getTableColumns()), + input_stream, + file_id, + store_path); + + store->preIngestFile(store_path, file_id, dmfile->getBytesOnDisk()); + + const auto & pk_column = block.getByPosition(0).column; + auto min_pk = 
pk_column->getInt(0); + auto max_pk = pk_column->getInt(block.rows() - 1); + HandleRange range(min_pk, max_pk + 1); + auto handle_range = RowKeyRange::fromHandleRange(range); + auto external_file = ExternalDTFileInfo{.id = file_id, .range = handle_range}; + // There are some duplicated info. This is to minimize the change to our test code. + return {handle_range, {external_file}}; + } + protected: DeltaMergeStorePtr store; }; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index e783b103408..89dc764006a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -550,102 +550,6 @@ try } CATCH - -TEST_F(SegmentOperationTest, DeltaIndexError) -try -{ - // write stable - writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 10000, 0); - mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - // split into 2 segment - auto segment_id = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); - ASSERT_TRUE(segment_id.has_value()); - // write delta - writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 1000, 0); - writeSegment(*segment_id, 1000, 8000); - - // Init delta index - { - auto [first, first_snap] = getSegmentForRead(DELTA_MERGE_FIRST_SEGMENT_ID); - first->placeDeltaIndex(*dm_context, first_snap); - } - auto [first, first_snap] = getSegmentForRead(DELTA_MERGE_FIRST_SEGMENT_ID); - LOG_DEBUG(log, "First: {}", first_snap->delta->getSharedDeltaIndex()->toString()); - - { - auto [second, second_snap] = getSegmentForRead(*segment_id); - second->placeDeltaIndex(*dm_context, second_snap); - } - auto [second, second_snap] = getSegmentForRead(*segment_id); - LOG_DEBUG(log, "Second: {}", second_snap->delta->getSharedDeltaIndex()->toString()); - - // Create a wrong delta index for first segment. 
- auto [placed_rows, placed_deletes] = first_snap->delta->getSharedDeltaIndex()->getPlacedStatus(); - auto broken_delta_index = std::make_shared( - second_snap->delta->getSharedDeltaIndex()->getDeltaTree(), - placed_rows, - placed_deletes, - first_snap->delta->getSharedDeltaIndex()->getRNCacheKey()); - first_snap->delta->shared_delta_index = broken_delta_index; - - auto task = std::make_shared( - first, - first_snap, - createDMContext(), - RowKeyRanges{first->getRowKeyRange()}); - - auto worker = DB::DM::Remote::RNWorkerPrepareStreams::create({ - .source_queue = nullptr, - .result_queue = nullptr, - .log = Logger::get(), - .concurrency = 1, - .columns_to_read = tableColumns(), - .read_tso = 0, - .push_down_filter = nullptr, - .read_mode = ReadMode::Bitmap, - }); - - ASSERT_FALSE(worker->initInputStream(task, true)); - - try - { - [[maybe_unused]] auto succ = worker->initInputStream(task, false); - FAIL() << "Should not come here."; - } - catch (const Exception & e) - { - ASSERT_EQ(e.code(), ErrorCodes::DT_DELTA_INDEX_ERROR); - } - - try - { - db_context->getSettingsRef().set("dt_enable_delta_index_error_fallback", "false"); - task = worker->testDoWork(task); - FAIL() << "Should not come here."; - } - catch (const Exception & e) - { - ASSERT_EQ(e.code(), ErrorCodes::DT_DELTA_INDEX_ERROR); - } - - db_context->getSettingsRef().set("dt_enable_delta_index_error_fallback", "true"); - task = worker->testDoWork(task); - auto stream = task->getInputStream(); - ASSERT_NE(stream, nullptr); - std::vector blks; - for (auto blk = stream->read(); blk; blk = stream->read()) - { - blks.push_back(blk); - } - auto handle_col1 = vstackBlocks(std::move(blks)).getByName(EXTRA_HANDLE_COLUMN_NAME).column; - auto handle_col2 = getSegmentHandle(task->segment->segmentId(), {task->segment->getRowKeyRange()}); - ASSERT_TRUE(sequenceEqual( - toColumnVectorDataPtr(handle_col2)->data(), - toColumnVectorDataPtr(handle_col1)->data(), - handle_col1->size())); -} -CATCH - class SegmentEnableLogicalSplitTest : public SegmentOperationTest { protected: diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp new file mode 100644 index 00000000000..548b58a169a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp @@ -0,0 +1,376 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +using namespace DB::tests; + +namespace DB::ErrorCodes +{ +extern const int DT_DELTA_INDEX_ERROR; +extern const int FETCH_PAGES_ERROR; +} // namespace DB::ErrorCodes + +namespace DB::DM::tests +{ + +class SegmentReadTaskTest : public SegmentTestBasic +{ +protected: + DB::LoggerPtr log = DB::Logger::get("SegmentReadTaskTest"); +}; + +TEST_F(SegmentReadTaskTest, InitInputStream) +try +{ + // write stable + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 10000, 0); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + // split into 2 segment + auto segment_id = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(segment_id.has_value()); + // write delta + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 1000, 0); + writeSegment(*segment_id, 1000, 8000); + + // Init delta index + { + auto [first, first_snap] = getSegmentForRead(DELTA_MERGE_FIRST_SEGMENT_ID); + first->placeDeltaIndex(*dm_context, first_snap); + } + auto [first, first_snap] = getSegmentForRead(DELTA_MERGE_FIRST_SEGMENT_ID); + LOG_DEBUG(log, "First: {}", first_snap->delta->getSharedDeltaIndex()->toString()); + + { + auto [second, second_snap] = getSegmentForRead(*segment_id); + second->placeDeltaIndex(*dm_context, second_snap); + } + auto [second, second_snap] = getSegmentForRead(*segment_id); + LOG_DEBUG(log, "Second: {}", second_snap->delta->getSharedDeltaIndex()->toString()); + + // Create a wrong delta index for first segment. + auto [placed_rows, placed_deletes] = first_snap->delta->getSharedDeltaIndex()->getPlacedStatus(); + auto broken_delta_index = std::make_shared( + second_snap->delta->getSharedDeltaIndex()->getDeltaTree(), + placed_rows, + placed_deletes, + first_snap->delta->getSharedDeltaIndex()->getRNCacheKey()); + first_snap->delta->shared_delta_index = broken_delta_index; + + auto task = std::make_shared( + first, + first_snap, + createDMContext(), + RowKeyRanges{first->getRowKeyRange()}); + + const auto & column_defines = *tableColumns(); + ASSERT_FALSE(task->doInitInputStreamWithErrorFallback( + column_defines, + 0, + nullptr, + ReadMode::Bitmap, + DEFAULT_BLOCK_SIZE, + true)); + + try + { + [[maybe_unused]] auto succ = task->doInitInputStreamWithErrorFallback( + column_defines, + 0, + nullptr, + ReadMode::Bitmap, + DEFAULT_BLOCK_SIZE, + false); + FAIL() << "Should not come here."; + } + catch (const Exception & e) + { + ASSERT_EQ(e.code(), ErrorCodes::DT_DELTA_INDEX_ERROR); + } + + task->initInputStream(column_defines, 0, nullptr, ReadMode::Bitmap, DEFAULT_BLOCK_SIZE, true); + auto stream = task->getInputStream(); + ASSERT_NE(stream, nullptr); + std::vector blks; + for (auto blk = stream->read(); blk; blk = stream->read()) + { + blks.push_back(blk); + } + auto handle_col1 = vstackBlocks(std::move(blks)).getByName(EXTRA_HANDLE_COLUMN_NAME).column; + auto handle_col2 = getSegmentHandle(task->segment->segmentId(), {task->segment->getRowKeyRange()}); + ASSERT_TRUE(sequenceEqual( + toColumnVectorDataPtr(handle_col2)->data(), + toColumnVectorDataPtr(handle_col1)->data(), + handle_col1->size())); +} +CATCH + + +TEST_F(DeltaMergeStoreTest, DisaggReadSnapshot) +try +{ + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + // stable + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 4096, false); + store->write(*db_context, db_context->getSettingsRef(), block); + store->mergeDeltaAll(*db_context); + } + + // cf delete range + { + 
HandleRange range(0, 128); + store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range)); + } + + // cf big + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + } + + // cf tiny + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + store->write(*db_context, db_context->getSettingsRef(), block); + store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + } + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + store->write(*db_context, db_context->getSettingsRef(), block); + store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + } + + // cf mem + { + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + store->write(*db_context, db_context->getSettingsRef(), block); + } + + auto scan_context = std::make_shared(); + auto snap = store->writeNodeBuildRemoteReadSnapshot( + *db_context, + db_context->getSettingsRef(), + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + 1, + "req_id", + {}, + scan_context); + + snap->column_defines = std::make_shared(store->getTableColumns()); + + MemTrackerWrapper mem_tracker_wrapper(nullptr); + auto remote_table_pb = Remote::Serializer::serializeTo(snap, /*task_id*/ {}, mem_tracker_wrapper); + + ASSERT_GT(remote_table_pb.segments_size(), 0); + + db_context->getSharedContextDisagg()->remote_data_store + = std::make_shared(db_context->getFileProvider()); + + for (const auto & remote_seg : remote_table_pb.segments()) + { + auto seg_task = std::make_shared( + Logger::get(), + *db_context, + scan_context, + remote_seg, + DisaggTaskId{}, + /*store_id*/ 1, + /*store_address*/ "127.0.0.1", + store->keyspace_id, + store->physical_table_id); + + auto seg_id = seg_task->segment->segmentId(); + + auto itr = store->id_to_segment.find(seg_id); + ASSERT_NE(itr, store->id_to_segment.end()) << seg_id; + + auto seg = itr->second; + ASSERT_NE(seg, nullptr) << seg_id; + auto delta_wn = seg->getDelta(); + ASSERT_NE(delta_wn, nullptr) << seg_id; + auto stable_wn = seg->getStable(); + ASSERT_NE(stable_wn, nullptr) << seg_id; + + ASSERT_NE(seg_task->segment, nullptr) << seg_id; + auto delta_cn = seg->getDelta(); + ASSERT_NE(delta_cn, nullptr) << seg_id; + auto stable_cn = seg->getStable(); + ASSERT_NE(stable_cn, nullptr) << seg_id; + + // Check Delta + ASSERT_EQ(delta_wn->getDeltaIndexEpoch(), delta_cn->getDeltaIndexEpoch()); + ASSERT_EQ(delta_wn->simpleInfo(), delta_cn->simpleInfo()); + ASSERT_EQ(delta_wn->info(), delta_cn->info()); + ASSERT_EQ(delta_wn->getId(), delta_cn->getId()); + + // cf mem set + auto mem_set_wn = delta_wn->getMemTableSet(); + auto mem_set_cn = delta_cn->getMemTableSet(); + ASSERT_EQ(mem_set_wn->getColumnFileCount(), 1); + ASSERT_EQ(mem_set_cn->getColumnFileCount(), 1); + for (size_t i = 0; i < mem_set_wn->getColumnFileCount(); i++) + { + auto * cf_wn = mem_set_wn->column_files[i]->tryToInMemoryFile(); + ASSERT_NE(cf_wn, nullptr); + auto * cf_cn = mem_set_cn->column_files[i]->tryToInMemoryFile(); + ASSERT_NE(cf_cn, nullptr); + ASSERT_EQ(cf_wn->toString(), cf_cn->toString()); + String msg; + ASSERT_TRUE(blockEqual(cf_wn->getCache()->block, cf_cn->getCache()->block, msg)); + } + + auto check_dmfile = [](const DMFilePtr & 
dmfile_wn, const DMFilePtr & dmfile_cn) { + ASSERT_EQ(dmfile_wn->file_id, dmfile_cn->file_id); + ASSERT_EQ(dmfile_wn->page_id, dmfile_cn->page_id); + ASSERT_EQ(dmfile_wn->parent_path, dmfile_cn->parent_path); + ASSERT_EQ(dmfile_wn->status, dmfile_cn->status); + ASSERT_EQ(dmfile_wn->version, dmfile_cn->version); + + ASSERT_TRUE(dmfile_wn->configuration.has_value()); + ASSERT_TRUE(dmfile_cn->configuration.has_value()); + ASSERT_EQ( + dmfile_wn->configuration->getChecksumFrameLength(), + dmfile_cn->configuration->getChecksumFrameLength()); + ASSERT_EQ( + dmfile_wn->configuration->getChecksumHeaderLength(), + dmfile_cn->configuration->getChecksumHeaderLength()); + ASSERT_EQ( + dmfile_wn->configuration->getChecksumAlgorithm(), + dmfile_cn->configuration->getChecksumAlgorithm()); + ASSERT_EQ(dmfile_wn->configuration->getEmbeddedChecksum(), dmfile_cn->configuration->getEmbeddedChecksum()); + ASSERT_EQ(dmfile_wn->configuration->getDebugInfo(), dmfile_cn->configuration->getDebugInfo()); + + ASSERT_EQ(dmfile_wn->pack_stats.size(), dmfile_cn->pack_stats.size()); + for (size_t j = 0; j < dmfile_wn->pack_stats.size(); j++) + { + ASSERT_EQ(dmfile_wn->pack_stats[j].toDebugString(), dmfile_cn->pack_stats[j].toDebugString()); + } + + ASSERT_EQ(dmfile_wn->pack_properties.property_size(), dmfile_cn->pack_properties.property_size()); + for (int j = 0; j < dmfile_wn->pack_properties.property_size(); j++) + { + ASSERT_EQ( + dmfile_wn->pack_properties.property(j).ShortDebugString(), + dmfile_cn->pack_properties.property(j).ShortDebugString()); + } + + ASSERT_EQ(dmfile_wn->column_stats.size(), dmfile_cn->column_stats.size()); + for (const auto & [col_id, col_stat_wn] : dmfile_wn->column_stats) + { + auto itr = dmfile_cn->column_stats.find(col_id); + ASSERT_NE(itr, dmfile_cn->column_stats.end()); + const auto & col_stat_cn = itr->second; + WriteBufferFromOwnString wb_wn; + col_stat_wn.serializeToBuffer(wb_wn); + WriteBufferFromOwnString wb_cn; + col_stat_cn.serializeToBuffer(wb_cn); + ASSERT_EQ(wb_wn.str(), wb_cn.str()); + } + + ASSERT_EQ(dmfile_wn->column_indices, dmfile_cn->column_indices); + + ASSERT_EQ(dmfile_wn->merged_files.size(), dmfile_cn->merged_files.size()); + for (size_t j = 0; j < dmfile_wn->merged_files.size(); j++) + { + const auto & merged_file_wn = dmfile_wn->merged_files[j]; + const auto & merged_file_cn = dmfile_cn->merged_files[j]; + ASSERT_EQ(merged_file_wn.number, merged_file_cn.number); + ASSERT_EQ(merged_file_wn.size, merged_file_cn.size); + } + + ASSERT_EQ(dmfile_wn->merged_sub_file_infos.size(), dmfile_cn->merged_sub_file_infos.size()); + for (const auto & [fname, sub_files_wn] : dmfile_wn->merged_sub_file_infos) + { + auto itr = dmfile_cn->merged_sub_file_infos.find(fname); + ASSERT_NE(itr, dmfile_cn->merged_sub_file_infos.end()); + const auto & sub_files_cn = itr->second; + WriteBufferFromOwnString wb_wn; + sub_files_wn.serializeToBuffer(wb_wn); + WriteBufferFromOwnString wb_cn; + sub_files_cn.serializeToBuffer(wb_cn); + ASSERT_EQ(wb_wn.str(), wb_cn.str()); + } + }; + + // cf persist set + auto persist_set_wn = delta_wn->getPersistedFileSet(); + auto persist_set_cn = delta_cn->getPersistedFileSet(); + ASSERT_EQ(persist_set_wn->getColumnFileCount(), 4); + ASSERT_EQ(persist_set_cn->getColumnFileCount(), 4); + ASSERT_EQ(persist_set_wn->detailInfo(), persist_set_cn->detailInfo()); + for (size_t i = 0; i < persist_set_wn->getColumnFileCount(); i++) + { + auto cf_wn = persist_set_wn->getFiles()[i]; + auto cf_cn = persist_set_cn->getFiles()[i]; + + if (i == 0) + { + auto * cf_del_wn = 
cf_wn->tryToDeleteRange(); + ASSERT_NE(cf_del_wn, nullptr); + auto * cf_del_cn = cf_cn->tryToDeleteRange(); + ASSERT_NE(cf_del_cn, nullptr); + ASSERT_EQ(cf_del_wn->getDeleteRange(), cf_del_cn->getDeleteRange()); + } + else if (i == 1) + { + auto * cf_big_wn = cf_wn->tryToBigFile(); + ASSERT_NE(cf_big_wn, nullptr); + auto * cf_big_cn = cf_cn->tryToBigFile(); + ASSERT_NE(cf_big_cn, nullptr); + check_dmfile(cf_big_wn->getFile(), cf_big_cn->getFile()); + } + else + { + auto * cf_tiny_wn = cf_wn->tryToTinyFile(); + ASSERT_NE(cf_tiny_wn, nullptr); + auto * cf_tiny_cn = cf_cn->tryToTinyFile(); + ASSERT_NE(cf_tiny_cn, nullptr); + ASSERT_EQ(cf_tiny_wn->getDataPageId(), cf_tiny_cn->getDataPageId()); + ASSERT_EQ(cf_tiny_wn->getDataPageSize(), cf_tiny_cn->getDataPageSize()); + } + } + + // Check Stable + ASSERT_EQ(stable_wn->getId(), stable_cn->getId()); + ASSERT_EQ(stable_wn->getRows(), stable_cn->getRows()); + ASSERT_EQ(stable_wn->getBytes(), stable_cn->getBytes()); + ASSERT_EQ(stable_wn->getDMFiles().size(), 1); + ASSERT_EQ(stable_cn->getDMFiles().size(), 1); + for (size_t i = 0; i < stable_wn->getDMFiles().size(); i++) + { + const auto & dmfile_wn = stable_wn->getDMFiles()[i]; + const auto & dmfile_cn = stable_cn->getDMFiles()[i]; + check_dmfile(dmfile_wn, dmfile_cn); + } + } +} +CATCH + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index 95d1a66b0b1..6abe226a5f2 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -107,11 +108,12 @@ class StorageDisaggregated : public IStorage const Context & db_context, const pingcap::coprocessor::BatchCopTask & batch_cop_task); DM::RSOperatorPtr buildRSOperator(const Context & db_context, const DM::ColumnDefinesPtr & columns_to_read); - DM::Remote::RNWorkersPtr buildRNWorkers( + std::variant packSegmentReadTasks( const Context & db_context, DM::SegmentReadTasks && read_tasks, const DM::ColumnDefinesPtr & column_defines, - size_t num_streams); + size_t num_streams, + int extra_table_id_index); void buildRemoteSegmentInputStreams( const Context & db_context, DM::SegmentReadTasks && read_tasks, diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 64e104db1d1..1af797602d5 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -30,11 +30,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -62,6 +64,7 @@ #include #include #include +#include namespace DB { @@ -484,11 +487,12 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator( return rs_operator; } -DM::Remote::RNWorkersPtr StorageDisaggregated::buildRNWorkers( +std::variant StorageDisaggregated::packSegmentReadTasks( const Context & db_context, DM::SegmentReadTasks && read_tasks, const DM::ColumnDefinesPtr & column_defines, - size_t num_streams) + size_t num_streams, + int extra_table_id_index) { const auto & executor_id = table_scan.getTableScanExecutorID(); @@ -506,10 +510,12 @@ DM::Remote::RNWorkersPtr StorageDisaggregated::buildRNWorkers( table_scan.keepOrder(), push_down_filter); const UInt64 read_tso = sender_target_mpp_task_id.gather_id.query_id.start_ts; + const auto enable_read_thread = 
db_context.getSettingsRef().dt_enable_read_thread; LOG_INFO( log, - "Building segment input streams, read_mode={} is_fast_scan={} keep_order={} segments={} num_streams={} " - "column_defines={}", + "packSegmentReadTasks: enable_read_thread={} read_mode={} is_fast_scan={} keep_order={} task_count={} " + "num_streams={} column_defines={}", + enable_read_thread, magic_enum::enum_name(read_mode), table_scan.isFastScan(), table_scan.keepOrder(), @@ -517,47 +523,88 @@ DM::Remote::RNWorkersPtr StorageDisaggregated::buildRNWorkers( num_streams, *column_defines); - return DM::Remote::RNWorkers::create( - db_context, - std::move(read_tasks), - { - .log = log->getChild(executor_id), - .columns_to_read = column_defines, - .read_tso = read_tso, - .push_down_filter = push_down_filter, - .read_mode = read_mode, - }, - num_streams); + if (enable_read_thread) + { + return std::make_shared( + extra_table_id_index, + *column_defines, + push_down_filter, + read_tso, + db_context.getSettingsRef().max_block_size, + read_mode, + std::move(read_tasks), + /*after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {}, + executor_id, + /*enable_read_thread*/ true, + num_streams); + } + else + { + return DM::Remote::RNWorkers::create( + db_context, + std::move(read_tasks), + { + .log = log->getChild(executor_id), + .columns_to_read = column_defines, + .read_tso = read_tso, + .push_down_filter = push_down_filter, + .read_mode = read_mode, + }, + num_streams); + } } +struct InputStreamBuilder +{ + const String & tracing_id; + const DM::ColumnDefinesPtr & columns_to_read; + int extra_table_id_index; + + BlockInputStreamPtr operator()(DM::Remote::RNWorkersPtr & workers) const + { + return DM::Remote::RNSegmentInputStream::create(DM::Remote::RNSegmentInputStream::Options{ + .debug_tag = tracing_id, + .workers = workers, + .columns_to_read = *columns_to_read, + .extra_table_id_index = extra_table_id_index, + }); + } + + BlockInputStreamPtr operator()(DM::SegmentReadTaskPoolPtr & read_tasks) const + { + return std::make_shared( + read_tasks, + *columns_to_read, + extra_table_id_index, + tracing_id); + } +}; + + void StorageDisaggregated::buildRemoteSegmentInputStreams( const Context & db_context, DM::SegmentReadTasks && read_tasks, size_t num_streams, DAGPipeline & pipeline) { - const auto & executor_id = table_scan.getTableScanExecutorID(); - // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); - auto workers = buildRNWorkers(db_context, std::move(read_tasks), column_defines, num_streams); - + auto packed_read_tasks + = packSegmentReadTasks(db_context, std::move(read_tasks), column_defines, num_streams, extra_table_id_index); RUNTIME_CHECK(num_streams > 0, num_streams); pipeline.streams.reserve(num_streams); + + InputStreamBuilder builder{ + .tracing_id = log->identifier(), + .columns_to_read = column_defines, + .extra_table_id_index = extra_table_id_index, + }; for (size_t stream_idx = 0; stream_idx < num_streams; ++stream_idx) { - auto stream = DM::Remote::RNSegmentInputStream::create({ - .debug_tag = log->identifier(), - // Note: We intentionally pass the whole worker, instead of worker->getReadyChannel() - // because we want to extend the lifetime of the WorkerPtr until read is finished. - // Also, we want to start the Worker after the read. 
-            .workers = workers,
-            .columns_to_read = *column_defines,
-            .extra_table_id_index = extra_table_id_index,
-        });
-        pipeline.streams.emplace_back(stream);
+        pipeline.streams.emplace_back(std::visit(builder, packed_read_tasks));
     }
+    const auto & executor_id = table_scan.getTableScanExecutorID();
     auto * dag_context = db_context.getDAGContext();
     auto & table_scan_io_input_streams = dag_context->getInBoundIOInputStreamsMap()[executor_id];
     auto & profile_streams = dag_context->getProfileStreamsMap()[executor_id];
@@ -567,6 +614,35 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
     });
 }
 
+struct SourceOpBuilder
+{
+    const String & tracing_id;
+    const DM::ColumnDefinesPtr & column_defines;
+    int extra_table_id_index;
+    PipelineExecutorContext & exec_context;
+
+    SourceOpPtr operator()(DM::Remote::RNWorkersPtr & workers) const
+    {
+        return DM::Remote::RNSegmentSourceOp::create({
+            .debug_tag = tracing_id,
+            .exec_context = exec_context,
+            .workers = workers,
+            .columns_to_read = *column_defines,
+            .extra_table_id_index = extra_table_id_index,
+        });
+    }
+
+    SourceOpPtr operator()(DM::SegmentReadTaskPoolPtr & read_tasks) const
+    {
+        return std::make_unique(
+            exec_context,
+            read_tasks,
+            *column_defines,
+            extra_table_id_index,
+            tracing_id);
+    }
+};
+
 void StorageDisaggregated::buildRemoteSegmentSourceOps(
     PipelineExecutorContext & exec_context,
     PipelineExecGroupBuilder & group_builder,
@@ -576,21 +652,19 @@
 {
     // Build the input streams to read blocks from remote segments
     auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
-    auto workers = buildRNWorkers(db_context, std::move(read_tasks), column_defines, num_streams);
+    auto packed_read_tasks
+        = packSegmentReadTasks(db_context, std::move(read_tasks), column_defines, num_streams, extra_table_id_index);
     RUNTIME_CHECK(num_streams > 0, num_streams);
+    SourceOpBuilder builder{
+        .tracing_id = log->identifier(),
+        .column_defines = column_defines,
+        .extra_table_id_index = extra_table_id_index,
+        .exec_context = exec_context,
+    };
     for (size_t i = 0; i < num_streams; ++i)
     {
-        group_builder.addConcurrency(DM::Remote::RNSegmentSourceOp::create({
-            .debug_tag = log->identifier(),
-            .exec_context = exec_context,
-            // Note: We intentionally pass the whole worker, instead of worker->getReadyChannel()
-            // because we want to extend the lifetime of the WorkerPtr until read is finished.
-            // Also, we want to start the Worker after the read.
-            .workers = workers,
-            .columns_to_read = *column_defines,
-            .extra_table_id_index = extra_table_id_index,
-        }));
+        group_builder.addConcurrency(std::visit(builder, packed_read_tasks));
     }
     db_context.getDAGContext()->addInboundIOProfileInfos(
         table_scan.getTableScanExecutorID(),