From 06a52bda64b9674b9d1e33204682f0bd78c77364 Mon Sep 17 00:00:00 2001 From: Stanislau Hlebik Date: Mon, 11 Aug 2014 22:10:32 -0700 Subject: [PATCH] Flush only one column family Summary: Currently DBImpl::Flush() triggers flushes in all column families. Instead we need to trigger just the column family specified. Test Plan: make all check Reviewers: igor, ljin, yhchiang, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D20841 --- db/column_family_test.cc | 4 +- db/db_impl.cc | 384 ++++++++++++++++++++++----------------- db/db_impl.h | 34 +++- db/db_test.cc | 35 +++- 4 files changed, 283 insertions(+), 174 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 46f3796dd4f..75a4bc5c75c 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -913,7 +913,9 @@ TEST(ColumnFamilyTest, DontRollEmptyLogs) { } int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls(); // this will trigger the flushes - ASSERT_OK(db_->Write(WriteOptions(), nullptr)); + for (size_t i = 0; i <= 4; ++i) { + ASSERT_OK(Flush(i)); + } for (int i = 0; i < 4; ++i) { dbfull()->TEST_WaitForFlushMemTable(handles_[i]); diff --git a/db/db_impl.cc b/db/db_impl.cc index 95c77ff52cb..75aca2a52a3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -89,6 +89,20 @@ struct DBImpl::Writer { explicit Writer(port::Mutex* mu) : cv(mu) { } }; +struct DBImpl::WriteContext { + autovector superversions_to_free_; + autovector logs_to_free_; + + ~WriteContext() { + for (auto& sv : superversions_to_free_) { + delete sv; + } + for (auto& log : logs_to_free_) { + delete log; + } + } +}; + struct DBImpl::CompactionState { Compaction* const compaction; @@ -1843,8 +1857,31 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options) { - // nullptr batch means just wait for earlier writes to be done - Status s = Write(WriteOptions(), nullptr); + Writer w(&mutex_); + w.batch = nullptr; + w.sync = false; + w.disableWAL = false; + w.in_batch_group = false; + w.done = false; + w.timeout_hint_us = kNoTimeOut; + + WriteContext context; + mutex_.Lock(); + Status s = BeginWrite(&w, 0); + assert(s.ok() && !w.done); // No timeout and nobody should do our job + + // SetNewMemtableAndNewLogFile() will release and reacquire mutex + // during execution + s = SetNewMemtableAndNewLogFile(cfd, &context); + cfd->imm()->FlushRequested(); + MaybeScheduleFlushOrCompaction(); + + assert(!writers_.empty()); + assert(writers_.front() == &w); + EndWrite(&w, &w, s); + mutex_.Unlock(); + + if (s.ok() && options.wait) { // Wait until the compaction completes s = WaitForFlushMemTable(cfd); @@ -3529,10 +3566,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { max_total_in_memory_state_ -= cfd->options()->write_buffer_size * cfd->options()->max_write_buffer_number; Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID()); - // Flush the memtables. This will make all WAL files referencing dropped - // column family to be obsolete. They will be deleted once user deletes - // column family handle - Write(WriteOptions(), nullptr); // ignore error } else { Log(options_.info_log, "Dropping column family with id %u FAILED -- %s\n", cfd->GetID(), s.ToString().c_str()); @@ -3728,38 +3761,22 @@ Status DBImpl::Delete(const WriteOptions& options, return DB::Delete(options, column_family, key); } -Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { - PERF_TIMER_AUTO(write_pre_and_post_process_time); - Writer w(&mutex_); - w.batch = my_batch; - w.sync = options.sync; - w.disableWAL = options.disableWAL; - w.in_batch_group = false; - w.done = false; - w.timeout_hint_us = options.timeout_hint_us; - - uint64_t expiration_time = 0; - if (w.timeout_hint_us == 0) { - w.timeout_hint_us = kNoTimeOut; - } else { - expiration_time = env_->NowMicros() + w.timeout_hint_us; - } - w.done = false; - - mutex_.Lock(); +// REQUIRES: mutex_ is held +Status DBImpl::BeginWrite(Writer* w, uint64_t expiration_time) { // the following code block pushes the current writer "w" into the writer // queue "writers_" and wait until one of the following conditions met: // 1. the job of "w" has been done by some other writers. // 2. "w" becomes the first writer in "writers_" // 3. "w" timed-out. - writers_.push_back(&w); + mutex_.AssertHeld(); + writers_.push_back(w); bool timed_out = false; - while (!w.done && &w != writers_.front()) { + while (!w->done && w != writers_.front()) { if (expiration_time == 0) { - w.cv.Wait(); - } else if (w.cv.TimedWait(expiration_time)) { - if (w.in_batch_group) { + w->cv.Wait(); + } else if (w->cv.TimedWait(expiration_time)) { + if (w->in_batch_group) { // then it means the front writer is currently doing the // write on behalf of this "timed-out" writer. Then it // should wait until the write completes. @@ -3771,24 +3788,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } - if (!options.disableWAL) { - RecordTick(stats_, WRITE_WITH_WAL); - default_cf_internal_stats_->AddDBStats( - InternalStats::WRITE_WITH_WAL, 1); - } - - if (w.done) { - default_cf_internal_stats_->AddDBStats( - InternalStats::WRITE_DONE_BY_OTHER, 1); - mutex_.Unlock(); - RecordTick(stats_, WRITE_DONE_BY_OTHER); - return w.status; - } else if (timed_out) { + if (timed_out) { #ifndef NDEBUG bool found = false; #endif for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { - if (*iter == &w) { + if (*iter == w) { writers_.erase(iter); #ifndef NDEBUG found = true; @@ -3805,14 +3810,77 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (!writers_.empty()) { writers_.front()->cv.Signal(); } + return Status::TimedOut(); + } + return Status::OK(); +} + +// REQUIRES: mutex_ is held +void DBImpl::EndWrite(Writer* w, Writer* last_writer, Status status) { + // Pop out the current writer and all writers being pushed before the + // current writer from the writer queue. + mutex_.AssertHeld(); + while (!writers_.empty()) { + Writer* ready = writers_.front(); + writers_.pop_front(); + if (ready != w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); + } + if (ready == last_writer) break; + } + + // Notify new head of write queue + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } +} + +Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { + if (my_batch == nullptr) { + return Status::Corruption("Batch is nullptr!"); + } + PERF_TIMER_AUTO(write_pre_and_post_process_time); + Writer w(&mutex_); + w.batch = my_batch; + w.sync = options.sync; + w.disableWAL = options.disableWAL; + w.in_batch_group = false; + w.done = false; + w.timeout_hint_us = options.timeout_hint_us; + + uint64_t expiration_time = 0; + if (w.timeout_hint_us == 0) { + w.timeout_hint_us = kNoTimeOut; + } else { + expiration_time = env_->NowMicros() + w.timeout_hint_us; + } + + if (!options.disableWAL) { + RecordTick(stats_, WRITE_WITH_WAL); + default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1); + } + + WriteContext context; + mutex_.Lock(); + Status status = BeginWrite(&w, expiration_time); + assert(status.ok() || status.IsTimedOut()); + if (status.IsTimedOut()) { mutex_.Unlock(); RecordTick(stats_, WRITE_TIMEDOUT); return Status::TimedOut(); - } else { - RecordTick(stats_, WRITE_DONE_BY_SELF); - default_cf_internal_stats_->AddDBStats( - InternalStats::WRITE_DONE_BY_SELF, 1); } + if (w.done) { // write was done by someone else + default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, + 1); + mutex_.Unlock(); + RecordTick(stats_, WRITE_DONE_BY_OTHER); + return w.status; + } + + RecordTick(stats_, WRITE_DONE_BY_SELF); + default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); // Once reaches this point, the current writer "w" will try to do its write // job. It may also pick up some of the remaining writers in the "writers_" @@ -3836,29 +3904,27 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { flush_column_family_if_log_file, total_log_size_, max_total_wal_size); } - Status status; - autovector superversions_to_free; - autovector logs_to_free; - if (LIKELY(single_column_family_mode_)) { // fast path - status = MakeRoomForWrite( - default_cf_handle_->cfd(), my_batch == nullptr, - &superversions_to_free, &logs_to_free, - expiration_time); + status = MakeRoomForWrite(default_cf_handle_->cfd(), + &context, expiration_time); } else { // refcounting cfd in iteration bool dead_cfd = false; for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->Ref(); - bool force_flush = - my_batch == nullptr || - (flush_column_family_if_log_file != 0 && - cfd->GetLogNumber() <= flush_column_family_if_log_file); - // May temporarily unlock and wait. - status = MakeRoomForWrite( - cfd, force_flush, &superversions_to_free, &logs_to_free, - expiration_time); + if (flush_column_family_if_log_file != 0 && + cfd->GetLogNumber() <= flush_column_family_if_log_file) { + // log size excedded limit and we need to do flush + // SetNewMemtableAndNewLogFie may temporarily unlock and wait + status = SetNewMemtableAndNewLogFile(cfd, &context); + cfd->imm()->FlushRequested(); + MaybeScheduleFlushOrCompaction(); + } else { + // May temporarily unlock and wait. + status = MakeRoomForWrite(cfd, &context, expiration_time); + } + if (cfd->Unref()) { dead_cfd = true; } @@ -3873,7 +3939,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; - if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions + if (status.ok()) { autovector write_batch_group; BuildBatchGroup(&last_writer, &write_batch_group); @@ -3969,36 +4035,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { bg_error_ = status; // stop compaction & fail any further writes } - // Pop out the current writer and all writers being pushed before the - // current writer from the writer queue. - while (!writers_.empty()) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); - } - if (ready == last_writer) break; - } - - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); - } + EndWrite(&w, last_writer, status); mutex_.Unlock(); if (status.IsTimedOut()) { RecordTick(stats_, WRITE_TIMEDOUT); } - for (auto& sv : superversions_to_free) { - delete sv; - } - for (auto& log : logs_to_free) { - delete log; - } - PERF_TIMER_STOP(write_pre_and_post_process_time); return status; } @@ -4095,16 +4138,14 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::MakeRoomForWrite( - ColumnFamilyData* cfd, bool force, - autovector* superversions_to_free, - autovector* logs_to_free, - uint64_t expiration_time) { +Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, + WriteContext* context, + uint64_t expiration_time) { mutex_.AssertHeld(); assert(!writers_.empty()); - bool allow_delay = !force; - bool allow_hard_rate_limit_delay = !force; - bool allow_soft_rate_limit_delay = !force; + bool allow_delay = true; + bool allow_hard_rate_limit_delay = true; + bool allow_soft_rate_limit_delay = true; uint64_t rate_limit_delay_millis = 0; Status s; double score; @@ -4145,7 +4186,7 @@ Status DBImpl::MakeRoomForWrite( cfd->internal_stats()->AddCFStats( InternalStats::LEVEL0_SLOWDOWN, delayed); delayed_writes_++; - } else if (!force && !cfd->mem()->ShouldFlush()) { + } else if (!cfd->mem()->ShouldFlush()) { // There is room in current memtable if (allow_delay) { DelayLoggingAndReset(); @@ -4228,82 +4269,91 @@ Status DBImpl::MakeRoomForWrite( mutex_.Lock(); cfd->internal_stats()->RecordLevelNSlowdown(max_level, elapsed, true); } else { - unique_ptr lfile; - log::Writer* new_log = nullptr; - MemTable* new_mem = nullptr; - - // Attempt to switch to a new memtable and trigger flush of old. - // Do this without holding the dbmutex lock. - assert(versions_->PrevLogNumber() == 0); - bool creating_new_log = !log_empty_; - uint64_t new_log_number = - creating_new_log ? versions_->NewFileNumber() : logfile_number_; - SuperVersion* new_superversion = nullptr; - mutex_.Unlock(); - { - DelayLoggingAndReset(); - if (creating_new_log) { - s = env_->NewWritableFile( - LogFileName(options_.wal_dir, new_log_number), &lfile, - env_->OptimizeForLogWrite(storage_options_)); - if (s.ok()) { - // Our final size should be less than write_buffer_size - // (compression, etc) but err on the side of caution. - lfile->SetPreallocationBlockSize(1.1 * - cfd->options()->write_buffer_size); - new_log = new log::Writer(std::move(lfile)); - } - } - - if (s.ok()) { - new_mem = new MemTable(cfd->internal_comparator(), *cfd->options()); - new_superversion = new SuperVersion(); - } - } - mutex_.Lock(); + s = SetNewMemtableAndNewLogFile(cfd, context); if (!s.ok()) { - // how do we fail if we're not creating new log? - assert(creating_new_log); - // Avoid chewing through file number space in a tight loop. - versions_->ReuseLogFileNumber(new_log_number); - assert(!new_mem); - assert(!new_log); break; } - if (creating_new_log) { - logfile_number_ = new_log_number; - assert(new_log != nullptr); - logs_to_free->push_back(log_.release()); - log_.reset(new_log); - log_empty_ = true; - alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); - for (auto cfd : *versions_->GetColumnFamilySet()) { - // all this is just optimization to delete logs that - // are no longer needed -- if CF is empty, that means it - // doesn't need that particular log to stay alive, so we just - // advance the log number. no need to persist this in the manifest - if (cfd->mem()->GetFirstSequenceNumber() == 0 && - cfd->imm()->size() == 0) { - cfd->SetLogNumber(logfile_number_); - } - } + MaybeScheduleFlushOrCompaction(); + } + } + return s; +} + +// REQUIRES: mutex_ is held +// REQUIRES: this thread is currently at the front of the writer queue +Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, + WriteContext* context) { + mutex_.AssertHeld(); + unique_ptr lfile; + log::Writer* new_log = nullptr; + MemTable* new_mem = nullptr; + + // Attempt to switch to a new memtable and trigger flush of old. + // Do this without holding the dbmutex lock. + assert(versions_->PrevLogNumber() == 0); + bool creating_new_log = !log_empty_; + uint64_t new_log_number = + creating_new_log ? versions_->NewFileNumber() : logfile_number_; + SuperVersion* new_superversion = nullptr; + mutex_.Unlock(); + Status s; + { + DelayLoggingAndReset(); + if (creating_new_log) { + s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), + &lfile, + env_->OptimizeForLogWrite(storage_options_)); + if (s.ok()) { + // Our final size should be less than write_buffer_size + // (compression, etc) but err on the side of caution. + lfile->SetPreallocationBlockSize(1.1 * + cfd->options()->write_buffer_size); + new_log = new log::Writer(std::move(lfile)); } - cfd->mem()->SetNextLogNumber(logfile_number_); - cfd->imm()->Add(cfd->mem()); - if (force) { - cfd->imm()->FlushRequested(); + } + + if (s.ok()) { + new_mem = new MemTable(cfd->internal_comparator(), *cfd->options()); + new_superversion = new SuperVersion(); + } + } + mutex_.Lock(); + if (!s.ok()) { + // how do we fail if we're not creating new log? + assert(creating_new_log); + // Avoid chewing through file number space in a tight loop. + versions_->ReuseLogFileNumber(new_log_number); + assert(!new_mem); + assert(!new_log); + return s; + } + if (creating_new_log) { + logfile_number_ = new_log_number; + assert(new_log != nullptr); + context->logs_to_free_.push_back(log_.release()); + log_.reset(new_log); + log_empty_ = true; + alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); + for (auto cfd : *versions_->GetColumnFamilySet()) { + // all this is just optimization to delete logs that + // are no longer needed -- if CF is empty, that means it + // doesn't need that particular log to stay alive, so we just + // advance the log number. no need to persist this in the manifest + if (cfd->mem()->GetFirstSequenceNumber() == 0 && + cfd->imm()->size() == 0) { + cfd->SetLogNumber(logfile_number_); } - new_mem->Ref(); - cfd->SetMemtable(new_mem); - Log(options_.info_log, - "[%s] New memtable created with log file: #%" PRIu64 "\n", - cfd->GetName().c_str(), logfile_number_); - force = false; // Do not force another compaction if have room - MaybeScheduleFlushOrCompaction(); - superversions_to_free->push_back( - cfd->InstallSuperVersion(new_superversion, &mutex_)); } } + cfd->mem()->SetNextLogNumber(logfile_number_); + cfd->imm()->Add(cfd->mem()); + new_mem->Ref(); + cfd->SetMemtable(new_mem); + Log(options_.info_log, + "[%s] New memtable created with log file: #%" PRIu64 "\n", + cfd->GetName().c_str(), logfile_number_); + context->superversions_to_free_.push_back( + cfd->InstallSuperVersion(new_superversion, &mutex_)); return s; } diff --git a/db/db_impl.h b/db/db_impl.h index fe0f42dfcd2..aff4b8e5093 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -309,6 +309,7 @@ class DBImpl : public DB { friend struct SuperVersion; struct CompactionState; struct Writer; + struct WriteContext; Status NewDB(); @@ -347,13 +348,38 @@ class DBImpl : public DB { uint64_t SlowdownAmount(int n, double bottom, double top); - // TODO(icanadi) free superversion_to_free and old_log outside of mutex + // Before applying write operation (such as DBImpl::Write, DBImpl::Flush) + // thread should grab the mutex_ and be the first on writers queue. + // BeginWrite is used for it. + // Be aware! Writer's job can be done by other thread (see DBImpl::Write + // for examples), so check it via w.done before applying changes. + // + // Writer* w: writer to be placed in the queue + // uint64_t expiration_time: maximum time to be in the queue + // See also: EndWrite + Status BeginWrite(Writer* w, uint64_t expiration_time); + + // After doing write job, we need to remove already used writers from + // writers_ queue and notify head of the queue about it. + // EndWrite is used for this. + // + // Writer* w: Writer, that was added by BeginWrite function + // Writer* last_writer: Since we can join a few Writers (as DBImpl::Write + // does) + // we should pass last_writer as a parameter to + // EndWrite + // (if you don't touch other writers, just pass w) + // Status status: Status of write operation + // See also: BeginWrite + void EndWrite(Writer* w, Writer* last_writer, Status status); + Status MakeRoomForWrite(ColumnFamilyData* cfd, - bool force /* flush even if there is room? */, - autovector* superversions_to_free, - autovector* logs_to_free, + WriteContext* context, uint64_t expiration_time); + Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, + WriteContext* context); + void BuildBatchGroup(Writer** last_writer, autovector* write_batch_group); diff --git a/db/db_test.cc b/db/db_test.cc index 9fb10335ba7..accddca52d6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5898,7 +5898,8 @@ TEST(DBTest, CompactOnFlush) { } namespace { -std::vector ListLogFiles(Env* env, const std::string& path) { +std::vector ListSpecificFiles( + Env* env, const std::string& path, const FileType expected_file_type) { std::vector files; std::vector log_files; env->GetChildren(path, &files); @@ -5906,15 +5907,45 @@ std::vector ListLogFiles(Env* env, const std::string& path) { FileType type; for (size_t i = 0; i < files.size(); ++i) { if (ParseFileName(files[i], &number, &type)) { - if (type == kLogFile) { + if (type == expected_file_type) { log_files.push_back(number); } } } return std::move(log_files); } + +std::vector ListLogFiles(Env* env, const std::string& path) { + return ListSpecificFiles(env, path, kLogFile); +} + +std::vector ListTableFiles(Env* env, const std::string& path) { + return ListSpecificFiles(env, path, kTableFile); +} } // namespace +TEST(DBTest, FlushOneColumnFamily) { + Options options; + CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", + "alyosha", "popovich"}, + &options); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "pikachu", "pikachu")); + ASSERT_OK(Put(2, "ilya", "ilya")); + ASSERT_OK(Put(3, "muromec", "muromec")); + ASSERT_OK(Put(4, "dobrynia", "dobrynia")); + ASSERT_OK(Put(5, "nikitich", "nikitich")); + ASSERT_OK(Put(6, "alyosha", "alyosha")); + ASSERT_OK(Put(7, "popovich", "popovich")); + + for (int i = 0; i < 8; ++i) { + Flush(i); + auto tables = ListTableFiles(env_, dbname_); + ASSERT_EQ(tables.size(), i + 1); + } +} + TEST(DBTest, WALArchivalTtl) { do { Options options = CurrentOptions();