Skip to content

Commit

Permalink
Dev umegane 1105 (#79)
Browse files Browse the repository at this point in the history
Add test hooks and sample test cases for datastore and log_channel

Description:

* Added test hooks to the log_channel and datastore classes.
* Implemented a mechanism to inject test code into datastore_test using the test hooks.
* Created sample test cases using the test hooks:
    * race_detection_test.cpp for testing race conditions.
    * Supporting classes race_condition_test_manager.cpp and race_condition_test_manager.h added.

These changes enable more comprehensive testing for concurrency-related issues.
  • Loading branch information
umegane authored Jan 15, 2025
1 parent eded03b commit 62e323e
Show file tree
Hide file tree
Showing 11 changed files with 800 additions and 44 deletions.
56 changes: 50 additions & 6 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ class datastore {
protected: // for tests
auto& log_channels_for_tests() const noexcept { return log_channels_; }
auto epoch_id_informed_for_tests() const noexcept { return epoch_id_informed_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_to_be_recorded_.load(); }
auto epoch_id_to_be_recorded_for_tests() const noexcept { return epoch_id_to_be_recorded_.load(); }
auto epoch_id_record_finished_for_tests() const noexcept { return epoch_id_record_finished_.load(); }
auto epoch_id_switched_for_tests() const noexcept { return epoch_id_switched_.load(); }
auto& files_for_tests() const noexcept { return files_; }
void rotate_epoch_file_for_tests() { rotate_epoch_file(); }
Expand All @@ -257,12 +258,55 @@ class datastore {
// They allow derived classes to inject custom behavior or notifications
// at specific wait points during the execution of the datastore class.
// The default implementation does nothing, ensuring no impact on production code.
virtual void on_wait1() {}
virtual void on_wait2() {}
virtual void on_wait3() {}
virtual void on_wait4() {}
virtual void on_rotate_log_files() noexcept {}
virtual void on_begin_session_current_epoch_id_store() noexcept {}
virtual void on_end_session_finished_epoch_id_store() noexcept {}
virtual void on_end_session_current_epoch_id_store() noexcept {}
virtual void on_switch_epoch_epoch_id_switched_store() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_switched_load() noexcept {}
virtual void on_update_min_epoch_id_current_epoch_id_load() noexcept {}
virtual void on_update_min_epoch_id_finished_epoch_id_load() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_to_be_recorded_load() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_to_be_recorded_cas() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_record_finished_load() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_informed_load_1() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_informed_cas() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_informed_load_2() noexcept {}

/**
* @brief Sets the callback function for writing epoch to a file.
* @details
* This method allows you to override the default behavior for writing epoch
* information by providing a custom callback. The callback can be a free
* function, a lambda, or a member function bound to an object.
*
* Example:
* @code
* class CustomHandler {
* public:
* void custom_epoch_writer(epoch_id_type epoch) {
* // Custom logic
* }
* };
*
* datastore ds;
* CustomHandler handler;
* ds.set_write_epoch_callback([&handler](epoch_id_type epoch) {
* handler.custom_epoch_writer(epoch);
* });
* @endcode
*
* @param callback The new callback function to use for writing epoch.
*/
void set_write_epoch_callback(std::function<void(epoch_id_type)> callback) {
write_epoch_callback_ = std::move(callback);
}

private:
std::function<void(epoch_id_type)> write_epoch_callback_{
[this](epoch_id_type epoch) { this->write_epoch_to_file(epoch); }
};

std::vector<std::unique_ptr<log_channel>> log_channels_;

boost::filesystem::path location_{};
Expand Down Expand Up @@ -367,7 +411,7 @@ class datastore {
// File descriptor for file lock (flock) on the manifest file
int fd_for_flock_{-1};

void write_epoch_to_file(epoch_id_type epoch_id);
virtual void write_epoch_to_file(epoch_id_type epoch_id);

int epoch_write_counter = 0;
};
Expand Down
15 changes: 13 additions & 2 deletions include/limestone/api/log_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ class log_channel {
*/
[[nodiscard]] boost::filesystem::path file_path() const noexcept;

/**
* @brief this is for test purpose only, must not be used for any purpose other than testing
*/
[[nodiscard]] auto current_epoch_id() const noexcept { return current_epoch_id_.load(); }

/**
* @brief this is for test purpose only, must not be used for any purpose other than testing
*/
[[nodiscard]] auto finished_epoch_id() const noexcept { return finished_epoch_id_.load(); }

private:
datastore& envelope_;

Expand All @@ -183,10 +193,11 @@ class log_channel {

std::atomic_uint64_t finished_epoch_id_{0};

log_channel(boost::filesystem::path location, std::size_t id, datastore& envelope) noexcept;

std::string do_rotate_file(epoch_id_type epoch = 0);

protected: // Protected to allow testing with derived classes
log_channel(boost::filesystem::path location, std::size_t id, datastore& envelope) noexcept;

friend class datastore;
friend class rotation_task;
};
Expand Down
18 changes: 15 additions & 3 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void datastore::ready() {
create_snapshot();
online_compaction_worker_future_ = std::async(std::launch::async, &datastore::online_compaction_worker, this);
if (epoch_id_switched_.load() != 0) {
write_epoch_to_file(epoch_id_informed_.load());
write_epoch_callback_(epoch_id_informed_.load());
}
cleanup_rotated_epoch_files(location_);
state_ = state::ready;
Expand Down Expand Up @@ -224,6 +224,7 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {
LOG_LP(WARNING) << "switch to epoch_id_type of " << neid << " (<=" << switched << ") is curious";
}

on_switch_epoch_epoch_id_switched_store(); // for testing
epoch_id_switched_.store(neid);
if (state_ != state::not_ready) {
update_min_epoch_id(true);
Expand All @@ -237,6 +238,8 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {

void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readability-function-cognitive-complexity)
TRACE_START << "from_switch_epoch=" << from_switch_epoch;

on_update_min_epoch_id_epoch_id_switched_load(); // for testing
auto upper_limit = epoch_id_switched_.load();
if (upper_limit == 0) {
return; // If epoch_id_switched_ is zero, it means no epoch has been switched, so updating epoch_id_to_be_recorded_ and epoch_id_informed_ is unnecessary.
Expand All @@ -246,7 +249,9 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
epoch_id_type max_finished_epoch = 0;

for (const auto& e : log_channels_) {
on_update_min_epoch_id_current_epoch_id_load(); // for testing
auto working_epoch = e->current_epoch_id_.load();
on_update_min_epoch_id_finished_epoch_id_load(); // for testing
auto finished_epoch = e->finished_epoch_id_.load();
if (working_epoch > finished_epoch && working_epoch != UINT64_MAX) {
if ((working_epoch - 1) < upper_limit) {
Expand All @@ -267,23 +272,27 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
}

TRACE << "update epoch file part start with to_be_epoch = " << to_be_epoch;
on_update_min_epoch_id_epoch_id_to_be_recorded_load(); // for testing
auto old_epoch_id = epoch_id_to_be_recorded_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
on_update_min_epoch_id_epoch_id_to_be_recorded_cas(); // for testing
if (epoch_id_to_be_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
TRACE << "epoch_id_to_be_recorded_ updated to " << to_be_epoch;
on_update_min_epoch_id_epoch_id_to_be_recorded_load(); // for testing
std::lock_guard<std::mutex> lock(mtx_epoch_file_);
if (to_be_epoch < epoch_id_to_be_recorded_.load()) {
break;
}
write_epoch_to_file(static_cast<epoch_id_type>(to_be_epoch));
write_epoch_callback_(static_cast<epoch_id_type>(to_be_epoch));
epoch_id_record_finished_.store(to_be_epoch);
TRACE << "epoch_id_record_finished_ updated to " << to_be_epoch;
break;
}
}
on_update_min_epoch_id_epoch_id_record_finished_load(); // for testing
if (to_be_epoch > epoch_id_record_finished_.load()) {
TRACE << "skipping persistent callback part, to_be_epoch = " << to_be_epoch << ", epoch_id_record_finished_ = " << epoch_id_record_finished_.load();
TRACE_END;
Expand All @@ -296,14 +305,17 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
// In `informed_epoch_`, the update restriction based on the `from_switch_epoch` condition is intentionally omitted.
// Due to the interface specifications of Shirakami, it is necessary to advance the epoch even if the log channel
// is not updated. This behavior differs from `recorded_epoch_` and should be maintained as such.
on_update_min_epoch_id_epoch_id_informed_load_1(); // for testing
old_epoch_id = epoch_id_informed_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
on_update_min_epoch_id_epoch_id_informed_cas(); // for testing
if (epoch_id_informed_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
TRACE << "epoch_id_informed_ updated to " << to_be_epoch;
{
on_update_min_epoch_id_epoch_id_informed_load_2(); // for testing
std::lock_guard<std::mutex> lock(mtx_epoch_persistent_callback_);
if (to_be_epoch < epoch_id_informed_.load()) {
break;
Expand Down Expand Up @@ -486,7 +498,7 @@ rotation_result datastore::rotate_log_files() {
}
TRACE << "epoch_id = " << epoch_id;
{
on_wait1(); // for testing
on_rotate_log_files(); // for testing
// Wait until epoch_id_informed_ is less than rotated_epoch_id to ensure safe rotation.
std::unique_lock<std::mutex> ul(informed_mutex);
while (epoch_id_informed_.load() < epoch_id) {
Expand Down
3 changes: 3 additions & 0 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void log_channel::begin_session() {
// This loop detects such inconsistencies and repeats until `current_epoch_id_`
// matches the latest value of `epoch_id_switched_`, ensuring consistency.
do {
envelope_.on_begin_session_current_epoch_id_store(); // for testing
current_epoch_id_.store(envelope_.epoch_id_switched_.load());
std::atomic_thread_fence(std::memory_order_acq_rel);
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());
Expand Down Expand Up @@ -88,8 +89,10 @@ void log_channel::end_session() {
if (fsync(fileno(strm_)) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fsync failed", errno);
}
envelope_.on_end_session_finished_epoch_id_store(); // for testing
finished_epoch_id_.store(current_epoch_id_.load());
envelope_.update_min_epoch_id();
envelope_.on_end_session_current_epoch_id_store(); // for testing
current_epoch_id_.store(UINT64_MAX);

if (fclose(strm_) != 0) { // NOLINT(*-owning-memory)
Expand Down
1 change: 1 addition & 0 deletions src/limestone/logging_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <glog/logging.h>
#include <array>
#include <string_view>
#include <thread>
Expand Down
10 changes: 5 additions & 5 deletions test/limestone/compaction/compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,24 @@ class compaction_test : public ::testing::Test {
throw std::runtime_error("datastore_ must be of type datastore_test");
}

// Set up the on_wait1 callback to signal when rotate_log_files() reaches the wait point
test_datastore->on_wait1_callback = [&]() {
// Set up the on_rotate_log_files callback to signal when rotate_log_files() reaches the wait point
test_datastore->on_rotate_log_files_callback = [&]() {
std::unique_lock<std::mutex> lock(wait_mutex);
wait_triggered = true;
wait_cv.notify_one(); // Notify that on_wait1 has been triggered
wait_cv.notify_one(); // Notify that on_rotate_log_files has been triggered
};

try {
// Run compact_with_online in a separate thread
auto future = std::async(std::launch::async, [&]() { datastore_->compact_with_online(); });

// Wait for on_wait1 to be triggered (simulating the waiting in rotate_log_files)
// Wait for on_rotate_log_files to be triggered (simulating the waiting in rotate_log_files)
{
std::unique_lock<std::mutex> lock(wait_mutex);
wait_cv.wait(lock, [&]() { return wait_triggered; });
}

// Now switch the epoch after on_wait1 has been triggered
// Now switch the epoch after on_rotate_log_files has been triggered
datastore_->switch_epoch(epoch);

// Wait for the compact operation to finish
Expand Down
22 changes: 11 additions & 11 deletions test/limestone/epoch/finish_soon_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@ TEST_F(finish_soon_test, same) {

datastore_->switch_epoch(2);
ASSERT_EQ(1, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(3);
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

channel.begin_session();
channel.end_session();

ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(4);
ASSERT_EQ(3, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(5);
ASSERT_EQ(4, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

// cleanup
datastore_->shutdown();
Expand All @@ -77,27 +77,27 @@ TEST_F(finish_soon_test, different) {

datastore_->switch_epoch(2);
ASSERT_EQ(1, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(3);
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

channel.begin_session();
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(4);
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

channel.end_session();
ASSERT_EQ(3, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(5);
ASSERT_EQ(4, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

// cleanup
datastore_->shutdown();
Expand Down
Loading

0 comments on commit 62e323e

Please sign in to comment.