Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add wal replay and full checkpoint #1

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/buffer/buffer_pool_manager_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ auto BufferPoolManagerInstance::FlushPgImp(page_id_t page_id) -> bool {
return false;
}
// flush regardless of dirty or not
if (enable_logging && log_manager_->GetPersistentLSN() < pages_[frame_id].GetLSN()) {
log_manager_->Flush(true);
}
disk_manager_->WritePage(page_id, pages_[frame_id].GetData());
pages_[frame_id].is_dirty_ = false; // reset dirty flag regardless

Expand All @@ -138,6 +141,14 @@ void BufferPoolManagerInstance::FlushAllPgsImp() {
for (size_t i = 0; i < pool_size_; i++) {
if (pages_[i].page_id_ != INVALID_PAGE_ID) {
// there is a valid physical page resides, regardless of dirty or not
frame_id_t frame_id;
if (!page_table_->Find(pages_[i].page_id_, frame_id)) {
// page id is not present in the buffer pool
return;
}
if (enable_logging && log_manager_->GetPersistentLSN() < pages_[frame_id].GetLSN()) {
log_manager_->Flush(true);
}
disk_manager_->WritePage(pages_[i].page_id_, pages_[i].GetData());
pages_[i].is_dirty_ = false;
}
Expand Down Expand Up @@ -191,6 +202,9 @@ auto BufferPoolManagerInstance::FindVictim(frame_id_t *available_frame_id) -> bo
if (pages_[*available_frame_id].IsDirty()) {
// the page to be evicted is dirty, flush out
// don't call FlushPgImp, otherwise lead to deadlock
if (enable_logging && log_manager_->GetPersistentLSN() < pages_[*available_frame_id].GetLSN()) {
log_manager_->Flush(true);
}
disk_manager_->WritePage(pages_[*available_frame_id].page_id_, pages_[*available_frame_id].GetData());
pages_[*available_frame_id].is_dirty_ = false; // reset dirty flag regardless
}
Expand Down
26 changes: 26 additions & 0 deletions src/catalog/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,30 @@ auto Column::ToString(bool simplified) const -> std::string {
return (os.str());
}

void Column::SerializeTo(char *storage) const {
int column_name_size = column_name_.size();
memcpy(storage, &column_name_size, sizeof(int));
memcpy(storage + sizeof(int), column_name_.data(), column_name_.size());
memcpy(storage + sizeof(int) + column_name_.size(), &column_type_, sizeof(TypeId));
memcpy(storage + sizeof(int) + column_name_.size() + sizeof(TypeId), &fixed_length_, sizeof(u_int32_t));
memcpy(storage + sizeof(int) + column_name_.size() + sizeof(TypeId) + sizeof(u_int32_t), &variable_length_,
sizeof(u_int32_t));
memcpy(storage + sizeof(int) + column_name_.size() + sizeof(TypeId) + sizeof(u_int32_t) + sizeof(u_int32_t),
&column_offset_, sizeof(u_int32_t));
}

void Column::DeserializeFrom(const char *storage) {
int column_name_size = 0;
memcpy(&column_name_size, storage, sizeof(int));
this->column_name_.resize(column_name_size);
memcpy(column_name_.data(), storage + sizeof(int), column_name_size);
memcpy(&column_type_, storage + sizeof(int) + column_name_.size(), sizeof(TypeId));
memcpy(&fixed_length_, storage + sizeof(int) + column_name_.size() + sizeof(TypeId), sizeof(u_int32_t));
memcpy(&variable_length_, storage + sizeof(int) + column_name_.size() + sizeof(TypeId) + sizeof(u_int32_t),
sizeof(u_int32_t));
memcpy(&column_offset_,
storage + sizeof(int) + column_name_.size() + sizeof(TypeId) + sizeof(u_int32_t) + sizeof(u_int32_t),
sizeof(u_int32_t));
}

} // namespace bustub
14 changes: 14 additions & 0 deletions src/common/bustub_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "catalog/schema.h"
#include "catalog/table_generator.h"
#include "common/bustub_instance.h"

#include "common/enums/statement_type.h"
#include "common/exception.h"
#include "common/util/string_util.h"
Expand All @@ -31,6 +32,7 @@
#include "planner/planner.h"
#include "recovery/checkpoint_manager.h"
#include "recovery/log_manager.h"
#include "recovery/log_recovery.h"
#include "storage/disk/disk_manager.h"
#include "storage/disk/disk_manager_memory.h"
#include "type/value_factory.h"
Expand Down Expand Up @@ -71,6 +73,9 @@ BustubInstance::BustubInstance(const std::string &db_file_name) {

// Execution engine.
execution_engine_ = new ExecutionEngine(buffer_pool_manager_, txn_manager_, catalog_);

// LogRec
log_recovery_ = new LogRecovery(disk_manager_, buffer_pool_manager_, catalog_);
}

BustubInstance::BustubInstance() {
Expand Down Expand Up @@ -219,6 +224,12 @@ auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer,
throw bustub::Exception("Failed to create table");
}
WriteOneCell(fmt::format("Table created with id = {}", info->oid_), writer);
{
LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::CREATETABLE,
create_stmt.table_, create_stmt.columns_);
lsn_t lsn = log_manager_->AppendLogRecord(&log_record);
txn->SetPrevLSN(lsn);
}
continue;
}
case StatementType::INDEX_STATEMENT: {
Expand Down Expand Up @@ -391,6 +402,9 @@ BustubInstance::~BustubInstance() {
if (enable_logging) {
log_manager_->StopFlushThread();
}
if (enable_checkpointing) {
checkpoint_manager_->StopFlushThread();
}
delete execution_engine_;
delete catalog_;
delete checkpoint_manager_;
Expand Down
3 changes: 2 additions & 1 deletion src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
namespace bustub {

std::atomic<bool> enable_logging(false);
std::atomic<bool> enable_checkpointing(false);

std::chrono::duration<int64_t> log_timeout = std::chrono::seconds(1);

std::chrono::duration<int64_t> checkpoint_timeout = std::chrono::seconds(2);
std::chrono::milliseconds cycle_detection_interval = std::chrono::milliseconds(50);

} // namespace bustub
19 changes: 17 additions & 2 deletions src/concurrency/transaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ auto TransactionManager::Begin(Transaction *txn, IsolationLevel isolation_level)
}

if (enable_logging) {
LogRecord record = LogRecord(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::BEGIN);
lsn_t lsn = log_manager_->AppendLogRecord(&record);
LogRecord log_record = LogRecord(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::BEGIN);
lsn_t lsn = log_manager_->AppendLogRecord(&log_record);
txn->SetPrevLSN(lsn);
}

Expand All @@ -59,6 +59,14 @@ void TransactionManager::Commit(Transaction *txn) {
}
write_set->clear();

if (enable_logging) {
LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::COMMIT);
auto lsn = log_manager_->AppendLogRecord(&log_record);
txn->SetPrevLSN(lsn);
// TODO:cwhen committing ,we need to group commit log_manager_->Flush(false);
log_manager_->Flush(false);
}

// Release all the locks.
ReleaseLocks(txn);
// Release the global transaction latch.
Expand Down Expand Up @@ -109,6 +117,13 @@ void TransactionManager::Abort(Transaction *txn) {
table_write_set->clear();
index_write_set->clear();

if (enable_logging) {
LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::ABORT);
auto lsn = log_manager_->AppendLogRecord(&log_record);
txn->SetPrevLSN(lsn);
log_manager_->Flush(false);
}

// Release all the locks.
ReleaseLocks(txn);
// Release the global transaction latch.
Expand Down
2 changes: 1 addition & 1 deletion src/execution/seq_scan_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ auto SeqScanExecutor::Next(Tuple *tuple, RID *rid) -> bool {
obtain_lock_ = false;
return true;
} // not what the filter wants, move to next tuple
cursor_++;
++cursor_;
}
return false;
}
Expand Down
8 changes: 4 additions & 4 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,10 @@ class Catalog {
*
* NOTE: `tables_` owns all table metadata.
*/
std::unordered_map<table_oid_t, std::unique_ptr<TableInfo>> tables_;
std::unordered_map<table_oid_t, std::unique_ptr<TableInfo>> tables_{};

/** Map table name -> table identifiers. */
std::unordered_map<std::string, table_oid_t> table_names_;
std::unordered_map<std::string, table_oid_t> table_names_{};

/** The next table identifier to be used. */
std::atomic<table_oid_t> next_table_oid_{0};
Expand All @@ -364,10 +364,10 @@ class Catalog {
*
* NOTE: that `indexes_` owns all index metadata.
*/
std::unordered_map<index_oid_t, std::unique_ptr<IndexInfo>> indexes_;
std::unordered_map<index_oid_t, std::unique_ptr<IndexInfo>> indexes_{};

/** Map table name -> index names -> index identifiers. */
std::unordered_map<std::string, std::unordered_map<std::string, index_oid_t>> index_names_;
std::unordered_map<std::string, std::unordered_map<std::string, index_oid_t>> index_names_{};

/** The next index identifier to be used. */
std::atomic<index_oid_t> next_index_oid_{0};
Expand Down
14 changes: 14 additions & 0 deletions src/include/catalog/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Column {
friend class Schema;

public:
Column() = default;
/**
* Non-variable-length constructor for creating a Column.
* @param column_name name of the column
Expand Down Expand Up @@ -66,6 +67,16 @@ class Column {
fixed_length_(column.fixed_length_),
variable_length_(column.variable_length_),
column_offset_(column.column_offset_) {}
/**
* From wal
*/
Column(std::string column_name, const TypeId type, uint32_t fixed_length, u_int32_t variable_length,
uint32_t column_offset)
: column_name_(std::move(column_name)),
column_type_(type),
fixed_length_(fixed_length),
variable_length_(variable_length),
column_offset_(column_offset) {}

/** @return column name */
auto GetName() const -> std::string { return column_name_; }
Expand Down Expand Up @@ -96,6 +107,9 @@ class Column {
/** @return a string representation of this column */
auto ToString(bool simplified = true) const -> std::string;

void SerializeTo(char *storage) const;
void DeserializeFrom(const char *storage);

private:
/**
* Return the size in bytes of the type.
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/bustub_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class LogManager;
class CheckpointManager;
class Catalog;
class ExecutionEngine;
class LogRecovery;

class ResultWriter {
public:
Expand Down Expand Up @@ -253,6 +254,7 @@ class BustubInstance {
Catalog *catalog_;
ExecutionEngine *execution_engine_;
std::shared_mutex catalog_lock_;
LogRecovery *log_recovery_;

auto GetSessionVariable(const std::string &key) -> std::string {
if (session_variables_.find(key) != session_variables_.end()) {
Expand Down
5 changes: 4 additions & 1 deletion src/include/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ extern std::atomic<bool> enable_logging;
/** If ENABLE_LOGGING is true, the log should be flushed to disk every LOG_TIMEOUT. */
extern std::chrono::duration<int64_t> log_timeout;

extern std::atomic<bool> enable_checkpointing;
extern std::chrono::duration<int64_t> checkpoint_timeout;

static constexpr int INVALID_PAGE_ID = -1; // invalid page id
static constexpr int INVALID_TXN_ID = -1; // invalid transaction id
static constexpr int INVALID_LSN = -1; // invalid log sequence number
static constexpr int HEADER_PAGE_ID = 0; // the header page id
static constexpr int BUSTUB_PAGE_SIZE = 4096; // size of a data page in byte
static constexpr int BUSTUB_PAGE_SIZE = 1024; // size of a data page in byte
static constexpr int BUFFER_POOL_SIZE = 10; // size of buffer pool
static constexpr int LOG_BUFFER_SIZE = ((BUFFER_POOL_SIZE + 1) * BUSTUB_PAGE_SIZE); // size of a log buffer in byte
static constexpr int BUCKET_SIZE = 50; // size of extendible hash bucket
Expand Down
1 change: 1 addition & 0 deletions src/include/common/util/string_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <string>
#include <vector>
#include "common/config.h"

namespace bustub {

Expand Down
3 changes: 3 additions & 0 deletions src/include/recovery/checkpoint_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ class CheckpointManager {

void BeginCheckpoint();
void EndCheckpoint();
void RunCheckPointThread();
void StopFlushThread();

private:
TransactionManager *transaction_manager_ __attribute__((__unused__));
LogManager *log_manager_ __attribute__((__unused__));
BufferPoolManager *buffer_pool_manager_ __attribute__((__unused__));
std::thread *checkpoint_thread_;
};

} // namespace bustub
5 changes: 5 additions & 0 deletions src/include/recovery/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class LogManager {

void RunFlushThread();
void StopFlushThread();
void Flush(bool force);

auto AppendLogRecord(LogRecord *log_record) -> lsn_t;

Expand All @@ -61,12 +62,16 @@ class LogManager {

char *log_buffer_;
char *flush_buffer_;
int log_buffer_offset_ = 0;
int flush_buffer_offset_ = 0;

std::mutex latch_;

std::thread *flush_thread_ __attribute__((__unused__));

std::condition_variable cv_;
std::condition_variable cv_append_;
std::atomic_bool need_flush_ = false;

DiskManager *disk_manager_ __attribute__((__unused__));
};
Expand Down
19 changes: 19 additions & 0 deletions src/include/recovery/log_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ enum class LogRecordType {
ABORT,
/** Creating a new page in the table heap. */
NEWPAGE,

CREATETABLE
};

/**
Expand All @@ -57,6 +59,10 @@ enum class LogRecordType {
*--------------------------
* | HEADER | prev_page_id |
*--------------------------
* For create table
*--------------------------
* | HEADER | prev_page_id |
*--------------------------
*/
class LogRecord {
friend class LogManager;
Expand Down Expand Up @@ -110,6 +116,15 @@ class LogRecord {
size_ = HEADER_SIZE + sizeof(page_id_t) * 2;
}

// constructor for CREATETABLE type
LogRecord(txn_id_t txn_id, lsn_t prev_lsn, LogRecordType log_record_type, const std::string &table_name,
std::vector<Column> columns)
: txn_id_(txn_id), prev_lsn_(prev_lsn), log_record_type_(log_record_type) {
size_ = HEADER_SIZE + table_name.size() + sizeof(Schema);
table_name_ = table_name;
columns_ = columns;
}

~LogRecord() = default;

inline auto GetDeleteTuple() -> Tuple & { return delete_tuple_; }
Expand Down Expand Up @@ -177,6 +192,10 @@ class LogRecord {
page_id_t prev_page_id_{INVALID_PAGE_ID};
page_id_t page_id_{INVALID_PAGE_ID};
static const int HEADER_SIZE = 20;

// case5: for create table operation
std::string table_name_;
std::vector<Column> columns_{};
}; // namespace bustub

} // namespace bustub
10 changes: 9 additions & 1 deletion src/include/recovery/log_recovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "concurrency/lock_manager.h"
#include "recovery/log_record.h"

#include <catalog/catalog.h>

namespace bustub {

/**
Expand All @@ -32,18 +34,24 @@ class LogRecovery {
log_buffer_ = new char[LOG_BUFFER_SIZE];
}

LogRecovery(DiskManager *disk_manager, BufferPoolManager *buffer_pool_manager, Catalog *catalog)
: disk_manager_(disk_manager), buffer_pool_manager_(buffer_pool_manager), catalog_(catalog), offset_(0) {
log_buffer_ = new char[LOG_BUFFER_SIZE];
}

~LogRecovery() {
delete[] log_buffer_;
log_buffer_ = nullptr;
}

void Redo();
void Undo();
auto DeserializeLogRecord(const char *data, LogRecord *log_record) -> bool;
auto DeserializeLogRecord(const char *data, LogRecord *log_record) const -> bool;

private:
DiskManager *disk_manager_ __attribute__((__unused__));
BufferPoolManager *buffer_pool_manager_ __attribute__((__unused__));
Catalog *catalog_ __attribute__((__unused__));

/** Maintain active transactions and its corresponding latest lsn. */
std::unordered_map<txn_id_t, lsn_t> active_txn_;
Expand Down
Loading
Loading