From 6a873b4f1cab9f85af913b38b1b62660ef31264d Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Tue, 4 Jun 2024 17:15:21 +0300
Subject: [PATCH] feat(tiering): Simple snapshotting (#3073)

* feat(tiering): Simple snapshotting

---------

Signed-off-by: Vladislav Oleshko
---
 src/server/CMakeLists.txt            | 18 ++++++++-----
 src/server/engine_shard_set.cc       |  3 ---
 src/server/engine_shard_set.h        |  4 ---
 src/server/server_family.cc         |  8 ------
 src/server/snapshot.cc               | 25 ++++++++++++++---
 src/server/snapshot.h                | 10 +++++++
 src/server/tiered_storage.cc         | 12 ++++++++-
 src/server/tiering/external_alloc.cc |  2 +-
 src/server/tiering/op_manager.cc     | 12 +++++----
 src/server/tiering/small_bins.cc     |  7 ++++-
 tests/dragonfly/snapshot_test.py     | 40 ++++++++++++++++++++++++++++
 11 files changed, 109 insertions(+), 32 deletions(-)

diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
index 833942dbe4e8..eeb65ebd4c43 100644
--- a/src/server/CMakeLists.txt
+++ b/src/server/CMakeLists.txt
@@ -14,12 +14,11 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS
                                 SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
 
 if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
-  SET(TX_LINUX_SRCS tiered_storage.cc tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
+  SET(TX_LINUX_SRCS tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
      tiering/io_mgr.cc tiering/external_alloc.cc)
 
   add_executable(dfly_bench dfly_bench.cc)
   cxx_link(dfly_bench dfly_facade fibers2 absl::random_random)
 
-  cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
   cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
   cxx_test(tiering/op_manager_test dfly_test_lib LABELS DFLY)
   cxx_test(tiering/small_bins_test dfly_test_lib LABELS DFLY)
@@ -36,13 +35,18 @@ add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
                    ${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc
 )
 
-SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessors.cc
+SET(DF_SEARCH_SRCS search/search_family.cc search/doc_index.cc search/doc_accessors.cc
     search/aggregator.cc)
 
+if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
+  SET(DF_LINUX_SRCS tiered_storage.cc)
+
+  cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
+endif()
+
 add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc
             config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc generic_family.cc
             hset_family.cc http_api.cc json_family.cc
-            ${SEARCH_FILES}
             list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
             protocol_client.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
@@ -50,8 +54,10 @@ add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc
             detail/snapshot_storage.cc set_family.cc stream_family.cc string_family.cc
             zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
-            top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
-            cluster/cluster_family.cc cluster/incoming_slot_migration.cc
+            top_keys.cc multi_command_squasher.cc hll_family.cc
+            ${DF_SEARCH_SRCS}
+            ${DF_LINUX_SRCS}
+            cluster/cluster_config.cc cluster/cluster_family.cc cluster/incoming_slot_migration.cc
             cluster/outgoing_slot_migration.cc cluster/cluster_defs.cc
             acl/user.cc acl/user_registry.cc acl/acl_family.cc acl/validator.cc acl/helpers.cc)
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 6bb8f8cd0219..05d278a6c427 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -847,9 +847,6 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
   shard_queue_.resize(sz);
 
   size_t max_shard_file_size = GetTieredFileLimit(sz);
-  if (max_shard_file_size > 0)
-    is_tiering_enabled_ = true;
-
   pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
     if (index < shard_queue_.size()) {
       InitThreadLocal(pb, update_db_time, max_shard_file_size);
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index 70f87b0d0c79..6f209d126050 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -282,9 +282,6 @@ class EngineShardSet {
     return uint32_t(shard_queue_.size());
   }
 
-  bool IsTieringEnabled() {
-    return is_tiering_enabled_;
-  }
   util::ProactorPool* pool() {
     return pp_;
   }
@@ -347,7 +344,6 @@ class EngineShardSet {
   util::ProactorPool* pp_;
   std::vector shard_queue_;
-  bool is_tiering_enabled_ = false;
 };
 
 template
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 78067a505f91..37a18780ffff 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -950,10 +950,6 @@ void ServerFamily::SnapshotScheduling() {
   if (!cron_expr) {
     return;
   }
-  if (shard_set->IsTieringEnabled()) {
-    LOG(ERROR) << "Snapshot not allowed when using tiering. Exiting..";
-    exit(1);
-  }
 
   const auto loading_check_interval = std::chrono::seconds(10);
   while (service_.GetGlobalState() == GlobalState::LOADING) {
@@ -1399,10 +1395,6 @@ GenericError ServerFamily::DoSave(bool ignore_state) {
 
 GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view basename,
                                                Transaction* trans, bool ignore_state) {
-  if (shard_set->IsTieringEnabled()) {
-    return GenericError{make_error_code(errc::operation_not_permitted),
-                        StrCat("Can not save database in tiering mode")};
-  }
   auto state = service_.GetGlobalState();
   // In some cases we want to create a snapshot even if server is not active, f.e in takeover
   if (!ignore_state && (state != GlobalState::ACTIVE)) {
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index b46aed03ad7c..08a9ee8b86f8 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -15,6 +15,7 @@
 #include "server/journal/journal.h"
 #include "server/rdb_extensions.h"
 #include "server/rdb_save.h"
+#include "server/tiered_storage.h"
 
 namespace dfly {
 
@@ -284,12 +285,30 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
     expire_time = db_slice_->ExpireTime(eit);
   }
 
-  io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
-  CHECK(res);
-  ++type_freq_map_[*res];
+  if (pv.IsExternal()) {
+    // We can't block, so we just schedule a tiered read and append it to the delayed entries
+    util::fb2::Future<PrimeValue> future;
+    EngineShard::tlocal()->tiered_storage()->Read(
+        db_indx, pk.ToString(), pv,
+        [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
+    delayed_entries_.push_back({db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time});
+    ++type_freq_map_[RDB_TYPE_STRING];
+  } else {
+    io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
+    CHECK(res);
+    ++type_freq_map_[*res];
+  }
 }
 
 bool SliceSnapshot::PushSerializedToChannel(bool force) {
+  // Bucket serialization might have accumulated some delayed values.
+  // Because we can finally block in this function, we'll await and serialize them
+  while (!delayed_entries_.empty()) {
+    auto& entry = delayed_entries_.back();
+    serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid);
+    delayed_entries_.pop_back();
+  }
+
   if (!force && serializer_->SerializedLen() < 4096)
     return false;
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index 9bfaf1fdf591..da7962d2e3c1 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -13,6 +13,7 @@
 #include "server/db_slice.h"
 #include "server/rdb_save.h"
 #include "server/table.h"
+#include "util/fibers/future.h"
 
 namespace dfly {
 
@@ -132,6 +133,14 @@ class SliceSnapshot {
   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;
 
  private:
+  // An entry whose value must be awaited
+  struct DelayedEntry {
+    DbIndex dbid;
+    CompactObj key;
+    util::fb2::Future<PrimeValue> value;
+    time_t expire;
+  };
+
   DbSlice* db_slice_;
   DbTableArray db_array_;
@@ -141,6 +150,7 @@ class SliceSnapshot {
   DbIndex current_db_;
 
   std::unique_ptr<RdbSerializer> serializer_;
+  std::vector<DelayedEntry> delayed_entries_;  // collected during atomic bucket traversal
 
   // Used for sanity checks.
   bool serialize_bucket_running_ = false;
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index 2936a2e29339..63e614fb913f 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -18,6 +18,7 @@
 #include "server/common.h"
 #include "server/db_slice.h"
 #include "server/engine_shard_set.h"
+#include "server/snapshot.h"
 #include "server/table.h"
 #include "server/tiering/common.h"
 #include "server/tiering/op_manager.h"
@@ -129,6 +130,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
 
   // Load all values from bin by their hashes
   void Defragment(tiering::DiskSegment segment, string_view value) {
+    // Note: Bin could've already been deleted, in that case DeleteBin returns an empty list
     for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
       // Search for key with the same hash and value pointing to the same segment.
       // If it still exists, it must correspond to the value stored in this bin
@@ -166,6 +168,9 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     if (!modified && !cache_fetched_)
      return false;
 
+    if (SliceSnapshot::IsSnaphotInProgress())
+      return false;
+
     SetInMemory(get(id), value, segment);
     return true;
   }
@@ -175,8 +180,9 @@
       return true;
 
     auto bin = ts_->bins_->Delete(segment);
-    if (bin.empty)
+    if (bin.empty) {
       return true;
+    }
 
     if (bin.fragmented) {
       // Trigger read to signal need for defragmentation. ReportFetched will handle it.
@@ -286,6 +292,7 @@ void TieredStorage::Stash(DbIndex dbid, string_view key, PrimeValue* value) {
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
   }
 }
+
 void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
   DCHECK(value->IsExternal());
   tiering::DiskSegment segment = value->GetExternalSlice();
@@ -337,6 +344,9 @@ TieredStats TieredStorage::GetStats() const {
 }
 
 void TieredStorage::RunOffloading(DbIndex dbid) {
+  if (SliceSnapshot::IsSnaphotInProgress())
+    return;
+
   PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
   int stash_limit =
       absl::GetFlag(FLAGS_tiered_storage_write_depth) - op_manager_->GetStats().pending_stash_cnt;
diff --git a/src/server/tiering/external_alloc.cc b/src/server/tiering/external_alloc.cc
index 418160775a25..32b06d72d347 100644
--- a/src/server/tiering/external_alloc.cc
+++ b/src/server/tiering/external_alloc.cc
@@ -378,7 +378,7 @@ void ExternalAllocator::Free(size_t offset, size_t sz) {
   CHECK_LE(sz, block_size);
 
   DCHECK_LT(block_id, blocks_num);
-  DCHECK(!page->free_blocks[block_id]);
+  DCHECK(!page->free_blocks[block_id]) << offset;
 
   page->free_blocks.set(block_id);
   ++page->available;
diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc
index b9603874907c..18374f588fc6 100644
--- a/src/server/tiering/op_manager.cc
+++ b/src/server/tiering/op_manager.cc
@@ -11,7 +11,6 @@
 #include "io/io.h"
 #include "server/tiering/common.h"
 #include "server/tiering/disk_storage.h"
-
 namespace dfly::tiering {
 
 namespace {
@@ -59,12 +58,14 @@ void OpManager::Delete(EntryId id) {
 void OpManager::Delete(DiskSegment segment) {
   EntryOps* pending_op = nullptr;
-  if (auto it = pending_reads_.find(segment.ContainingPages().offset); it != pending_reads_.end())
-    pending_op = it->second.Find(segment);
+
+  auto base_it = pending_reads_.find(segment.ContainingPages().offset);
+  if (base_it != pending_reads_.end())
+    pending_op = base_it->second.Find(segment);
 
   if (pending_op) {
     pending_op->deleting = true;
-  } else if (ReportDelete(segment)) {
+  } else if (ReportDelete(segment) && base_it == pending_reads_.end()) {
     storage_.MarkAsFree(segment.ContainingPages());
   }
 }
@@ -143,8 +144,9 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) {
     deleting_full |= ReportDelete(ko.segment);
   }
 
-  if (deleting_full)
+  if (deleting_full) {
     storage_.MarkAsFree(info->segment);
+  }
 
   pending_reads_.erase(offset);
 }
diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc
index d8700886ebff..84d7726be668 100644
--- a/src/server/tiering/small_bins.cc
+++ b/src/server/tiering/small_bins.cc
@@ -146,7 +146,12 @@ SmallBins::Stats SmallBins::GetStats() const {
 SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) {
   DCHECK_EQ(value.size(), kPageSize);
 
-  stats_.stashed_entries_cnt -= stashed_bins_.extract(segment.offset).mapped().entries;
+
+  auto bin = stashed_bins_.extract(segment.offset);
+  if (bin.empty())
+    return {};
+
+  stats_.stashed_entries_cnt -= bin.mapped().entries;
 
   const char* data = value.data();
diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py
index e972bf10c6f0..5622fac4d4dc 100644
--- a/tests/dragonfly/snapshot_test.py
+++ b/tests/dragonfly/snapshot_test.py
@@ -404,3 +404,43 @@ async def test_bgsave_and_save(async_client: aioredis.Redis):
     while await is_saving(async_client):
         await asyncio.sleep(0.1)
     await async_client.execute_command("SAVE")
+
+
+@dfly_args(
+    {
+        **BASIC_ARGS,
+        "proactor_threads": 4,
+        "dbfilename": "tiered-entries",
"tiered_prefix": "/tmp/tiering_test_backing", + "tiered_offload_threshold": "0.0", # ask offloading loop to offload as much as possible + } +) +async def test_tiered_entries(async_client: aioredis.Redis): + """This test makes sure tieried entries are correctly persisted""" + + # With variance 4: 512 - 8192 we include small and large values + await StaticSeeder(key_target=5000, data_size=1024, variance=4, types=["STRING"]).run( + async_client + ) + + # Compute the capture, this brings all items back to memory... so we'll wait for offloading + start_capture = await StaticSeeder.capture(async_client) + + # Wait until the total_stashes counter stops increasing, meaning offloading finished + last_writes, current_writes = 0, -1 + while last_writes != current_writes: + await asyncio.sleep(0.1) + last_writes = current_writes + current_writes = (await async_client.info("TIERED"))["tiered_total_stashes"] + + # Save + flush + load + await async_client.execute_command("SAVE", "DF") + assert await async_client.flushall() + await async_client.execute_command( + "DEBUG", + "LOAD", + "tiered-entries-summary.dfs", + ) + + # Compare captures + assert await StaticSeeder.capture(async_client) == start_capture