Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trace log to Limestone for issue #1057 #74

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,25 @@ static void write_epoch_to_file_internal(const std::string& file_path, epoch_id_
}

// Persists `epoch_id` to the on-disk epoch file.
//
// Common case: the id is appended to the existing epoch file.  Every
// max_entries_in_epoch_file-th call the file is compacted instead: the id is
// written alone (overwrite mode) to a temporary file, which is then rename()d
// over the real epoch file, and the append counter is reset.
//
// Throws (via LOG_AND_THROW_IO_EXCEPTION) if the rename or the follow-up
// remove of the temp path fails.  The caller in update_min_epoch_id holds
// mtx_epoch_file_ around this call, so file accesses are serialized there.
void datastore::write_epoch_to_file(epoch_id_type epoch_id) {
    TRACE_START << "epoch_id=" << epoch_id;
    if (++epoch_write_counter >= max_entries_in_epoch_file) {
        // Compaction path: write only the current epoch id to a temp file...
        write_epoch_to_file_internal(tmp_epoch_file_path_.string(), epoch_id, file_write_mode::overwrite);

        boost::system::error_code ec;
        // ...then replace the epoch file with it (atomic on POSIX rename).
        if (::rename(tmp_epoch_file_path_.c_str(), epoch_file_path_.c_str()) != 0) {
            TRACE_ABORT;
            LOG_AND_THROW_IO_EXCEPTION("Failed to rename temp file: " + tmp_epoch_file_path_.string() + " to " + epoch_file_path_.string(), errno);
        }
        // NOTE(review): after a successful rename the temp path normally no
        // longer exists, so this remove is expected to be a no-op; presumably
        // defensive cleanup -- confirm it is still needed.
        boost::filesystem::remove(tmp_epoch_file_path_, ec);
        if (ec) {
            TRACE_ABORT;
            LOG_AND_THROW_IO_EXCEPTION("Failed to remove temp file: " + tmp_epoch_file_path_.string(), ec);
        }
        // Restart the append counter after a successful compaction.
        epoch_write_counter = 0;
    } else {
        // Common path: append the epoch id to the existing epoch file.
        write_epoch_to_file_internal(epoch_file_path_.string(), epoch_id, file_write_mode::append);
    }
    TRACE_END;
}


Expand Down Expand Up @@ -210,6 +214,7 @@ log_channel& datastore::create_channel(const boost::filesystem::path& location)
epoch_id_type datastore::last_epoch() const noexcept { return static_cast<epoch_id_type>(epoch_id_informed_.load()); }

void datastore::switch_epoch(epoch_id_type new_epoch_id) {
TRACE_START << "new_epoch_id=" << new_epoch_id;
try {
check_after_ready(static_cast<const char*>(__func__));
auto neid = static_cast<std::uint64_t>(new_epoch_id);
Expand All @@ -222,16 +227,20 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {
update_min_epoch_id(true);
}
} catch (...) {
TRACE_ABORT;
HANDLE_EXCEPTION_AND_ABORT();
}
TRACE_END;
}

void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readability-function-cognitive-complexity)
TRACE_START << "from_switch_epoch=" << from_switch_epoch;
auto upper_limit = epoch_id_switched_.load();
if (upper_limit == 0) {
return; // If epoch_id_switched_ is zero, it means no epoch has been switched, so updating epoch_id_to_be_recorded_ and epoch_id_informed_ is unnecessary.
}
upper_limit--;

epoch_id_type max_finished_epoch = 0;

for (const auto& e : log_channels_) {
Expand All @@ -247,29 +256,39 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
}
}

TRACE << "epoch_id_switched_ = " << epoch_id_switched_.load() << ", upper_limit = " << upper_limit << ", max_finished_epoch = " << max_finished_epoch;

// update recorded_epoch_
auto to_be_epoch = upper_limit;
if (from_switch_epoch && (to_be_epoch > static_cast<std::uint64_t>(max_finished_epoch))) {
to_be_epoch = static_cast<std::uint64_t>(max_finished_epoch);
}

TRACE << "update epoch file part start with to_be_epoch = " << to_be_epoch;
auto old_epoch_id = epoch_id_to_be_recorded_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
if (epoch_id_to_be_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
TRACE << "epoch_id_to_be_recorded_ updated to " << to_be_epoch;
std::lock_guard<std::mutex> lock(mtx_epoch_file_);
write_epoch_to_file(static_cast<epoch_id_type>(to_be_epoch));
epoch_id_record_finished_.store(epoch_id_to_be_recorded_.load());
auto recorded_epoch = epoch_id_to_be_recorded_.load();
epoch_id_record_finished_.store(recorded_epoch);
TRACE << "epoch_id_record_finished_ updated to " << recorded_epoch;
break;
}
}
if (to_be_epoch > epoch_id_record_finished_.load()) {
TRACE << "skipping persistent callback part, to_be_epoch = " << to_be_epoch << ", epoch_id_record_finished_ = " << epoch_id_record_finished_.load();
TRACE_END;
return;
}

// update informed_epoch_
to_be_epoch = upper_limit;
TRACE << "persistent callback part start with to_be_epoch =" << to_be_epoch;
// In `informed_epoch_`, the update restriction based on the `from_switch_epoch` condition is intentionally omitted.
// Due to the interface specifications of Shirakami, it is necessary to advance the epoch even if the log channel
// is not updated. This behavior differs from `recorded_epoch_` and should be maintained as such.
Expand All @@ -279,13 +298,16 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
break;
}
if (epoch_id_informed_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
TRACE << "epoch_id_informed_ updated to " << to_be_epoch;
{
std::lock_guard<std::mutex> lock(mtx_epoch_persistent_callback_);
if (to_be_epoch < epoch_id_informed_.load()) {
break;
}
if (persistent_callback_) {
TRACE << "start calling persistent callback to " << to_be_epoch;
persistent_callback_(to_be_epoch);
TRACE << "end calling persistent callback to " << to_be_epoch;
}
}
{
Expand All @@ -296,6 +318,7 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
break;
}
}
TRACE_END;
}


Expand Down Expand Up @@ -450,23 +473,23 @@ void datastore::recover([[maybe_unused]] const epoch_tag& tag) const noexcept {
}

rotation_result datastore::rotate_log_files() {
VLOG(50) << "start rotate_log_files()";
TRACE_START;
std::lock_guard<std::mutex> lock(rotate_mutex);
VLOG(50) << "start rotate_log_files() critical section";
TRACE << "start rotate_log_files() critical section";
auto epoch_id = epoch_id_switched_.load();
if (epoch_id == 0) {
LOG_AND_THROW_EXCEPTION("rotation requires epoch_id > 0, but got epoch_id = 0");
}
VLOG(50) << "epoch_id = " << epoch_id;
TRACE << "epoch_id = " << epoch_id;
{
on_wait1();
on_wait1(); // for testing
// Wait until epoch_id_informed_ is less than rotated_epoch_id to ensure safe rotation.
std::unique_lock<std::mutex> ul(informed_mutex);
while (epoch_id_informed_.load() < epoch_id) {
cv_epoch_informed.wait(ul);
}
}
VLOG(50) << "end waiting for epoch_id_informed_ to catch up";
TRACE << "end waiting for epoch_id_informed_ to catch up";
rotation_result result(epoch_id);
for (const auto& lc : log_channels_) {
boost::system::error_code error;
Expand All @@ -478,7 +501,7 @@ rotation_result datastore::rotate_log_files() {
result.add_rotated_file(rotated_file);
}
result.set_rotation_end_files(get_files());
VLOG(50) << "end rotate_log_files()";
TRACE_END;
return result;
}

Expand Down Expand Up @@ -586,7 +609,7 @@ void datastore::stop_online_compaction_worker() {
}

void datastore::compact_with_online() {
VLOG(50) << "start compact_with_online()";
TRACE_START;
check_after_ready(static_cast<const char*>(__func__));

// rotate first
Expand All @@ -606,7 +629,7 @@ void datastore::compact_with_online() {
(need_compaction_filenames.size() == 1 &&
need_compaction_filenames.find(compaction_catalog::get_compacted_filename()) != need_compaction_filenames.end())) {
LOG_LP(INFO) << "no files to compact";
VLOG(50) << "return compact_with_online() without compaction";
TRACE_END << "return compact_with_online() without compaction";
return;
}

Expand Down Expand Up @@ -655,7 +678,7 @@ void datastore::compact_with_online() {
remove_file_safely(location_ / compaction_catalog::get_compacted_backup_filename());

LOG_LP(INFO) << "compaction finished";
VLOG(50) << "end compact_with_online()";
TRACE_END;
}

} // namespace limestone::api
Expand Down
21 changes: 21 additions & 0 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void log_channel::begin_session() {
current_epoch_id_.store(envelope_.epoch_id_switched_.load());
std::atomic_thread_fence(std::memory_order_acq_rel);
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());
TRACE_START << "current_epoch_id_=" << current_epoch_id_.load();

auto log_file = file_path();
strm_ = fopen(log_file.c_str(), "a"); // NOLINT(*-owning-memory)
Expand All @@ -71,13 +72,16 @@ void log_channel::begin_session() {
registered_ = true;
}
log_entry::begin_session(strm_, static_cast<epoch_id_type>(current_epoch_id_.load()));
TRACE_END;
} catch (...) {
TRACE_ABORT;
HANDLE_EXCEPTION_AND_ABORT();
}
}

void log_channel::end_session() {
try {
TRACE_START << "current_epoch_id_=" << current_epoch_id_.load();
if (fflush(strm_) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fflush failed", errno);
}
Expand All @@ -91,7 +95,9 @@ void log_channel::end_session() {
if (fclose(strm_) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
}
TRACE_END;
} catch (...) {
TRACE_ABORT;
HANDLE_EXCEPTION_AND_ABORT();
}
}
Expand All @@ -106,52 +112,67 @@ void log_channel::abort_session([[maybe_unused]] status status_code, [[maybe_unu
}

// Writes a key/value entry with the given write version to this channel's
// log stream, then remembers the version in write_version_.
//
// Any exception escaping log_entry::write is treated as fatal: the abort
// marker is traced and HANDLE_EXCEPTION_AND_ABORT terminates the process, so
// no error is ever reported back to the caller.
void log_channel::add_entry(storage_id_type storage_id, std::string_view key, std::string_view value, write_version_type write_version) {
    // NOTE(review): trace label fixed from ",value = " to ", value=" so the
    // field naming matches "storage_id=" / "key=" and the sibling methods.
    TRACE_START << "storage_id=" << storage_id << ", key=" << key << ", value=" << value << ", epoch =" << write_version.epoch_number_ << ", minor =" << write_version.minor_write_version_;
    try {
        log_entry::write(strm_, storage_id, key, value, write_version);
        // Record the version of the last entry written through this channel.
        write_version_ = write_version;
    } catch (...) {
        TRACE_ABORT;
        HANDLE_EXCEPTION_AND_ABORT();
    }
    TRACE_END;
}

// Variant of add_entry that additionally accepts large (BLOB) objects.
// Not yet implemented: always throws via LOG_AND_THROW_EXCEPTION.  All
// parameters are [[maybe_unused]] until the implementation lands.
// NOTE(review): removed the stray ';' that followed the function body.
void log_channel::add_entry([[maybe_unused]] storage_id_type storage_id, [[maybe_unused]] std::string_view key, [[maybe_unused]] std::string_view value, [[maybe_unused]] write_version_type write_version, [[maybe_unused]] const std::vector<large_object_input>& large_objects) {
    LOG_AND_THROW_EXCEPTION("not implemented"); // FIXME
}

// Appends a "remove key" entry for storage_id/key at the given write version
// to this channel's log stream, then remembers the version in write_version_.
//
// Any exception escaping log_entry::write_remove is treated as fatal: the
// abort marker is traced and HANDLE_EXCEPTION_AND_ABORT terminates the
// process, so no error is ever reported back to the caller.
void log_channel::remove_entry(storage_id_type storage_id, std::string_view key, write_version_type write_version) {
    TRACE_START << "storage_id=" << storage_id << ", key=" << key << ", epoch =" << write_version.epoch_number_ << ", minor =" << write_version.minor_write_version_;
    try {
        log_entry::write_remove(strm_, storage_id, key, write_version);
        // Record the version of the last entry written through this channel.
        write_version_ = write_version;
    } catch (...) {
        TRACE_ABORT; // emit the abort marker before the handler kills the process
        HANDLE_EXCEPTION_AND_ABORT();
    }
    TRACE_END;
}

// Appends an "add storage" entry for storage_id at the given write version to
// this channel's log stream, then remembers the version in write_version_.
//
// Any exception escaping log_entry::write_add_storage is treated as fatal:
// the abort marker is traced and HANDLE_EXCEPTION_AND_ABORT terminates the
// process, so no error is ever reported back to the caller.
void log_channel::add_storage(storage_id_type storage_id, write_version_type write_version) {
    TRACE_START << "storage_id=" << storage_id << ", epoch =" << write_version.epoch_number_ << ", minor =" << write_version.minor_write_version_;
    try {
        log_entry::write_add_storage(strm_, storage_id, write_version);
        // Record the version of the last entry written through this channel.
        write_version_ = write_version;
    } catch (...) {
        TRACE_ABORT;
        HANDLE_EXCEPTION_AND_ABORT();
    }
    // Was a leftover `VLOG(50) << "end add_storage() ..."` line; use TRACE_END
    // like every sibling method so end-of-call tracing is uniform.
    TRACE_END;
}

// Appends a "remove storage" entry for storage_id at the given write version
// to this channel's log stream, then remembers the version in write_version_.
//
// Any exception escaping log_entry::write_remove_storage is treated as fatal:
// the abort marker is traced and HANDLE_EXCEPTION_AND_ABORT terminates the
// process, so no error is ever reported back to the caller.
void log_channel::remove_storage(storage_id_type storage_id, write_version_type write_version) {
    TRACE_START << "storage_id=" << storage_id << ", epoch =" << write_version.epoch_number_ << ", minor =" << write_version.minor_write_version_;
    try {
        log_entry::write_remove_storage(strm_, storage_id, write_version);
        // Record the version of the last entry written through this channel.
        write_version_ = write_version;
    } catch (...) {
        TRACE_ABORT; // emit the abort marker before the handler kills the process
        HANDLE_EXCEPTION_AND_ABORT();
    }
    TRACE_END;
}

// Appends a "clear storage" (truncate) entry for storage_id at the given
// write version to this channel's log stream, then remembers the version in
// write_version_.
//
// Any exception escaping log_entry::write_clear_storage is treated as fatal:
// the abort marker is traced and HANDLE_EXCEPTION_AND_ABORT terminates the
// process, so no error is ever reported back to the caller.
void log_channel::truncate_storage(storage_id_type storage_id, write_version_type write_version) {
    TRACE_START << "storage_id=" << storage_id << ", epoch =" << write_version.epoch_number_ << ", minor =" << write_version.minor_write_version_;
    try {
        log_entry::write_clear_storage(strm_, storage_id, write_version);
        // Record the version of the last entry written through this channel.
        write_version_ = write_version;
    } catch (...) {
        TRACE_ABORT; // emit the abort marker before the handler kills the process
        HANDLE_EXCEPTION_AND_ABORT();
    }
    TRACE_END;
}

boost::filesystem::path log_channel::file_path() const noexcept {
Expand Down
41 changes: 41 additions & 0 deletions src/limestone/logging_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <array>
#include <string_view>
#include <thread>

namespace limestone {

Expand Down Expand Up @@ -112,4 +113,44 @@ constexpr auto location_prefix(const char (&prettyname)[N], const char (&funcnam
// NOLINTNEXTLINE
#define DVLOG_LP(x) _LOCATION_PREFIX_TO_STREAM(DVLOG(x))

// Returns the current thread's name as reported by pthread_getname_np(),
// or "Unknown" when the name cannot be retrieved.  The result is computed
// once per thread and cached in a thread_local, so the syscall cost is paid
// at most once.  Returning a const reference (instead of a value) avoids
// copying the string on every trace statement; the referenced object lives
// for the calling thread's lifetime.
inline const std::string& getThreadName() {
    thread_local const std::string cachedThreadName = [] {
        // pthread names are limited to 16 bytes including the terminating NUL.
        std::array<char, 16> name = {"Unknown"};
        if (pthread_getname_np(pthread_self(), name.data(), name.size()) != 0) {
            return std::string("Unknown");
        }
        return std::string(name.data());
    }();
    return cachedThreadName;
}


// Verbose-log level used by all LIMESTONE TRACE output; enable with
// --v=50 (or GLOG_v=50).
constexpr int TRACE_LOG_LEVEL = 50;

// Common logging macro for TRACE operations.
// Expands to a statement-safe stream: the leading `if (!VLOG_IS_ON(...)) {}
// else ...` guard skips all argument evaluation when tracing is off and is
// written in the dangling-else form so the macro composes correctly inside
// unbraced if/else.  Each message is prefixed with the thread id, the cached
// thread name, the enclosing function name (__func__), and `label`; an empty
// label ("") suppresses the separating space.
// NOLINTBEGIN(cppcoreguidelines-macro-usage)
#define TRACE_COMMON(label) \
    if (!VLOG_IS_ON(TRACE_LOG_LEVEL)) {} \
    else \
        VLOG(TRACE_LOG_LEVEL) \
            << "[Thread " << std::this_thread::get_id() \
            << " (" << getThreadName() << ")] LIMESTONE TRACE: " << __func__ \
            << ((label)[0] ? " " : "") \
            << (label) \
            << ": "
// NOLINTEND(cppcoreguidelines-macro-usage)

// Specific macros for different TRACE use cases:
//   TRACE       - mid-function progress messages
//   TRACE_START - first statement of a traced function
//   TRACE_END   - normal-return marker
//   TRACE_ABORT - emitted just before a fatal exception handler
// NOLINTNEXTLINE
#define TRACE TRACE_COMMON("trace")
// NOLINTNEXTLINE
#define TRACE_START TRACE_COMMON("start")
// NOLINTNEXTLINE
#define TRACE_END TRACE_COMMON("end")
// NOLINTNEXTLINE
#define TRACE_ABORT TRACE_COMMON("abort")


} // namespace