Skip to content

Commit

Permalink
Integrate wip/log-0.7 into master (#70)
Browse files Browse the repository at this point in the history
Key Updates:
1. Epoch Management Improvements:
   - Addressed race conditions in epoch handling to ensure consistent updates.
   - Improved handling for scenarios where epoch file writing takes longer than expected.
   - Enhanced logic to prevent invalid Epoch IDs caused by underflow during updates.
   - Modified startup behavior to treat missing Epoch files as equivalent to empty files, ensuring smoother recovery.

2. Manifest File Locking:
   - Added manifest file locking to prevent simultaneous database instances or conflicts caused by modifications through dblogutil.

Note:
- Draft documents were temporarily removed to avoid confusion in master. These documents will be finalized and included in a future merge.
  • Loading branch information
umegane authored Nov 28, 2024
1 parent 155fb6d commit 51a741d
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 35 deletions.
13 changes: 10 additions & 3 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ class datastore {
protected: // for tests
auto& log_channels_for_tests() const noexcept { return log_channels_; }
auto epoch_id_informed_for_tests() const noexcept { return epoch_id_informed_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_recorded_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_to_be_recorded_.load(); }
auto epoch_id_switched_for_tests() const noexcept { return epoch_id_switched_.load(); }
auto& files_for_tests() const noexcept { return files_; }
void rotate_epoch_file_for_tests() { rotate_epoch_file(); }

private:
std::vector<std::unique_ptr<log_channel>> log_channels_;
Expand All @@ -261,7 +263,8 @@ class datastore {

std::atomic_uint64_t epoch_id_informed_{};

std::atomic_uint64_t epoch_id_recorded_{};
std::atomic_uint64_t epoch_id_to_be_recorded_{};
std::atomic_uint64_t epoch_id_record_finished_{};

std::unique_ptr<backup> backup_{};

Expand Down Expand Up @@ -302,6 +305,8 @@ class datastore {

std::mutex mtx_epoch_file_{};

std::mutex mtx_epoch_persistent_callback_{};

state state_{};

void add_file(const boost::filesystem::path& file) noexcept;
Expand All @@ -327,7 +332,6 @@ class datastore {
*/
void create_snapshot();

epoch_id_type last_durable_epoch_in_dir();

/**
* @brief requests the data store to rotate log files
Expand All @@ -342,6 +346,9 @@ class datastore {
int64_t current_unix_epoch_in_millis();

std::map<storage_id_type, write_version_type> clear_storage;

// File descriptor for file lock (flock) on the manifest file
int fd_for_flock_{-1};
};

} // namespace limestone::api
2 changes: 1 addition & 1 deletion include/limestone/api/log_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class log_channel {

std::atomic_uint64_t finished_epoch_id_{0};

std::atomic<epoch_id_type> latest_ession_epoch_id_{0};
std::atomic<epoch_id_type> latest_session_epoch_id_{0};

std::mutex session_mutex_;

Expand Down
64 changes: 53 additions & 11 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ datastore::datastore(configuration const& conf) : location_(conf.data_locations_
}
}
internal::check_and_migrate_logdir_format(location_);

// acquire lock for manifest file
fd_for_flock_ = internal::acquire_manifest_lock(location_);
if (fd_for_flock_ == -1) {
if (errno == EWOULDBLOCK) {
std::string err_msg = "Another process is using the log directory: " + location_.string() + ". Terminate the conflicting process and restart this process.";
LOG_AND_THROW_IO_EXCEPTION(err_msg, errno);
} else {
std::string err_msg = "Failed to acquire lock for manifest in directory: " + location_.string();
LOG_AND_THROW_IO_EXCEPTION(err_msg, errno);
}
}

add_file(compaction_catalog_path);
compaction_catalog_ = std::make_unique<compaction_catalog>(compaction_catalog::from_catalog_file(location_));

Expand Down Expand Up @@ -149,22 +162,30 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {
}

epoch_id_switched_.store(neid);
update_min_epoch_id(true);
if (state_ != state::not_ready) {
update_min_epoch_id(true);
}
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
}
}

void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readability-function-cognitive-complexity)
auto upper_limit = epoch_id_switched_.load() - 1;
auto upper_limit = epoch_id_switched_.load();
if (upper_limit == 0) {
return; // If epoch_id_switched_ is zero, it means no epoch has been switched, so updating epoch_id_to_be_recorded_ and epoch_id_informed_ is unnecessary.
}
upper_limit--;
epoch_id_type max_finished_epoch = 0;

for (const auto& e : log_channels_) {
auto working_epoch = static_cast<epoch_id_type>(e->current_epoch_id_.load());
if ((working_epoch - 1) < upper_limit) {
upper_limit = working_epoch - 1;
}
auto working_epoch = e->current_epoch_id_.load();
auto finished_epoch = e->finished_epoch_id_.load();
if (working_epoch > finished_epoch && working_epoch != UINT64_MAX) {
if ((working_epoch - 1) < upper_limit) {
upper_limit = working_epoch - 1;
}
}
if (max_finished_epoch < finished_epoch) {
max_finished_epoch = finished_epoch;
}
Expand All @@ -175,19 +196,21 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
if (from_switch_epoch && (to_be_epoch > static_cast<std::uint64_t>(max_finished_epoch))) {
to_be_epoch = static_cast<std::uint64_t>(max_finished_epoch);
}
auto old_epoch_id = epoch_id_recorded_.load();
auto old_epoch_id = epoch_id_to_be_recorded_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
if (epoch_id_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
if (epoch_id_to_be_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_file_);

if (to_be_epoch < epoch_id_to_be_recorded_.load()) {
break;
}
FILE* strm = fopen(epoch_file_path_.c_str(), "a"); // NOLINT(*-owning-memory)
if (!strm) {
LOG_AND_THROW_IO_EXCEPTION("fopen failed", errno);
}
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_recorded_.load()));
log_entry::durable_epoch(strm, static_cast<epoch_id_type>(epoch_id_to_be_recorded_.load()));
if (fflush(strm) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fflush failed", errno);
}
Expand All @@ -197,18 +220,29 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
if (fclose(strm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
}
epoch_id_record_finished_.store(epoch_id_to_be_recorded_.load());
break;
}
}
if (to_be_epoch > epoch_id_record_finished_.load()) {
return;
}

// update informed_epoch_
to_be_epoch = upper_limit;
// In `informed_epoch_`, the update restriction based on the `from_switch_epoch` condition is intentionally omitted.
// Due to the interface specifications of Shirakami, it is necessary to advance the epoch even if the log channel
// is not updated. This behavior differs from `recorded_epoch_` and should be maintained as such.
old_epoch_id = epoch_id_informed_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
if (epoch_id_informed_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
std::lock_guard<std::mutex> lock(mtx_epoch_persistent_callback_);
if (to_be_epoch < epoch_id_informed_.load()) {
break;
}
if (persistent_callback_) {
persistent_callback_(to_be_epoch);
}
Expand Down Expand Up @@ -244,7 +278,15 @@ std::future<void> datastore::shutdown() noexcept {
VLOG(log_info) << "/:limestone:datastore:shutdown compaction task has been stopped.";
}

return std::async(std::launch::async, []{
if (fd_for_flock_ != -1) {
if (::close(fd_for_flock_) == -1) {
VLOG(log_error) << "Failed to close lock file descriptor: " << strerror(errno);
} else {
fd_for_flock_ = -1;
}
}

return std::async(std::launch::async, [] {
std::this_thread::sleep_for(std::chrono::microseconds(100000));
VLOG(log_info) << "/:limestone:datastore:shutdown end";
});
Expand Down
19 changes: 19 additions & 0 deletions src/limestone/datastore_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>

#include <boost/filesystem.hpp>
#include <nlohmann/json.hpp>
Expand Down Expand Up @@ -170,4 +173,20 @@ void check_and_migrate_logdir_format(const boost::filesystem::path& logdir) {
}
}

int acquire_manifest_lock(const boost::filesystem::path& logdir) {
boost::filesystem::path manifest_path = logdir / std::string(manifest_file_name);

int fd = ::open(manifest_path.string().c_str(), O_RDWR); // NOLINT(hicpp-vararg, cppcoreguidelines-pro-type-vararg)
if (fd == -1) {
return -1;
}

if (::flock(fd, LOCK_EX | LOCK_NB) == -1) {
::close(fd);
return -1;
}
VLOG_LP(log_info) << "acquired lock on manifest file: " << manifest_path.string();
return fd;
}

} // namespace limestone::internal
19 changes: 11 additions & 8 deletions src/limestone/dblog_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,21 @@ epoch_id_type dblog_scan::last_durable_epoch_in_dir() {
auto& from_dir = dblogdir_;
// read main epoch file first
auto main_epoch_file = from_dir / std::string(epoch_file_name);

// If main epoch file does not exist, create an empty one
if (!boost::filesystem::exists(main_epoch_file)) {
// datastore operations (ctor and rotate) ensure that the main epoch file exists.
// so it may directory called from outside of datastore
LOG_AND_THROW_EXCEPTION("epoch file does not exist: " + main_epoch_file.string());
}
std::optional<epoch_id_type> ld_epoch = last_durable_epoch(main_epoch_file);
if (ld_epoch.has_value()) {
return *ld_epoch;
std::ofstream(main_epoch_file.string()).close(); // Create an empty file
} else {
// If the file exists, attempt to get the last durable epoch
std::optional<epoch_id_type> ld_epoch = last_durable_epoch(main_epoch_file);
if (ld_epoch.has_value()) {
return *ld_epoch;
}
}

// main epoch file is empty,
// main epoch file is empty or does not contain a valid epoch,
// read all rotated-epoch files
std::optional<epoch_id_type> ld_epoch;
for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(from_dir)) {
if (p.filename().string().rfind(epoch_file_name, 0) == 0) { // starts_with(epoch_file_name)
// this is epoch file (main one or rotated)
Expand Down
8 changes: 8 additions & 0 deletions src/limestone/dblogutil/dblogutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,19 @@ int main(char *dir, subcommand mode) { // NOLINT
}
try {
check_and_migrate_logdir_format(p);
int lock_fd = acquire_manifest_lock(p);
if (lock_fd == -1) {
LOG(ERROR) << "Another process is using the log directory: " << p
<< ". Terminate the conflicting process and re-execute the command. "
<< "Error: " << strerror(errno);
log_and_exit(64);
}
dblog_scan ds(p);
ds.set_thread_num(FLAGS_thread_num);
if (mode == cmd_inspect) inspect(ds, opt_epoch);
if (mode == cmd_repair) repair(ds, opt_epoch);
if (mode == cmd_compaction) compaction(ds, opt_epoch);
close(lock_fd);
} catch (limestone_exception& e) {
LOG(ERROR) << e.what();
log_and_exit(64);
Expand Down
7 changes: 7 additions & 0 deletions src/limestone/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ void setup_initial_logdir(const boost::filesystem::path& logdir);
*/
int is_supported_version(const boost::filesystem::path& manifest_path, std::string& errmsg);

// Validates the manifest file in the specified log directory and performs repair or migration if necessary.
void check_and_migrate_logdir_format(const boost::filesystem::path& logdir);

// Acquires an exclusive lock on the manifest file.
// Returns the file descriptor on success, or -1 on failure.
// Note: This function does not log errors or handle them internally.
// The caller must check the return value and use errno for error handling.
int acquire_manifest_lock(const boost::filesystem::path& logdir);

// from datastore_restore.cpp

status purge_dir(const boost::filesystem::path& dir);
Expand Down
8 changes: 4 additions & 4 deletions src/limestone/limestone_exception_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ inline void handle_exception_and_abort(std::string_view func_name) {
throw;
}
LOG_LP(FATAL) << "Fatal error in " << func_name << ": " << e.what();
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
} catch (const std::runtime_error& e) {
LOG_LP(FATAL) << "Runtime error in " << func_name << ": " << e.what();
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
} catch (const std::exception& e) {
LOG_LP(FATAL) << "Unexpected exception in " << func_name << ": " << e.what();
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
} catch (...) {
LOG_LP(FATAL) << "Unknown exception in " << func_name;
std::abort(); // Safety measure: this should never be reached due to VLOG_LP(google::FATAL)
std::abort(); // Safety measure: this should never be reached due to LOG_LP(FATAL)
}
}

Expand Down
25 changes: 20 additions & 5 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,26 @@ log_channel::log_channel(boost::filesystem::path location, std::size_t id, datas

void log_channel::begin_session() {
try {
// Synchronize `current_epoch_id_` with `epoch_id_switched_`.
// This loop is necessary to prevent inconsistencies in `current_epoch_id_`
// that could occur if `epoch_id_switched_` changes at a specific timing.
//
// Case where inconsistency occurs:
// 1. This thread (L) loads `epoch_id_switched_` and reads 10.
// 2. Another thread (S) immediately updates `epoch_id_switched_` to 11.
// 3. If the other thread (S) reads `current_epoch_id_` at this point,
// it expects `current_epoch_id_` to be consistent with the latest
// `epoch_id_switched_` value (11), but `current_epoch_id_` may still
// hold the outdated value, causing an inconsistency.
//
// This loop detects such inconsistencies and repeats until `current_epoch_id_`
// matches the latest value of `epoch_id_switched_`, ensuring consistency.
do {
current_epoch_id_.store(envelope_.epoch_id_switched_.load());
std::atomic_thread_fence(std::memory_order_acq_rel);
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());
latest_ession_epoch_id_.store(static_cast<epoch_id_type>(current_epoch_id_.load()));

latest_session_epoch_id_.store(static_cast<epoch_id_type>(current_epoch_id_.load()));

auto log_file = file_path();
strm_ = fopen(log_file.c_str(), "a"); // NOLINT(*-owning-memory)
Expand All @@ -60,7 +75,7 @@ void log_channel::begin_session() {
log_entry::begin_session(strm_, static_cast<epoch_id_type>(current_epoch_id_.load()));
{
std::lock_guard<std::mutex> lock(session_mutex_);
waiting_epoch_ids_.insert(latest_ession_epoch_id_);
waiting_epoch_ids_.insert(latest_session_epoch_id_);
}
} catch (...) {
HANDLE_EXCEPTION_AND_ABORT();
Expand All @@ -76,8 +91,8 @@ void log_channel::end_session() {
LOG_AND_THROW_IO_EXCEPTION("fsync failed", errno);
}
finished_epoch_id_.store(current_epoch_id_.load());
current_epoch_id_.store(UINT64_MAX);
envelope_.update_min_epoch_id();
current_epoch_id_.store(UINT64_MAX);

if (fclose(strm_) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno);
Expand All @@ -86,7 +101,7 @@ void log_channel::end_session() {
// Remove current_epoch_id_ from waiting_epoch_ids_
{
std::lock_guard<std::mutex> lock(session_mutex_);
waiting_epoch_ids_.erase(latest_ession_epoch_id_.load());
waiting_epoch_ids_.erase(latest_session_epoch_id_.load());
// Notify waiting threads
session_cv_.notify_all();
}
Expand Down Expand Up @@ -178,7 +193,7 @@ rotation_result log_channel::do_rotate_file(epoch_id_type epoch) {
envelope_.subtract_file(location_ / file_);

// Create a rotation result with the current epoch ID
rotation_result result(new_name, latest_ession_epoch_id_);
rotation_result result(new_name, latest_session_epoch_id_);
return result;
}

Expand Down
Loading

0 comments on commit 51a741d

Please sign in to comment.