feat(tiering): Simple snapshotting (#3073)
* feat(tiering): Simple snapshotting

---------

Signed-off-by: Vladislav Oleshko <[email protected]>
dranikpg authored Jun 4, 2024
1 parent b02521c commit 6a873b4
Showing 11 changed files with 109 additions and 32 deletions.
18 changes: 12 additions & 6 deletions src/server/CMakeLists.txt
@@ -14,12 +14,11 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS
                          SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
 
 if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
-  SET(TX_LINUX_SRCS tiered_storage.cc tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
+  SET(TX_LINUX_SRCS tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc
       tiering/io_mgr.cc tiering/external_alloc.cc)
 
   add_executable(dfly_bench dfly_bench.cc)
   cxx_link(dfly_bench dfly_facade fibers2 absl::random_random)
-  cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
   cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
   cxx_test(tiering/op_manager_test dfly_test_lib LABELS DFLY)
   cxx_test(tiering/small_bins_test dfly_test_lib LABELS DFLY)
@@ -36,22 +35,29 @@ add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
             ${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc
 )
 
-SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessors.cc
+SET(DF_SEARCH_SRCS search/search_family.cc search/doc_index.cc search/doc_accessors.cc
     search/aggregator.cc)
 
+if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
+  SET(DF_LINUX_SRCS tiered_storage.cc)
+
+  cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
+endif()
+
 add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc
             config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc
             generic_family.cc hset_family.cc http_api.cc json_family.cc
-            ${SEARCH_FILES}
             list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
             protocol_client.cc
             snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
             detail/save_stages_controller.cc
             detail/snapshot_storage.cc
             set_family.cc stream_family.cc string_family.cc
             zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
-            top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
-            cluster/cluster_family.cc cluster/incoming_slot_migration.cc
+            top_keys.cc multi_command_squasher.cc hll_family.cc
+            ${DF_SEARCH_SRCS}
+            ${DF_LINUX_SRCS}
+            cluster/cluster_config.cc cluster/cluster_family.cc cluster/incoming_slot_migration.cc
             cluster/outgoing_slot_migration.cc cluster/cluster_defs.cc
             acl/user.cc acl/user_registry.cc acl/acl_family.cc
             acl/validator.cc acl/helpers.cc)
3 changes: 0 additions & 3 deletions src/server/engine_shard_set.cc
@@ -847,9 +847,6 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
   shard_queue_.resize(sz);
 
   size_t max_shard_file_size = GetTieredFileLimit(sz);
-  if (max_shard_file_size > 0)
-    is_tiering_enabled_ = true;
-
   pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
     if (index < shard_queue_.size()) {
       InitThreadLocal(pb, update_db_time, max_shard_file_size);
4 changes: 0 additions & 4 deletions src/server/engine_shard_set.h
@@ -282,9 +282,6 @@ class EngineShardSet {
     return uint32_t(shard_queue_.size());
   }
 
-  bool IsTieringEnabled() {
-    return is_tiering_enabled_;
-  }
   util::ProactorPool* pool() {
     return pp_;
   }
@@ -347,7 +344,6 @@
 
   util::ProactorPool* pp_;
   std::vector<TaskQueue*> shard_queue_;
-  bool is_tiering_enabled_ = false;
 };
 
 template <typename U, typename P>
8 changes: 0 additions & 8 deletions src/server/server_family.cc
@@ -950,10 +950,6 @@ void ServerFamily::SnapshotScheduling() {
   if (!cron_expr) {
     return;
   }
-  if (shard_set->IsTieringEnabled()) {
-    LOG(ERROR) << "Snapshot not allowed when using tiering. Exiting..";
-    exit(1);
-  }
 
   const auto loading_check_interval = std::chrono::seconds(10);
   while (service_.GetGlobalState() == GlobalState::LOADING) {
@@ -1399,10 +1395,6 @@ GenericError ServerFamily::DoSave(bool ignore_state) {
 
 GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view basename,
                                                Transaction* trans, bool ignore_state) {
-  if (shard_set->IsTieringEnabled()) {
-    return GenericError{make_error_code(errc::operation_not_permitted),
-                        StrCat("Can not save database in tiering mode")};
-  }
   auto state = service_.GetGlobalState();
   // In some cases we want to create a snapshot even if server is not active, f.e in takeover
   if (!ignore_state && (state != GlobalState::ACTIVE)) {
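The two deleted guards above were the blanket ban on saving while tiering is active. They can go because, with this commit, the snapshot path serializes external values itself (see snapshot.cc below) and tiering pauses offloading while a snapshot runs (see tiered_storage.cc), so SAVE, BGSAVE and scheduled snapshots proceed normally in tiering mode.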
25 changes: 22 additions & 3 deletions src/server/snapshot.cc
@@ -15,6 +15,7 @@
 #include "server/journal/journal.h"
 #include "server/rdb_extensions.h"
 #include "server/rdb_save.h"
+#include "server/tiered_storage.h"
 
 namespace dfly {
 
@@ -284,12 +285,30 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
     expire_time = db_slice_->ExpireTime(eit);
   }
 
-  io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
-  CHECK(res);
-  ++type_freq_map_[*res];
+  if (pv.IsExternal()) {
+    // We can't block, so we just schedule a tiered read and append it to the delayed entries
+    util::fb2::Future<PrimeValue> future;
+    EngineShard::tlocal()->tiered_storage()->Read(
+        db_indx, pk.ToString(), pv,
+        [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
+    delayed_entries_.push_back({db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time});
+    ++type_freq_map_[RDB_TYPE_STRING];
+  } else {
+    io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
+    CHECK(res);
+    ++type_freq_map_[*res];
+  }
 }
 
 bool SliceSnapshot::PushSerializedToChannel(bool force) {
+  // Bucket serialization might have accumulated some delayed values.
+  // Because we can finally block in this function, we'll await and serialize them
+  while (!delayed_entries_.empty()) {
+    auto& entry = delayed_entries_.back();
+    serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid);
+    delayed_entries_.pop_back();
+  }
+
   if (!force && serializer_->SerializedLen() < 4096)
     return false;
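This hunk is the heart of the change: bucket traversal runs in an atomic, non-blocking section, so an external (tiered) value cannot be read synchronously. Instead, a read is scheduled and its future is parked in delayed_entries_; PushSerializedToChannel, which is allowed to block, awaits and serializes those entries before flushing. A minimal standalone sketch of the same pattern, using std::promise/std::future and plain strings in place of util::fb2::Future and PrimeValue (all names here are illustrative, not Dragonfly's API):

    #include <future>
    #include <iostream>
    #include <string>
    #include <thread>
    #include <vector>

    // Illustrative stand-in for SliceSnapshot::DelayedEntry: std::future
    // replaces util::fb2::Future, a plain string replaces PrimeValue.
    struct DelayedEntry {
      std::string key;
      std::future<std::string> value;  // resolved by the asynchronous read
    };

    int main() {
      std::vector<DelayedEntry> delayed_entries;
      std::vector<std::thread> readers;

      // Non-blocking phase (bucket traversal): schedule reads, never wait.
      for (const std::string key : {"k1", "k2"}) {
        auto promise = std::make_shared<std::promise<std::string>>();
        delayed_entries.push_back({key, promise->get_future()});
        // The thread stands in for the tiered-read completion callback.
        readers.emplace_back([promise, key] { promise->set_value("value-of-" + key); });
      }

      // Blocking phase (PushSerializedToChannel): drain and "serialize".
      while (!delayed_entries.empty()) {
        DelayedEntry entry = std::move(delayed_entries.back());
        delayed_entries.pop_back();
        std::cout << entry.key << " = " << entry.value.get() << "\n";  // may block
      }

      for (auto& t : readers) t.join();
    }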
10 changes: 10 additions & 0 deletions src/server/snapshot.h
@@ -13,6 +13,7 @@
 #include "server/db_slice.h"
 #include "server/rdb_save.h"
 #include "server/table.h"
+#include "util/fibers/future.h"
 
 namespace dfly {
 
@@ -132,6 +133,14 @@ class SliceSnapshot {
   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;
 
  private:
+  // An entry whose value must be awaited
+  struct DelayedEntry {
+    DbIndex dbid;
+    CompactObj key;
+    util::fb2::Future<PrimeValue> value;
+    time_t expire;
+  };
+
   DbSlice* db_slice_;
   DbTableArray db_array_;
 
@@ -141,6 +150,7 @@
   DbIndex current_db_;
 
   std::unique_ptr<RdbSerializer> serializer_;
+  std::vector<DelayedEntry> delayed_entries_;  // collected during atomic bucket traversal
 
   // Used for sanity checks.
   bool serialize_bucket_running_ = false;
12 changes: 11 additions & 1 deletion src/server/tiered_storage.cc
@@ -18,6 +18,7 @@
 #include "server/common.h"
 #include "server/db_slice.h"
 #include "server/engine_shard_set.h"
+#include "server/snapshot.h"
 #include "server/table.h"
 #include "server/tiering/common.h"
 #include "server/tiering/op_manager.h"
@@ -129,6 +130,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
 
   // Load all values from bin by their hashes
   void Defragment(tiering::DiskSegment segment, string_view value) {
+    // Note: Bin could've already been deleted, in that case DeleteBin returns an empty list
    for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
       // Search for key with the same hash and value pointing to the same segment.
       // If it still exists, it must correspond to the value stored in this bin
@@ -166,6 +168,9 @@
     if (!modified && !cache_fetched_)
       return false;
 
+    if (SliceSnapshot::IsSnaphotInProgress())
+      return false;
+
     SetInMemory(get<OpManager::KeyRef>(id), value, segment);
     return true;
   }
@@ -175,8 +180,9 @@
       return true;
 
     auto bin = ts_->bins_->Delete(segment);
-    if (bin.empty)
+    if (bin.empty) {
       return true;
+    }
 
     if (bin.fragmented) {
       // Trigger read to signal need for defragmentation. ReportFetched will handle it.
@@ -286,6 +292,7 @@ void TieredStorage::Stash(DbIndex dbid, string_view key, PrimeValue* value) {
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
   }
 }
+
 void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
   DCHECK(value->IsExternal());
   tiering::DiskSegment segment = value->GetExternalSlice();
@@ -337,6 +344,9 @@ TieredStats TieredStorage::GetStats() const {
 }
 
 void TieredStorage::RunOffloading(DbIndex dbid) {
+  if (SliceSnapshot::IsSnaphotInProgress())
+    return;
+
   PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
   int stash_limit =
       absl::GetFlag(FLAGS_tiered_storage_write_depth) - op_manager_->GetStats().pending_stash_cnt;
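Both new early returns consult SliceSnapshot::IsSnaphotInProgress: while a snapshot is running, fetched values are not upgraded back into memory and the offloading loop stops stashing, so the set of external entries stays stable under the serializer. A sketch of this coordination pattern, assuming a simple atomic counter for the snapshot bookkeeping (Dragonfly's actual implementation may differ):

    #include <atomic>
    #include <cstdio>

    // Assumed stand-in for the snapshot bookkeeping: a counter raised for
    // the lifetime of every active snapshot.
    std::atomic<int> snapshots_in_progress{0};

    bool IsSnapshotInProgress() { return snapshots_in_progress.load() > 0; }

    void RunOffloading() {
      // Moving entries to disk mid-snapshot would change the set of external
      // values the serializer still has to read, so back off entirely.
      if (IsSnapshotInProgress()) {
        std::printf("snapshot running, offloading skipped\n");
        return;
      }
      std::printf("offloading cold entries\n");
    }

    int main() {
      ++snapshots_in_progress;  // snapshot starts
      RunOffloading();          // no-op while the snapshot is running
      --snapshots_in_progress;  // snapshot finished
      RunOffloading();          // offloading proceeds again
    }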
2 changes: 1 addition & 1 deletion src/server/tiering/external_alloc.cc
@@ -378,7 +378,7 @@ void ExternalAllocator::Free(size_t offset, size_t sz) {
 
   CHECK_LE(sz, block_size);
   DCHECK_LT(block_id, blocks_num);
-  DCHECK(!page->free_blocks[block_id]);
+  DCHECK(!page->free_blocks[block_id]) << offset;
 
   page->free_blocks.set(block_id);
   ++page->available;
12 changes: 7 additions & 5 deletions src/server/tiering/op_manager.cc
@@ -11,7 +11,6 @@
 #include "io/io.h"
 #include "server/tiering/common.h"
 #include "server/tiering/disk_storage.h"
-
 namespace dfly::tiering {
 
 namespace {
@@ -59,12 +58,14 @@ void OpManager::Delete(EntryId id) {
 
 void OpManager::Delete(DiskSegment segment) {
   EntryOps* pending_op = nullptr;
-  if (auto it = pending_reads_.find(segment.ContainingPages().offset); it != pending_reads_.end())
-    pending_op = it->second.Find(segment);
+
+  auto base_it = pending_reads_.find(segment.ContainingPages().offset);
+  if (base_it != pending_reads_.end())
+    pending_op = base_it->second.Find(segment);
 
   if (pending_op) {
     pending_op->deleting = true;
-  } else if (ReportDelete(segment)) {
+  } else if (ReportDelete(segment) && base_it == pending_reads_.end()) {
     storage_.MarkAsFree(segment.ContainingPages());
   }
 }
@@ -143,8 +144,9 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) {
     deleting_full |= ReportDelete(ko.segment);
  }
 
-  if (deleting_full)
+  if (deleting_full) {
     storage_.MarkAsFree(info->segment);
+  }
 
   pending_reads_.erase(offset);
 }
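The reworked Delete hoists the pending-reads lookup so its result can gate the free: if any read for the containing page is still in flight, the pages must not be returned to the allocator yet, and ProcessRead performs the deferred free once the read lands. A self-contained sketch of that deferred-free idea (MarkAsFree here is a hypothetical hook standing in for storage_.MarkAsFree, and the map is reduced to a bare flag):

    #include <cstdio>
    #include <unordered_map>

    // Hypothetical allocator hook standing in for storage_.MarkAsFree.
    void MarkAsFree(size_t offset) { std::printf("pages at %zu freed\n", offset); }

    struct PendingRead {
      bool deleting = false;  // a Delete() arrived while the read was in flight
    };

    std::unordered_map<size_t, PendingRead> pending_reads;  // keyed by page offset

    void Delete(size_t offset) {
      if (auto it = pending_reads.find(offset); it != pending_reads.end())
        it->second.deleting = true;  // defer: ProcessRead() frees the pages later
      else
        MarkAsFree(offset);  // no reader in flight, free immediately
    }

    void ProcessRead(size_t offset) {
      auto it = pending_reads.find(offset);
      // ... deliver the value to the waiting callbacks ...
      if (it->second.deleting)
        MarkAsFree(offset);  // the completed read carries out the deferred free
      pending_reads.erase(it);
    }

    int main() {
      pending_reads[4096] = {};  // a read is in flight for the page at 4096
      Delete(4096);              // deferred; the reader still needs the pages
      ProcessRead(4096);         // read completes and performs the free
    }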
7 changes: 6 additions & 1 deletion src/server/tiering/small_bins.cc
@@ -146,7 +146,12 @@ SmallBins::Stats SmallBins::GetStats() const {
 
 SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) {
   DCHECK_EQ(value.size(), kPageSize);
-  stats_.stashed_entries_cnt -= stashed_bins_.extract(segment.offset).mapped().entries;
+
+  auto bin = stashed_bins_.extract(segment.offset);
+  if (bin.empty())
+    return {};
+
+  stats_.stashed_entries_cnt -= bin.mapped().entries;
 
   const char* data = value.data();
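The fix guards against a bin that was already deleted, for example by a concurrent Delete while the defragmenting read was in flight: std::unordered_map::extract returns an empty node handle for a missing key, and calling mapped() on an empty handle is undefined behavior. A small illustration of both paths:

    #include <cassert>
    #include <unordered_map>

    int main() {
      std::unordered_map<long, int> stashed_bins{{100, 7}};

      // extract() returns an empty node handle when the key is absent;
      // calling mapped() on an empty handle is undefined behavior.
      auto missing = stashed_bins.extract(42);
      assert(missing.empty());  // bin already deleted elsewhere: bail out

      auto present = stashed_bins.extract(100);
      assert(!present.empty());
      assert(present.mapped() == 7);  // safe only after the empty() check
    }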
40 changes: 40 additions & 0 deletions tests/dragonfly/snapshot_test.py
@@ -404,3 +404,43 @@ async def test_bgsave_and_save(async_client: aioredis.Redis):
     while await is_saving(async_client):
         await asyncio.sleep(0.1)
     await async_client.execute_command("SAVE")
+
+
+@dfly_args(
+    {
+        **BASIC_ARGS,
+        "proactor_threads": 4,
+        "dbfilename": "tiered-entries",
+        "tiered_prefix": "/tmp/tiering_test_backing",
+        "tiered_offload_threshold": "0.0",  # ask offloading loop to offload as much as possible
+    }
+)
+async def test_tiered_entries(async_client: aioredis.Redis):
+    """This test makes sure tiered entries are correctly persisted"""
+
+    # With variance 4: 512 - 8192 we include small and large values
+    await StaticSeeder(key_target=5000, data_size=1024, variance=4, types=["STRING"]).run(
+        async_client
+    )
+
+    # Compute the capture, this brings all items back to memory... so we'll wait for offloading
+    start_capture = await StaticSeeder.capture(async_client)
+
+    # Wait until the total_stashes counter stops increasing, meaning offloading finished
+    last_writes, current_writes = 0, -1
+    while last_writes != current_writes:
+        await asyncio.sleep(0.1)
+        last_writes = current_writes
+        current_writes = (await async_client.info("TIERED"))["tiered_total_stashes"]
+
+    # Save + flush + load
+    await async_client.execute_command("SAVE", "DF")
+    assert await async_client.flushall()
+    await async_client.execute_command(
+        "DEBUG",
+        "LOAD",
+        "tiered-entries-summary.dfs",
+    )
+
+    # Compare captures
+    assert await StaticSeeder.capture(async_client) == start_capture