From e783fc18fa2e95ec4e9d7e5f8289fe549e787861 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Fri, 20 Dec 2024 02:33:14 -0800 Subject: [PATCH] debug + Revert "Begin forward compatibility for WAL entry (#13225)" This reverts commit cf768a2f9e5451a99aa774d1ef2eb19681b8df6f. --- db/db_impl/db_impl.h | 28 ++- db/db_impl/db_impl_open.cc | 95 +++++++--- db/db_impl/db_impl_write.cc | 20 ++- db/db_wal_test.cc | 129 +++++++++++++- db/dbformat.h | 50 ++++++ db/log_format.h | 12 +- db/log_reader.cc | 162 ++++++++++++++---- db/log_reader.h | 49 ++++-- db/log_test.cc | 11 -- db/log_writer.cc | 157 +++++++++++------ db/log_writer.h | 23 ++- db_stress_tool/db_stress_common.h | 1 + db_stress_tool/db_stress_gflags.cc | 4 + db_stress_tool/db_stress_test_base.cc | 1 + include/rocksdb/options.h | 19 ++ options/db_options.cc | 9 + options/db_options.h | 1 + options/options_helper.cc | 1 + options/options_settable_test.cc | 1 + options/options_test.cc | 6 + test_util/testutil.cc | 1 + tools/db_bench_tool.cc | 3 + tools/db_crashtest.py | 1 + .../new_features/track_and_verify_wals_api.md | 1 + 24 files changed, 626 insertions(+), 159 deletions(-) create mode 100644 unreleased_history/new_features/track_and_verify_wals_api.md diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6621b20f64a9..97de4541db43 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2071,15 +2071,18 @@ class DBImpl : public DB { bool read_only, int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number, bool* corrupted_wal_found, - std::unordered_map* version_edits, bool* flushed); + std::unordered_map* version_edits, bool* flushed, + PredecessorWALInfo& predecessor_wal_info); void SetupLogFileProcessing(uint64_t wal_number); - Status InitializeLogReader(uint64_t wal_number, bool is_retry, - std::string& fname, bool* const old_log_record, - Status* const reporter_status, - DBOpenLogReporter* reporter, - 
std::unique_ptr& reader); + Status InitializeLogReader( + uint64_t wal_number, bool is_retry, std::string& fname, + + bool stop_replay_for_corruption, uint64_t min_wal_number, + const PredecessorWALInfo& predecessor_wal_info, + bool* const old_log_record, Status* const reporter_status, + DBOpenLogReporter* reporter, std::unique_ptr& reader); Status ProcessLogRecord( Slice record, const std::unique_ptr& reader, const UnorderedMap& running_ts_sz, uint64_t wal_number, @@ -2116,8 +2119,13 @@ class DBImpl : public DB { bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number, bool* corrupted_wal_found); - void FinishLogFileProcessing(SequenceNumber const* const next_sequence, - const Status& status); + Status UpdatePredecessorWALInfo(uint64_t wal_number, + const SequenceNumber* next_sequence, + const std::string& fname, + PredecessorWALInfo& predecessor_wal_info); + + void FinishLogFileProcessing(const Status& status, + const SequenceNumber* next_sequence); // Return `Status::Corruption()` when `stop_replay_for_corruption == true` and // exits inconsistency between SST and WAL data @@ -2309,7 +2317,8 @@ class DBImpl : public DB { const WriteOptions& write_options, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size); + LogFileNumberSize& log_file_number_size, + SequenceNumber sequence); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, @@ -2554,6 +2563,7 @@ class DBImpl : public DB { IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, + const PredecessorWALInfo& predecessor_wal_info, log::Writer** new_log); // Validate self-consistency of DB options diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index d3c472640c6b..5adcb451665b 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -754,6 +754,11 @@ Status DBImpl::Recover( } 
} + if (!is_new_db && immutable_db_options_.track_and_verify_wals && + wal_files.empty()) { + return Status::Corruption("Opening an existing DB with no WAL files"); + } + if (immutable_db_options_.track_and_verify_wals_in_manifest) { if (!immutable_db_options_.best_efforts_recovery) { // Verify WALs in MANIFEST. @@ -1189,14 +1194,15 @@ Status DBImpl::ProcessLogFiles( bool stop_replay_for_corruption = false; bool flushed = false; uint64_t corrupted_wal_number = kMaxSequenceNumber; + PredecessorWALInfo predecessor_wal_info; for (auto wal_number : wal_numbers) { if (status.ok()) { - status = - ProcessLogFile(wal_number, min_wal_number, is_retry, read_only, - job_id, next_sequence, &stop_replay_for_corruption, - &stop_replay_by_wal_filter, &corrupted_wal_number, - corrupted_wal_found, version_edits, &flushed); + status = ProcessLogFile( + wal_number, min_wal_number, is_retry, read_only, job_id, + next_sequence, &stop_replay_for_corruption, + &stop_replay_by_wal_filter, &corrupted_wal_number, + corrupted_wal_found, version_edits, &flushed, predecessor_wal_info); } } @@ -1217,7 +1223,8 @@ Status DBImpl::ProcessLogFile( int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number, bool* corrupted_wal_found, - std::unordered_map* version_edits, bool* flushed) { + std::unordered_map* version_edits, bool* flushed, + PredecessorWALInfo& predecessor_wal_info) { assert(stop_replay_by_wal_filter); // Variable initialization starts @@ -1264,7 +1271,10 @@ Status DBImpl::ProcessLogFile( } Status init_status = InitializeLogReader( - wal_number, is_retry, fname, &old_log_record, &status, &reporter, reader); + wal_number, is_retry, fname, *stop_replay_for_corruption, min_wal_number, + predecessor_wal_info, &old_log_record, &status, &reporter, reader); + + // FIXME(hx235): Consolidate `!init_status.ok()` and `reader == nullptr` cases if (!init_status.ok()) { assert(status.ok()); 
status.PermitUncheckedError(); @@ -1272,6 +1282,8 @@ Status DBImpl::ProcessLogFile( } else if (reader == nullptr) { // TODO(hx235): remove this case since it's confusing assert(status.ok()); + // Fail initializing log reader for one log file with an ok status. + // Try next one. return status; } @@ -1311,13 +1323,19 @@ Status DBImpl::ProcessLogFile( "Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number, *next_sequence); + if (status.ok()) { + status = UpdatePredecessorWALInfo(wal_number, next_sequence, fname, + predecessor_wal_info); + } + if (!status.ok() || old_log_record) { status = HandleNonOkStatusOrOldLogRecord( wal_number, next_sequence, status, &old_log_record, stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found); } - FinishLogFileProcessing(next_sequence, status); + FinishLogFileProcessing(status, next_sequence); + return status; } @@ -1332,12 +1350,12 @@ void DBImpl::SetupLogFileProcessing(uint64_t wal_number) { static_cast(immutable_db_options_.wal_recovery_mode)); } -Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry, - std::string& fname, - bool* const old_log_record, - Status* const reporter_status, - DBOpenLogReporter* reporter, - std::unique_ptr& reader) { +Status DBImpl::InitializeLogReader( + uint64_t wal_number, bool is_retry, std::string& fname, + bool stop_replay_for_corruption, uint64_t min_wal_number, + const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record, + Status* const reporter_status, DBOpenLogReporter* reporter, + std::unique_ptr& reader) { assert(old_log_record); assert(reporter_status); assert(reporter); @@ -1375,9 +1393,11 @@ Status DBImpl::InitializeLogReader(uint64_t wal_number, bool is_retry, // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). 
- reader.reset(new log::Reader(immutable_db_options_.info_log, - std::move(file_reader), reporter, - true /*checksum*/, wal_number)); + reader.reset(new log::Reader( + immutable_db_options_.info_log, std::move(file_reader), reporter, + true /*checksum*/, wal_number, + immutable_db_options_.track_and_verify_wals, stop_replay_for_corruption, + min_wal_number, predecessor_wal_info)); return status; } @@ -1630,8 +1650,26 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord( return status; } } -void DBImpl::FinishLogFileProcessing(SequenceNumber const* const next_sequence, - const Status& status) { + +Status DBImpl::UpdatePredecessorWALInfo( + uint64_t wal_number, const SequenceNumber* next_sequence, + const std::string& fname, PredecessorWALInfo& predecessor_wal_info) { + uint64_t bytes; + + Status s = env_->GetFileSize(fname, &bytes); + if (!s.ok()) { + return s; + } + + assert(next_sequence); + predecessor_wal_info = PredecessorWALInfo( + wal_number, bytes, + *next_sequence == kMaxSequenceNumber ? 
0 : *next_sequence - 1); + return s; +} + +void DBImpl::FinishLogFileProcessing(const Status& status, + const SequenceNumber* next_sequence) { if (status.ok()) { assert(next_sequence); flush_scheduler_.Clear(); @@ -2193,6 +2231,7 @@ Status DB::OpenAndTrimHistory( IOStatus DBImpl::CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, + const PredecessorWALInfo& predecessor_wal_info, log::Writer** new_log) { IOStatus io_s; std::unique_ptr lfile; @@ -2236,9 +2275,15 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options, *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush, - immutable_db_options_.wal_compression); + immutable_db_options_.wal_compression, + immutable_db_options_.track_and_verify_wals); io_s = (*new_log)->AddCompressionTypeRecord(write_options); + if (io_s.ok()) { + io_s = (*new_log)->MaybeAddPredecessorWALInfo(write_options, + predecessor_wal_info); + } } + return io_s; } @@ -2331,8 +2376,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, log::Writer* new_log = nullptr; const size_t preallocate_block_size = impl->GetWalPreallocateBlockSize(max_write_buffer_size); + // TODO(hx235): Pass in the correct `predecessor_wal_info` for the first WAL + // created during DB open with predecessor WALs from previous DB session due + // to `avoid_flush_during_recovery == true`. This can protect the last WAL + // recovered. s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/, - preallocate_block_size, &new_log); + preallocate_block_size, + PredecessorWALInfo() /* predecessor_wal_info */, + &new_log); if (s.ok()) { // Prevent log files created by previous instance from being recycled. // They might be in alive_log_file_, and might get recycled otherwise. 
@@ -2367,7 +2418,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, assert(log_writer->get_log_number() == log_file_number_size.number); impl->mutex_.AssertHeld(); s = impl->WriteToWAL(empty_batch, write_options, log_writer, &log_used, - &log_size, log_file_number_size); + &log_size, log_file_number_size, recovered_seq); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(write_options, false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 9f627430d68a..39b03dc7e3ac 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1558,7 +1558,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, const WriteOptions& write_options, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size) { + LogFileNumberSize& log_file_number_size, + SequenceNumber sequence) { assert(log_size != nullptr); Slice log_entry = WriteBatchInternal::Contents(&merged_batch); @@ -1584,7 +1585,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, if (!io_s.ok()) { return io_s; } - io_s = log_writer->AddRecord(write_options, log_entry); + io_s = log_writer->AddRecord(write_options, log_entry, sequence); if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); @@ -1634,7 +1635,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, - &log_size, log_file_number_size); + &log_size, log_file_number_size, sequence); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1760,7 +1761,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( write_options.rate_limiter_priority = write_group.leader->rate_limiter_priority; io_s = WriteToWAL(*merged_batch, write_options, log_writer, log_used, 
- &log_size, log_file_number_size); + &log_size, log_file_number_size, sequence); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -2443,10 +2444,19 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); mutex_.Unlock(); if (creating_new_log) { + PredecessorWALInfo info; + log_write_mutex_.Lock(); + if (!logs_.empty()) { + log::Writer* cur_log_writer = logs_.back().writer; + info = PredecessorWALInfo(cur_log_writer->get_log_number(), + cur_log_writer->file()->GetFileSize(), + cur_log_writer->GetLastSeqnoRecorded()); + } + log_write_mutex_.Unlock(); // TODO: Write buffer size passed in should be max of all CF's instead // of mutable_cf_options.write_buffer_size. io_s = CreateWAL(write_options, new_log_number, recycle_log_number, - preallocate_block_size, &new_log); + preallocate_block_size, info, &new_log); if (s.ok()) { s = io_s; } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 9ac09082a07c..3221a1d689e8 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1838,9 +1838,125 @@ class RecoveryTestHelper { } }; -class DBWALTestWithParams : public DBWALTestBase, - public ::testing::WithParamInterface< - std::tuple> { +TEST_F(DBWALTest, TrackAndVerifyWALsRecycleWAL) { + Options options = CurrentOptions(); + options.avoid_flush_during_shutdown = true; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.recycle_log_file_num = 1; + options.track_and_verify_wals = true; + + DestroyAndReopen(options); + + ASSERT_OK(Put("key_ignore", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore1", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore2", "wal_to_recycle")); + FlushOptions fo; + fo.wait = true; + ASSERT_OK(dbfull()->Flush(fo)); + + ASSERT_OK(Put("key_ignore", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore1", "wal_to_recycle")); + ASSERT_OK(Put("key_ignore2", "wal_to_recycle")); + 
ASSERT_OK(dbfull()->Flush(fo)); + + // Stop background flush to avoid deleting any WAL + env_->SetBackgroundThreads(1, Env::HIGH); + test::SleepingBackgroundTask sleeping_task; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + // Recycle the first WAL + ASSERT_OK(Put("key1", "old_value")); + + // Create WAL hole + VectorWalPtr log_files; + ASSERT_OK(db_->GetSortedWalFiles(log_files)); + ASSERT_GE(log_files.size(), 1); + ASSERT_OK(test::TruncateFile( + options.env, LogFileName(dbname_, log_files.back()->LogNumber()), + 0 /* new_length */)); + + // Recycle the second WAL + ASSERT_OK(dbfull()->TEST_SwitchWAL()); + ASSERT_OK(Put("key1", "new_value")); + + Status s = TryReopen(options); + + ASSERT_OK(s); + + ASSERT_EQ("wal_to_recycle", Get("key_ignore2")); + ASSERT_EQ("NOT_FOUND", Get("key1")); + + Close(); +} + +class DBWALTrackAndVerifyWALsWithParamsTest + : public DBWALTestBase, + public ::testing::WithParamInterface { + public: + DBWALTrackAndVerifyWALsWithParamsTest() + : DBWALTestBase("/db_wal_track_and_verify_wals_with_params_test") {} +}; + +INSTANTIATE_TEST_CASE_P( + DBWALTrackAndVerifyWALsWithParamsTest, + DBWALTrackAndVerifyWALsWithParamsTest, + ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords, + WALRecoveryMode::kAbsoluteConsistency, + WALRecoveryMode::kPointInTimeRecovery, + WALRecoveryMode::kSkipAnyCorruptedRecords)); + +TEST_P(DBWALTrackAndVerifyWALsWithParamsTest, Basic) { + Options options = CurrentOptions(); + options.avoid_flush_during_shutdown = true; + options.track_and_verify_wals = true; + options.wal_recovery_mode = GetParam(); + + DestroyAndReopen(options); + + // Stop background flush to avoid deleting any WAL + env_->SetBackgroundThreads(1, Env::HIGH); + test::SleepingBackgroundTask sleeping_task; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + ASSERT_OK(Put("key1", "old_value")); + + // Create WAL hole + VectorWalPtr 
log_files; + ASSERT_OK(db_->GetSortedWalFiles(log_files)); + ASSERT_EQ(log_files.size(), 1); + ASSERT_OK(test::TruncateFile( + options.env, LogFileName(dbname_, log_files.back()->LogNumber()), + 0 /* new_length */)); + + ASSERT_OK(dbfull()->TEST_SwitchWAL()); + ASSERT_OK(Put("key1", "new_value")); + + Status s = TryReopen(options); + if (options.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { + ASSERT_OK(s); + ASSERT_EQ("NOT_FOUND", Get("key1")); + } else if (options.wal_recovery_mode == + WALRecoveryMode::kAbsoluteConsistency || + options.wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords) { + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find( + "Mismatched last sequence number recorded in the WAL") != + std::string::npos); + } else { + ASSERT_OK(s); + ASSERT_EQ("new_value", Get("key1")); + } + + Close(); +} + +class DBWALTestWithParams + : public DBWALTestBase, + public ::testing::WithParamInterface< + std::tuple> { public: DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {} }; @@ -1853,7 +1969,8 @@ INSTANTIATE_TEST_CASE_P( RecoveryTestHelper::kWALFilesCount, 1), ::testing::Values(CompressionType::kNoCompression, - CompressionType::kZSTD))); + CompressionType::kZSTD), + ::testing::Bool())); class DBWALTestWithParamsVaryingRecoveryMode : public DBWALTestBase, @@ -1891,6 +2008,7 @@ TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); const size_t row_count = RecoveryTestHelper::FillData(this, &options); // test checksum failure or parsing RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, @@ -1914,6 +2032,7 @@ TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { TEST_P(DBWALTestWithParams, kAbsoluteConsistency) { // Verify clean slate behavior Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); const size_t 
row_count = RecoveryTestHelper::FillData(this, &options); options.create_if_missing = false; ASSERT_OK(TryReopen(options)); @@ -2164,6 +2283,7 @@ TEST_P(DBWALTestWithParams, kPointInTimeRecovery) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); @@ -2221,6 +2341,7 @@ TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) { // Fill data for testing Options options = CurrentOptions(); + options.track_and_verify_wals = std::get<4>(GetParam()); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); diff --git a/db/dbformat.h b/db/dbformat.h index f3a9b9a1a523..02aa632d106d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -1179,4 +1179,54 @@ struct ParsedInternalKeyComparator { const InternalKeyComparator* cmp; }; +class PredecessorWALInfo { + public: + PredecessorWALInfo() + : log_number_(0), + size_bytes_(0), + last_seqno_recorded_(0), + initialized_(false) {} + + explicit PredecessorWALInfo(uint64_t log_number, uint64_t size_bytes, + SequenceNumber last_seqno_recorded) + : log_number_(log_number), + size_bytes_(size_bytes), + last_seqno_recorded_(last_seqno_recorded), + initialized_(true) {} + + uint64_t GetLogNumber() const { return log_number_; } + + uint64_t GetSizeBytes() const { return size_bytes_; } + + SequenceNumber GetLastSeqnoRecorded() const { return last_seqno_recorded_; } + + bool IsInitialized() const { return initialized_; } + + inline void EncodeTo(std::string* dst) const { + assert(dst != nullptr); + PutFixed64(dst, log_number_); + PutFixed64(dst, size_bytes_); + PutFixed64(dst, last_seqno_recorded_); + } + + inline Status DecodeFrom(Slice* src) { + if (!GetFixed64(src, &log_number_)) { + return Status::Corruption("Error decoding log number"); + } + if (!GetFixed64(src, &size_bytes_)) { + return 
Status::Corruption("Error decoding size bytes"); + } + if (!GetFixed64(src, &last_seqno_recorded_)) { + return Status::Corruption("Error decoding last seqno recorded"); + } + initialized_ = true; + return Status::OK(); + } + + private: + uint64_t log_number_; + uint64_t size_bytes_; + SequenceNumber last_seqno_recorded_; + bool initialized_; +}; } // namespace ROCKSDB_NAMESPACE diff --git a/db/log_format.h b/db/log_format.h index 9b691eeb5d7f..65797d338a41 100644 --- a/db/log_format.h +++ b/db/log_format.h @@ -12,14 +12,12 @@ #pragma once -#include - #include "rocksdb/rocksdb_namespace.h" namespace ROCKSDB_NAMESPACE { namespace log { -enum RecordType : uint8_t { +enum RecordType { // Zero is reserved for preallocated files kZeroType = 0, kFullType = 1, @@ -41,10 +39,12 @@ enum RecordType : uint8_t { // User-defined timestamp sizes kUserDefinedTimestampSizeType = 10, kRecyclableUserDefinedTimestampSizeType = 11, + + // For WAL verification + kPredecessorWALInfoType = 12, + kRecyclePredecessorWALInfoType = 13, }; -// Unknown type of value with the 8-th bit set will be ignored -constexpr uint8_t kRecordTypeSafeIgnoreMask = 1 << 7; -constexpr uint8_t kMaxRecordType = kRecyclableUserDefinedTimestampSizeType; +constexpr int kMaxRecordType = kRecyclePredecessorWALInfoType; constexpr unsigned int kBlockSize = 32768; diff --git a/db/log_reader.cc b/db/log_reader.cc index cae4fd7739d4..4d39a09e6271 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -24,7 +24,10 @@ Reader::Reporter::~Reporter() = default; Reader::Reader(std::shared_ptr info_log, std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num) + Reporter* reporter, bool checksum, uint64_t log_num, + bool track_and_verify_wals, bool stop_replay_for_corruption, + uint64_t min_wal_number_to_keep, + const PredecessorWALInfo& predecessor_wal_info) : info_log_(info_log), file_(std::move(_file)), reporter_(reporter), @@ -37,6 +40,10 @@ Reader::Reader(std::shared_ptr info_log, 
last_record_offset_(0), end_of_buffer_offset_(0), log_number_(log_num), + track_and_verify_wals_(track_and_verify_wals), + stop_replay_for_corruption_(stop_replay_for_corruption), + min_wal_number_to_keep_(min_wal_number_to_keep), + predecessor_wal_info_(predecessor_wal_info), recycled_(false), first_record_read_(false), compression_type_(kNoCompression), @@ -65,6 +72,9 @@ Reader::~Reader() { // // TODO krad: Evaluate if we need to move to a more strict mode where we // restrict the inconsistency to only the last log +// TODO (hx235): move `wal_recovery_mode` to be a member data like other +// information (e.g, `stop_replay_for_corruption`) to decide whether to +// check for and surface corruption in `ReadRecord()` bool Reader::ReadRecord(Slice* record, std::string* scratch, WALRecoveryMode wal_recovery_mode, uint64_t* record_checksum) { @@ -88,7 +98,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, while (true) { uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); size_t drop_size = 0; - const uint8_t record_type = + const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size, record_checksum); switch (record_type) { case kFullType: @@ -185,6 +195,23 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, } break; } + case kPredecessorWALInfoType: + case kRecyclePredecessorWALInfoType: { + prospective_record_offset = physical_record_offset; + scratch->clear(); + last_record_offset_ = prospective_record_offset; + + PredecessorWALInfo expected_predecessor_wal_info; + Status s = expected_predecessor_wal_info.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode PredecessorWALInfoType record"); + } else { + MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, + expected_predecessor_wal_info); + } + break; + } case kUserDefinedTimestampSizeType: case kRecyclableUserDefinedTimestampSizeType: { if (in_fragmented_record && !scratch->empty()) { @@ -313,13 +340,11 @@ 
bool Reader::ReadRecord(Slice* record, std::string* scratch, break; default: { - if ((record_type & kRecordTypeSafeIgnoreMask) == 0) { - std::string reason = - "unknown record type " + std::to_string(record_type); - ReportCorruption( - (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), - reason.c_str()); - } + std::string reason = + "unknown record type " + std::to_string(record_type); + ReportCorruption( + (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), + reason.c_str()); in_fragmented_record = false; scratch->clear(); break; @@ -329,6 +354,54 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, return false; } +void Reader::MaybeVerifyPredecessorWALInfo( + WALRecoveryMode wal_recovery_mode, Slice fragment, + const PredecessorWALInfo& expected_predecessor_wal_info) { + if (!track_and_verify_wals_ || + wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords || + stop_replay_for_corruption_) { + return; + } + uint64_t expected_predecessor_log_number = + expected_predecessor_wal_info.GetLogNumber(); + + // This is the first WAL recovered thus with no predecessor WAL info has been + // initialized + if (!predecessor_wal_info_.IsInitialized()) { + if (expected_predecessor_log_number >= min_wal_number_to_keep_) { + std::string reason = "Missing WAL of log number " + + std::to_string(expected_predecessor_log_number); + ReportCorruption(fragment.size(), reason.c_str()); + } + } else { + if (predecessor_wal_info_.GetLogNumber() != + expected_predecessor_log_number) { + std::string reason = "Missing WAL of log number " + + std::to_string(expected_predecessor_log_number); + ReportCorruption(fragment.size(), reason.c_str()); + } else if (predecessor_wal_info_.GetLastSeqnoRecorded() != + expected_predecessor_wal_info.GetLastSeqnoRecorded()) { + std::string reason = + "Mismatched last sequence number recorded in the WAL of log number " + + std::to_string(expected_predecessor_log_number) + ". 
Expected " + + std::to_string(expected_predecessor_wal_info.GetLastSeqnoRecorded()) + + ". Actual " + + std::to_string(predecessor_wal_info_.GetLastSeqnoRecorded()) + + ". (Last sequence number equal to 0 indicates no WAL records)"; + ReportCorruption(fragment.size(), reason.c_str()); + } else if (predecessor_wal_info_.GetSizeBytes() != + expected_predecessor_wal_info.GetSizeBytes()) { + std::string reason = + "Mismatched size of the WAL of log number " + + std::to_string(expected_predecessor_log_number) + ". Expected " + + std::to_string(expected_predecessor_wal_info.GetSizeBytes()) + + " bytes. Actual " + + std::to_string(predecessor_wal_info_.GetSizeBytes()) + " bytes."; + ReportCorruption(fragment.size(), reason.c_str()); + } + } +} + uint64_t Reader::LastRecordOffset() { return last_record_offset_; } uint64_t Reader::LastRecordEnd() { @@ -421,7 +494,7 @@ void Reader::ReportOldLogRecord(size_t bytes) { } } -bool Reader::ReadMore(size_t* drop_size, uint8_t* error) { +bool Reader::ReadMore(size_t* drop_size, int* error) { if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); @@ -462,15 +535,15 @@ bool Reader::ReadMore(size_t* drop_size, uint8_t* error) { } } -uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, - uint64_t* fragment_checksum) { +unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, + uint64_t* fragment_checksum) { while (true) { // We need at least the minimum header size if (buffer_.size() < static_cast(kHeaderSize)) { // the default value of r is meaningless because ReadMore will overwrite // it if it returns false; in case it returns true, the return value will // not be used anyway - uint8_t r = kEof; + int r = kEof; if (!ReadMore(drop_size, &r)) { return r; } @@ -481,12 +554,13 @@ uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, const char* header = buffer_.data(); const uint32_t a = static_cast(header[4]) & 0xff; const uint32_t b = 
static_cast(header[5]) & 0xff; - const uint8_t type = static_cast(header[6]); + const unsigned int type = header[6]; const uint32_t length = a | (b << 8); int header_size = kHeaderSize; const bool is_recyclable_type = ((type >= kRecyclableFullType && type <= kRecyclableLastType) || - type == kRecyclableUserDefinedTimestampSizeType); + type == kRecyclableUserDefinedTimestampSizeType || + type == kRecyclePredecessorWALInfoType); if (is_recyclable_type) { header_size = kRecyclableHeaderSize; if (first_record_read_ && !recycled_) { @@ -496,7 +570,7 @@ uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, recycled_ = true; // We need enough for the larger header if (buffer_.size() < static_cast(kRecyclableHeaderSize)) { - uint8_t r = kEof; + int r = kEof; if (!ReadMore(drop_size, &r)) { return r; } @@ -551,6 +625,8 @@ uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, buffer_.remove_prefix(header_size + length); if (!uncompress_ || type == kSetCompressionType || + type == kPredecessorWALInfoType || + type == kRecyclePredecessorWALInfoType || type == kUserDefinedTimestampSizeType || type == kRecyclableUserDefinedTimestampSizeType) { *result = Slice(header + header_size, length); @@ -640,7 +716,9 @@ Status Reader::UpdateRecordedTimestampSize( } bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, - WALRecoveryMode /*unused*/, + WALRecoveryMode wal_recovery_mode + + , uint64_t* /* checksum */) { assert(record != nullptr); assert(scratch != nullptr); @@ -653,7 +731,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, uint64_t prospective_record_offset = 0; uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); size_t drop_size = 0; - uint8_t fragment_type_or_err = 0; // Initialize to make compiler happy + unsigned int fragment_type_or_err = 0; // Initialize to make compiler happy Slice fragment; while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) { 
switch (fragment_type_or_err) { @@ -730,7 +808,24 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, } break; } + case kPredecessorWALInfoType: + case kRecyclePredecessorWALInfoType: { + fragments_.clear(); + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + PredecessorWALInfo expected_predecessor_wal_info; + Status s = expected_predecessor_wal_info.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode PredecessorWALInfoType record"); + } else { + MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, + expected_predecessor_wal_info); + } + break; + } case kUserDefinedTimestampSizeType: case kRecyclableUserDefinedTimestampSizeType: { if (in_fragmented_record_ && !scratch->empty()) { @@ -783,13 +878,11 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, break; default: { - if ((fragment_type_or_err & kRecordTypeSafeIgnoreMask) == 0) { - std::string reason = - "unknown record type " + std::to_string(fragment_type_or_err); - ReportCorruption( - fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0), - reason.c_str()); - } + std::string reason = + "unknown record type " + std::to_string(fragment_type_or_err); + ReportCorruption( + fragment.size() + (in_fragmented_record_ ? 
fragments_.size() : 0), + reason.c_str()); in_fragmented_record_ = false; fragments_.clear(); break; @@ -807,7 +900,7 @@ void FragmentBufferedReader::UnmarkEOF() { UnmarkEOFInternal(); } -bool FragmentBufferedReader::TryReadMore(size_t* drop_size, uint8_t* error) { +bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) { if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); @@ -848,15 +941,15 @@ bool FragmentBufferedReader::TryReadMore(size_t* drop_size, uint8_t* error) { } // return true if the caller should process the fragment_type_or_err. -bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, - uint8_t* fragment_type_or_err) { +bool FragmentBufferedReader::TryReadFragment( + Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) { assert(fragment != nullptr); assert(drop_size != nullptr); assert(fragment_type_or_err != nullptr); while (buffer_.size() < static_cast(kHeaderSize)) { size_t old_size = buffer_.size(); - uint8_t error = kEof; + int error = kEof; if (!TryReadMore(drop_size, &error)) { *fragment_type_or_err = error; return false; @@ -867,11 +960,12 @@ bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, const char* header = buffer_.data(); const uint32_t a = static_cast(header[4]) & 0xff; const uint32_t b = static_cast(header[5]) & 0xff; - const uint8_t type = static_cast(header[6]); + const unsigned int type = header[6]; const uint32_t length = a | (b << 8); int header_size = kHeaderSize; if ((type >= kRecyclableFullType && type <= kRecyclableLastType) || - type == kRecyclableUserDefinedTimestampSizeType) { + type == kRecyclableUserDefinedTimestampSizeType || + type == kRecyclePredecessorWALInfoType) { if (first_record_read_ && !recycled_) { // A recycled log should have started with a recycled record *fragment_type_or_err = kBadRecord; @@ -881,7 +975,7 @@ bool FragmentBufferedReader::TryReadFragment(Slice* 
fragment, size_t* drop_size, header_size = kRecyclableHeaderSize; while (buffer_.size() < static_cast(kRecyclableHeaderSize)) { size_t old_size = buffer_.size(); - uint8_t error = kEof; + int error = kEof; if (!TryReadMore(drop_size, &error)) { *fragment_type_or_err = error; return false; @@ -898,7 +992,7 @@ bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, while (header_size + length > buffer_.size()) { size_t old_size = buffer_.size(); - uint8_t error = kEof; + int error = kEof; if (!TryReadMore(drop_size, &error)) { *fragment_type_or_err = error; return false; @@ -927,6 +1021,8 @@ bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, buffer_.remove_prefix(header_size + length); if (!uncompress_ || type == kSetCompressionType || + type == kPredecessorWALInfoType || + type == kRecyclePredecessorWALInfoType || type == kUserDefinedTimestampSizeType || type == kRecyclableUserDefinedTimestampSizeType) { *fragment = Slice(header + header_size, length); diff --git a/db/log_reader.h b/db/log_reader.h index a39f5b9cbb31..b7b5be233da3 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -10,7 +10,6 @@ #pragma once #include -#include #include #include #include @@ -58,9 +57,14 @@ class Reader { // live while this Reader is in use. // // If "checksum" is true, verify checksums if available. 
+ // TODO(hx235): separate WAL related parameters from general `Reader` + // parameters Reader(std::shared_ptr info_log, std::unique_ptr&& file, Reporter* reporter, - bool checksum, uint64_t log_num); + bool checksum, uint64_t log_num, bool track_and_verify_wals = false, + bool stop_replay_for_corruption = false, + uint64_t min_wal_number_to_keep = std::numeric_limits::max(), + const PredecessorWALInfo& predecessor_wal_info = PredecessorWALInfo()); // No copying allowed Reader(const Reader&) = delete; void operator=(const Reader&) = delete; @@ -148,6 +152,17 @@ class Reader { // which log number this is uint64_t const log_number_; + // See `Options::track_and_verify_wals` + bool track_and_verify_wals_; + // Below variables are used for WAL verification + // TODO(hx235): to revise `stop_replay_for_corruption_` in `LogReader` since + // we have `predecessor_wal_info_` to verify against the `PredecessorWALInfo` + // recorded in current WAL. If there is no WAL hole, we can revise + // `stop_replay_for_corruption_` to be false. + bool stop_replay_for_corruption_; + uint64_t min_wal_number_to_keep_; + PredecessorWALInfo predecessor_wal_info_; + // Whether this is a recycled log file bool recycled_; @@ -172,7 +187,7 @@ class Reader { UnorderedMap recorded_cf_to_ts_sz_; // Extend record types with the following special values - enum : uint8_t { + enum { kEof = kMaxRecordType + 1, // Returned whenever we find an invalid physical record. // Currently there are three situations in which this happens: @@ -193,11 +208,11 @@ class Reader { // If WAL compressioned is enabled, fragment_checksum is the checksum of the // fragment computed from the orginal buffer containinng uncompressed // fragment. 
- uint8_t ReadPhysicalRecord(Slice* result, size_t* drop_size, - uint64_t* fragment_checksum = nullptr); + unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size, + uint64_t* fragment_checksum = nullptr); // Read some more - bool ReadMore(size_t* drop_size, uint8_t* error); + bool ReadMore(size_t* drop_size, int* error); void UnmarkEOFInternal(); @@ -211,14 +226,24 @@ class Reader { Status UpdateRecordedTimestampSize( const std::vector>& cf_to_ts_sz); + + void MaybeVerifyPredecessorWALInfo( + WALRecoveryMode wal_recovery_mode, Slice fragment, + const PredecessorWALInfo& expected_predecessor_wal_info); }; class FragmentBufferedReader : public Reader { public: - FragmentBufferedReader(std::shared_ptr info_log, - std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num) - : Reader(info_log, std::move(_file), reporter, checksum, log_num), + FragmentBufferedReader( + std::shared_ptr info_log, + std::unique_ptr&& _file, Reporter* reporter, + bool checksum, uint64_t log_num, bool verify_and_track_wals = false, + bool stop_replay_for_corruption = false, + uint64_t min_wal_number_to_keep = std::numeric_limits::max(), + const PredecessorWALInfo& predecessor_wal_info = PredecessorWALInfo()) + : Reader(info_log, std::move(_file), reporter, checksum, log_num, + verify_and_track_wals, stop_replay_for_corruption, + min_wal_number_to_keep, predecessor_wal_info), fragments_(), in_fragmented_record_(false) {} ~FragmentBufferedReader() override {} @@ -233,9 +258,9 @@ class FragmentBufferedReader : public Reader { bool in_fragmented_record_; bool TryReadFragment(Slice* result, size_t* drop_size, - uint8_t* fragment_type_or_err); + unsigned int* fragment_type_or_err); - bool TryReadMore(size_t* drop_size, uint8_t* error); + bool TryReadMore(size_t* drop_size, int* error); // No copy allowed FragmentBufferedReader(const FragmentBufferedReader&); diff --git a/db/log_test.cc b/db/log_test.cc index 1264c2a594ab..51f88ac5b7d1 100644 --- a/db/log_test.cc +++ 
b/db/log_test.cc @@ -436,17 +436,6 @@ TEST_P(LogTest, BadRecordType) { ASSERT_EQ("OK", MatchError("unknown record type")); } -TEST_P(LogTest, IgnorableRecordType) { - Write("foo"); - // Type is stored in header[6] - SetByte(6, static_cast(kRecordTypeSafeIgnoreMask + 100)); - FixChecksum(0, 3, false); - ASSERT_EQ("EOF", Read()); - // The new type has value 129 and masked to be ignorable if unknown - ASSERT_EQ(0U, DroppedBytes()); - ASSERT_EQ("", ReportMessage()); -} - TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte diff --git a/db/log_writer.cc b/db/log_writer.cc index f178d6281b50..752bf42d6d8f 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -22,7 +22,7 @@ namespace ROCKSDB_NAMESPACE::log { Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, bool manual_flush, - CompressionType compression_type) + CompressionType compression_type, bool track_and_verify_wals) : dest_(std::move(dest)), block_offset_(0), log_number_(log_number), @@ -31,8 +31,10 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize), manual_flush_(manual_flush), compression_type_(compression_type), - compress_(nullptr) { - for (uint8_t i = 0; i <= kMaxRecordType; i++) { + compress_(nullptr), + track_and_verify_wals_(track_and_verify_wals), + last_seqno_recorded_(0) { + for (int i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); } @@ -52,19 +54,12 @@ Writer::~Writer() { } IOStatus Writer::WriteBuffer(const WriteOptions& write_options) { - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. 
Skip writing buffer."); + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } IOOptions opts; - IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); + s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (!s.ok()) { return s; } @@ -92,17 +87,10 @@ bool Writer::PublishIfClosed() { } IOStatus Writer::AddRecord(const WriteOptions& write_options, - const Slice& slice) { - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. Skip writing buffer."); + const Slice& slice, const SequenceNumber& seqno) { + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } const char* ptr = slice.data(); size_t left = slice.size(); @@ -118,7 +106,6 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options, compress_start = true; } - IOStatus s; IOOptions opts; s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (s.ok()) { @@ -196,6 +183,10 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options, } } + if (s.ok()) { + last_seqno_recorded_ = std::max(last_seqno_recorded_, seqno); + } + return s; } @@ -208,23 +199,16 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) { return IOStatus::OK(); } - if (dest_->seen_error()) { -#ifndef NDEBUG - if (dest_->seen_injected_error()) { - std::stringstream msg; - msg << "Seen " << FaultInjectionTestFS::kInjected - << " error. Skip writing buffer."; - return IOStatus::IOError(msg.str()); - } -#endif // NDEBUG - return IOStatus::IOError("Seen error. 
Skip writing buffer."); + IOStatus s = MaybeHandleSeenFileWriterError(); + if (!s.ok()) { + return s; } CompressionTypeRecord record(compression_type_); std::string encode; record.EncodeTo(&encode); - IOStatus s = EmitPhysicalRecord(write_options, kSetCompressionType, - encode.data(), encode.size()); + s = EmitPhysicalRecord(write_options, kSetCompressionType, encode.data(), + encode.size()); if (s.ok()) { if (!manual_flush_) { IOOptions io_opts; @@ -251,6 +235,44 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) { return s; } +IOStatus Writer::MaybeAddPredecessorWALInfo(const WriteOptions& write_options, + const PredecessorWALInfo& info) { + IOStatus s = MaybeHandleSeenFileWriterError(); + + if (!s.ok()) { + return s; + } + + if (!track_and_verify_wals_ || !info.IsInitialized()) { + return IOStatus::OK(); + } + + std::string encode; + info.EncodeTo(&encode); + + s = MaybeSwitchToNewBlock(write_options, encode); + if (!s.ok()) { + return s; + } + + RecordType type = recycle_log_files_ ? kRecyclePredecessorWALInfoType + : kPredecessorWALInfoType; + s = EmitPhysicalRecord(write_options, type, encode.data(), encode.size()); + + if (!s.ok()) { + return s; + } + + if (!manual_flush_) { + IOOptions io_opts; + s = WritableFileWriter::PrepareIOOptions(write_options, io_opts); + if (s.ok()) { + s = dest_->Flush(io_opts); + } + } + return s; +} + IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( const WriteOptions& write_options, const UnorderedMap& cf_to_ts_sz) { @@ -275,22 +297,9 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType : kUserDefinedTimestampSizeType; - // If there's not enough space for this record, switch to a new block. 
- const int64_t leftover = kBlockSize - block_offset_; - if (leftover < header_size_ + (int)encoded.size()) { - IOOptions opts; - IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); - if (!s.ok()) { - return s; - } - - std::vector trailer(leftover, '\x00'); - s = dest_->Append(opts, Slice(trailer.data(), trailer.size())); - if (!s.ok()) { - return s; - } - - block_offset_ = 0; + IOStatus s = MaybeSwitchToNewBlock(write_options, encoded); + if (!s.ok()) { + return s; } return EmitPhysicalRecord(write_options, type, encoded.data(), @@ -313,7 +322,7 @@ IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options, uint32_t crc = type_crc_[t]; if (t < kRecyclableFullType || t == kSetCompressionType || - t == kUserDefinedTimestampSizeType) { + t == kPredecessorWALInfoType || t == kUserDefinedTimestampSizeType) { // Legacy record format assert(block_offset_ + kHeaderSize + n <= kBlockSize); header_size = kHeaderSize; @@ -352,4 +361,42 @@ IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options, return s; } +IOStatus Writer::MaybeHandleSeenFileWriterError() { + if (dest_->seen_error()) { +#ifndef NDEBUG + if (dest_->seen_injected_error()) { + std::stringstream msg; + msg << "Seen " << FaultInjectionTestFS::kInjected + << " error. Skip writing buffer."; + return IOStatus::IOError(msg.str()); + } +#endif // NDEBUG + return IOStatus::IOError("Seen error. Skip writing buffer."); + } + return IOStatus::OK(); +} + +IOStatus Writer::MaybeSwitchToNewBlock(const WriteOptions& write_options, + const std::string& content_to_write) { + IOStatus s; + const int64_t leftover = kBlockSize - block_offset_; + // If there's not enough space for this record, switch to a new block. 
+ if (leftover < header_size_ + (int)content_to_write.size()) { + IOOptions opts; + s = WritableFileWriter::PrepareIOOptions(write_options, opts); + if (!s.ok()) { + return s; + } + + std::vector trailer(leftover, '\x00'); + s = dest_->Append(opts, Slice(trailer.data(), trailer.size())); + if (!s.ok()) { + return s; + } + + block_offset_ = 0; + } + return s; +} + } // namespace ROCKSDB_NAMESPACE::log diff --git a/db/log_writer.h b/db/log_writer.h index 7cae52dd51c4..f7aef75197d5 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -13,6 +13,7 @@ #include #include +#include "db/dbformat.h" #include "db/log_format.h" #include "rocksdb/compression_type.h" #include "rocksdb/env.h" @@ -76,18 +77,24 @@ class Writer { // Create a writer that will append data to "*dest". // "*dest" must be initially empty. // "*dest" must remain live while this Writer is in use. + // TODO(hx235): separate WAL related parameters from general `Writer` + // parameters explicit Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, bool manual_flush = false, - CompressionType compressionType = kNoCompression); + CompressionType compressionType = kNoCompression, + bool track_and_verify_wals = false); // No copying allowed Writer(const Writer&) = delete; void operator=(const Writer&) = delete; ~Writer(); - IOStatus AddRecord(const WriteOptions& write_options, const Slice& slice); + IOStatus AddRecord(const WriteOptions& write_options, const Slice& slice, + const SequenceNumber& seqno = 0); IOStatus AddCompressionTypeRecord(const WriteOptions& write_options); + IOStatus MaybeAddPredecessorWALInfo(const WriteOptions& write_options, + const PredecessorWALInfo& info); // If there are column families in `cf_to_ts_sz` not included in // `recorded_cf_to_ts_sz_` and its user-defined timestamp size is non-zero, @@ -116,6 +123,8 @@ class Writer { size_t TEST_block_offset() const { return block_offset_; } + SequenceNumber GetLastSeqnoRecorded() const { return last_seqno_recorded_; }; 
+ private: std::unique_ptr dest_; size_t block_offset_; // Current offset in block @@ -131,6 +140,11 @@ class Writer { IOStatus EmitPhysicalRecord(const WriteOptions& write_options, RecordType type, const char* ptr, size_t length); + IOStatus MaybeHandleSeenFileWriterError(); + + IOStatus MaybeSwitchToNewBlock(const WriteOptions& write_options, + const std::string& content_to_write); + // If true, it does not flush after each write. Instead it relies on the upper // layer to manually does the flush by calling ::WriteBuffer() bool manual_flush_; @@ -145,6 +159,11 @@ class Writer { // Since the user-defined timestamp size cannot be changed while the DB is // running, existing entry in this map cannot be updated. UnorderedMap recorded_cf_to_ts_sz_; + + // See `Options::track_and_verify_wals` + bool track_and_verify_wals_; + + SequenceNumber last_seqno_recorded_; }; } // namespace log diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 62c79ac0e5e8..0c1ef4e11369 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -421,6 +421,7 @@ DECLARE_int32(test_ingest_standalone_range_deletion_one_in); DECLARE_bool(allow_unprepared_value); DECLARE_string(file_temperature_age_thresholds); DECLARE_uint32(commit_bypass_memtable_one_in); +DECLARE_bool(track_and_verify_wals); constexpr long KB = 1024; constexpr int kRandomValueMaxFactor = 3; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index c49846b9f194..1859e6940fb9 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -847,6 +847,10 @@ DEFINE_bool(allow_unprepared_value, ROCKSDB_NAMESPACE::ReadOptions().allow_unprepared_value, "Allow lazy loading of values for range scans"); +DEFINE_bool(track_and_verify_wals, + ROCKSDB_NAMESPACE::Options().track_and_verify_wals, + "See Options::track_and_verify_wals"); + static bool ValidateInt32Percent(const char* flagname, int32_t value) { if 
(value < 0 || value > 100) { fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname, diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3f7b8e646216..a79aecda30ed 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -4117,6 +4117,7 @@ void InitializeOptionsFromFlags( options.level_compaction_dynamic_level_bytes = FLAGS_level_compaction_dynamic_level_bytes; options.track_and_verify_wals_in_manifest = true; + options.track_and_verify_wals = FLAGS_track_and_verify_wals; options.verify_sst_unique_id_in_manifest = FLAGS_verify_sst_unique_id_in_manifest; options.memtable_protection_bytes_per_key = diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b27f53b4a849..0a8ed2fe021a 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -632,6 +632,25 @@ struct DBOptions { // Default: false bool track_and_verify_wals_in_manifest = false; + // EXPERIMENTAL + // + // If true, various information about predecessor WAL will be recorded in the + // current WAL for verification on the predecessor WAL during WAL recovery. + // + // It verifies the following: + // 1. There exists at least some WAL in the DB + // - It's not compatible with `RepairDB()` since this option imposes a + // stricter requirement on WAL than a DB that went through `RepairDB()` can + // normally meet + // 2. There exists no WAL hole where new WAL data is present while some old WAL + // data is missing + // + // This is intended to be a better replacement to + // `track_and_verify_wals_in_manifest`. + // + // Default: false + bool track_and_verify_wals = false; + // If true, verifies the SST unique id between MANIFEST and actual file // each time an SST file is opened. This check ensures an SST file is not // overwritten or misplaced. 
A corruption error will be reported if mismatch diff --git a/options/db_options.cc b/options/db_options.cc index 967bb9b964a6..29e7632473a0 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -230,6 +230,10 @@ static std::unordered_map track_and_verify_wals_in_manifest), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"track_and_verify_wals", + {offsetof(struct ImmutableDBOptions, track_and_verify_wals), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"verify_sst_unique_id_in_manifest", {offsetof(struct ImmutableDBOptions, verify_sst_unique_id_in_manifest), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -716,6 +720,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) compaction_verify_record_count(options.compaction_verify_record_count), track_and_verify_wals_in_manifest( options.track_and_verify_wals_in_manifest), + track_and_verify_wals(options.track_and_verify_wals), verify_sst_unique_id_in_manifest( options.verify_sst_unique_id_in_manifest), env(options.env), @@ -820,6 +825,10 @@ void ImmutableDBOptions::Dump(Logger* log) const { " " "Options.track_and_verify_wals_in_manifest: %d", track_and_verify_wals_in_manifest); + ROCKS_LOG_HEADER(log, + " " + "Options.track_and_verify_wals: %d", + track_and_verify_wals); ROCKS_LOG_HEADER(log, " Options.verify_sst_unique_id_in_manifest: %d", verify_sst_unique_id_in_manifest); ROCKS_LOG_HEADER(log, " Options.env: %p", diff --git a/options/db_options.h b/options/db_options.h index ac76ea40d8eb..25318ec1a616 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -27,6 +27,7 @@ struct ImmutableDBOptions { bool flush_verify_memtable_count; bool compaction_verify_record_count; bool track_and_verify_wals_in_manifest; + bool track_and_verify_wals; bool verify_sst_unique_id_in_manifest; Env* env; std::shared_ptr rate_limiter; diff --git a/options/options_helper.cc b/options/options_helper.cc index 
f05d90f7c1c6..0e9b0199a443 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -71,6 +71,7 @@ void BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.compaction_verify_record_count; options.track_and_verify_wals_in_manifest = immutable_db_options.track_and_verify_wals_in_manifest; + options.track_and_verify_wals = immutable_db_options.track_and_verify_wals; options.verify_sst_unique_id_in_manifest = immutable_db_options.verify_sst_unique_id_in_manifest; options.env = immutable_db_options.env; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 6036d0513122..d5d92838308e 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -406,6 +406,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "flush_verify_memtable_count=true;" "compaction_verify_record_count=true;" "track_and_verify_wals_in_manifest=true;" + "track_and_verify_wals=true;" "verify_sst_unique_id_in_manifest=true;" "is_fd_close_on_exec=false;" "bytes_per_sync=4295013613;" diff --git a/options/options_test.cc b/options/options_test.cc index 2f9b12d3a1d8..ef5e62c3fed3 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -146,6 +146,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"max_total_wal_size", "33"}, @@ -329,6 +330,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.verify_sst_unique_id_in_manifest, true); ASSERT_EQ(new_db_opt.max_open_files, 32); ASSERT_EQ(new_db_opt.max_total_wal_size, static_cast(33)); @@ -901,6 
+903,7 @@ TEST_F(OptionsTest, OldInterfaceTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"daily_offpeak_time_utc", "06:30-23:30"}, @@ -916,6 +919,7 @@ TEST_F(OptionsTest, OldInterfaceTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.verify_sst_unique_id_in_manifest, true); ASSERT_EQ(new_db_opt.max_open_files, 32); db_options_map["unknown_option"] = "1"; @@ -2450,6 +2454,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"error_if_exists", "false"}, {"paranoid_checks", "true"}, {"track_and_verify_wals_in_manifest", "true"}, + {"track_and_verify_wals", "true"}, {"verify_sst_unique_id_in_manifest", "true"}, {"max_open_files", "32"}, {"max_total_wal_size", "33"}, @@ -2638,6 +2643,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.error_if_exists, false); ASSERT_EQ(new_db_opt.paranoid_checks, true); ASSERT_EQ(new_db_opt.track_and_verify_wals_in_manifest, true); + ASSERT_EQ(new_db_opt.track_and_verify_wals, true); ASSERT_EQ(new_db_opt.max_open_files, 32); ASSERT_EQ(new_db_opt.max_total_wal_size, static_cast(33)); ASSERT_EQ(new_db_opt.use_fsync, true); diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 49ab2ebfd7d5..35884a7b3789 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -308,6 +308,7 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { db_opt->is_fd_close_on_exec = rnd->Uniform(2); db_opt->paranoid_checks = rnd->Uniform(2); db_opt->track_and_verify_wals_in_manifest = rnd->Uniform(2); + db_opt->track_and_verify_wals = rnd->Uniform(2); db_opt->verify_sst_unique_id_in_manifest = rnd->Uniform(2); db_opt->skip_stats_update_on_db_open = 
rnd->Uniform(2); db_opt->skip_checking_sst_file_sizes_on_db_open = rnd->Uniform(2); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index f643552608f6..66aacd230988 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1808,6 +1808,8 @@ DEFINE_bool(build_info, false, DEFINE_bool(track_and_verify_wals_in_manifest, false, "If true, enable WAL tracking in the MANIFEST"); +DEFINE_bool(track_and_verify_wals, false, "See Options.track_and_verify_wals"); + namespace ROCKSDB_NAMESPACE { namespace { static Status CreateMemTableRepFactory( @@ -4721,6 +4723,7 @@ class Benchmark { options.allow_data_in_errors = FLAGS_allow_data_in_errors; options.track_and_verify_wals_in_manifest = FLAGS_track_and_verify_wals_in_manifest; + options.track_and_verify_wals = FLAGS_track_and_verify_wals; // Integrated BlobDB options.enable_blob_files = FLAGS_enable_blob_files; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index f9943a2c7649..59912dbe9daa 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -343,6 +343,7 @@ "universal_max_read_amp": lambda: random.choice([-1] * 3 + [0, 4, 10]), "paranoid_memory_checks": lambda: random.choice([0] * 7 + [1]), "allow_unprepared_value": lambda: random.choice([0, 1]), + "track_and_verify_wals": lambda: random.choice([0, 1]), } _TEST_DIR_ENV_VAR = "TEST_TMPDIR" # If TEST_TMPDIR_EXPECTED is not specified, default value will be TEST_TMPDIR diff --git a/unreleased_history/new_features/track_and_verify_wals_api.md b/unreleased_history/new_features/track_and_verify_wals_api.md new file mode 100644 index 000000000000..b0889d0f9bb9 --- /dev/null +++ b/unreleased_history/new_features/track_and_verify_wals_api.md @@ -0,0 +1 @@ +Provide a new option `track_and_verify_wals` to track and verify various information about WAL during WAL recovery. This is intended to be a better replacement to `track_and_verify_wals_in_manifest`.