Skip to content

Commit

Permalink
Merge pull request #68 from project-tsurugi/wip/log-0.7_i_1034
Browse files Browse the repository at this point in the history
addressed the case where epoch file writing takes a long time
  • Loading branch information
umegane authored Nov 28, 2024
2 parents 003f01a + 77552d6 commit c73f270
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 13 deletions.
5 changes: 3 additions & 2 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ class datastore {
protected: // for tests
auto& log_channels_for_tests() const noexcept { return log_channels_; }
auto epoch_id_informed_for_tests() const noexcept { return epoch_id_informed_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_recorded_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_to_be_recorded_.load(); }
auto epoch_id_switched_for_tests() const noexcept { return epoch_id_switched_.load(); }
auto& files_for_tests() const noexcept { return files_; }
void rotate_epoch_file_for_tests() { rotate_epoch_file(); }
Expand All @@ -263,7 +263,8 @@ class datastore {

std::atomic_uint64_t epoch_id_informed_{};

std::atomic_uint64_t epoch_id_recorded_{};
std::atomic_uint64_t epoch_id_to_be_recorded_{};
std::atomic_uint64_t epoch_id_record_finished_{};

std::unique_ptr<backup> backup_{};

Expand Down
28 changes: 18 additions & 10 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {
}

epoch_id_switched_.store(neid);
update_min_epoch_id(true);
if (state_ != state::not_ready) {
update_min_epoch_id(true);
}
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
}
Expand All @@ -171,17 +173,19 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {
void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readability-function-cognitive-complexity)
auto upper_limit = epoch_id_switched_.load();
if (upper_limit == 0) {
return; // If epoch_id_switched_ is zero, it means no epoch has been switched, so updating epoch_id_recorded_ and epoch_id_informed_ is unnecessary.
return; // If epoch_id_switched_ is zero, it means no epoch has been switched, so updating epoch_id_to_be_recorded_ and epoch_id_informed_ is unnecessary.
}
upper_limit--;
epoch_id_type max_finished_epoch = 0;

for (const auto& e : log_channels_) {
auto working_epoch = static_cast<epoch_id_type>(e->current_epoch_id_.load());
if ((working_epoch - 1) < upper_limit && working_epoch != UINT64_MAX) {
upper_limit = working_epoch - 1;
}
auto working_epoch = e->current_epoch_id_.load();
auto finished_epoch = e->finished_epoch_id_.load();
if (working_epoch > finished_epoch && working_epoch != UINT64_MAX) {
if ((working_epoch - 1) < upper_limit) {
upper_limit = working_epoch - 1;
}
}
if (max_finished_epoch < finished_epoch) {
max_finished_epoch = finished_epoch;
}
Expand All @@ -192,21 +196,21 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
if (from_switch_epoch && (to_be_epoch > static_cast<std::uint64_t>(max_finished_epoch))) {
to_be_epoch = static_cast<std::uint64_t>(max_finished_epoch);
}
auto old_epoch_id = epoch_id_recorded_.load();
auto old_epoch_id = epoch_id_to_be_recorded_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
if (epoch_id_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
if (epoch_id_to_be_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_file_);
if (to_be_epoch < epoch_id_recorded_.load()) {
if (to_be_epoch < epoch_id_to_be_recorded_.load()) {
break;
}
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT(*-owning-memory)
if (!strm) {
LOG_AND_THROW_IO_EXCEPTION("fopen failed", errno);
}
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_recorded_.load()));
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_to_be_recorded_.load()));
if (fflush(strm) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fflush failed", errno);
}
Expand All @@ -216,9 +220,13 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
if (fclose(strm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
}
epoch_id_record_finished_.store(epoch_id_to_be_recorded_.load());
break;
}
}
if (to_be_epoch > epoch_id_record_finished_.load()) {
return;
}

// update informed_epoch_
to_be_epoch = upper_limit;
Expand Down
2 changes: 1 addition & 1 deletion src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ void log_channel::end_session() {
LOG_AND_THROW_IO_EXCEPTION("fsync failed", errno);
}
finished_epoch_id_.store(current_epoch_id_.load());
current_epoch_id_.store(UINT64_MAX);
envelope_.update_min_epoch_id();
current_epoch_id_.store(UINT64_MAX);

if (fclose(strm_) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
Expand Down

0 comments on commit c73f270

Please sign in to comment.