diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 782921e1436..e1bb5766ced 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -244,6 +244,8 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, next_sequence += count; total_count += count; memtable_write_cnt++; + } else if (w->post_callback) { + w->post_callback->Callback(w->sequence); } } total_byte_size = WriteBatchInternal::AppendedByteSize( diff --git a/db/db_write_test.cc b/db/db_write_test.cc index d126aacf9c4..263c7a4590d 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -626,7 +626,6 @@ TEST_P(DBWriteTest, PostWriteCallback) { opts.wait = false; can_flush_mutex.Lock(); ASSERT_OK(dbfull()->Flush(opts)); - can_flush_mutex.Unlock(); flushed.store(true, std::memory_order_relaxed); })); @@ -635,7 +634,7 @@ TEST_P(DBWriteTest, PostWriteCallback) { ASSERT_EQ(flushed.load(std::memory_order_relaxed), false); the_first_can_exit_write_mutex.Unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds{10}); + std::this_thread::sleep_for(std::chrono::milliseconds{50}); ASSERT_EQ(written.load(std::memory_order_relaxed), 2); ASSERT_EQ(flushed.load(std::memory_order_relaxed), true); @@ -644,6 +643,23 @@ TEST_P(DBWriteTest, PostWriteCallback) { } } +TEST_P(DBWriteTest, PostWriteCallbackEmptyBatch) { + Options options = GetOptions(); + if (options.two_write_queues) { + // Not compatible. + return; + } + Reopen(options); + WriteBatch batch; + WriteOptions opts; + opts.sync = false; + opts.disableWAL = true; + SequenceNumber seq = 0; + SimpleCallback callback([&](SequenceNumber s) { seq = s; }); + ASSERT_OK(dbfull()->Write(opts, &batch, &callback)); + ASSERT_NE(seq, 0); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites,