Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype: Primary table in rocksdb #19

Merged
merged 22 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/yb/master/sys_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,11 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
string data_root_dir = fs_manager->GetDataRootDirs()[0];
fs_manager->SetTabletPathByDataPath(kSysCatalogTabletId, data_root_dir);
auto metadata = VERIFY_RESULT(tablet::RaftGroupMetadata::CreateNew(tablet::RaftGroupMetadataData {
.fs_manager = fs_manager,
.table_info = table_info,
.raft_group_id = kSysCatalogTabletId,
.partition = partitions[0],
.tablet_data_state = tablet::TABLET_DATA_READY,
.snapshot_schedules = {},
fs_manager,
table_info,
kSysCatalogTabletId,
partitions[0],
tablet::TABLET_DATA_READY
}, data_root_dir));

RaftConfigPB config;
Expand Down
12 changes: 7 additions & 5 deletions src/yb/tablet/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,8 @@ message KvStoreInfoPB {
// compacted. Defaults to 0 (i.e. HybridTime::kMin).
optional uint64 last_full_compaction_time = 10;

// Initial version of the primary table. Set only when the tablet metadata is in RocksDB.
// Any subsequent version goes to RocksDB.
optional TableInfoPB initial_primary_table = 11;

// Table metadata schema. Set only when the metadata is in RocksDB.
optional SchemaPB metadata_schema = 12;
optional SchemaPB metadata_schema = 11;
}

// The super-block keeps track of the Raft group.
Expand Down Expand Up @@ -215,6 +211,12 @@ message RaftGroupReplicaSuperBlockPB {
optional OpIdPB cdc_sdk_min_checkpoint_op_id = 33;

optional fixed64 cdc_sdk_safe_time = 34;

optional TableType primary_table_type = 35;

optional bool transactional = 36 [default = false];

optional bool index_table = 37 [default = false];
}

message FilePB {
Expand Down
11 changes: 5 additions & 6 deletions src/yb/tablet/tablet-harness.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ Status TabletHarness::Create(bool first_time) {
"test-tablet", Primary::kTrue, "YBTableTest", "test", "YBTableTest", options_.table_type,
schema_, IndexMap(), boost::none, 0 /* schema_version */, partition.first);
auto metadata = VERIFY_RESULT(RaftGroupMetadata::TEST_LoadOrCreate(RaftGroupMetadataData {
.fs_manager = fs_manager_.get(),
.table_info = table_info,
.raft_group_id = options_.tablet_id,
.partition = partition.second,
.tablet_data_state = TABLET_DATA_READY,
.snapshot_schedules = {},
fs_manager_.get(),
table_info,
options_.tablet_id,
partition.second,
TABLET_DATA_READY
}));
if (options_.enable_metrics) {
metrics_registry_.reset(new MetricRegistry());
Expand Down
72 changes: 43 additions & 29 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,41 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
const std::string log_prefix_;
};

void Tablet::Init() {
LOG_WITH_FUNC(INFO) << "Inside init";
key_schema_ = std::make_unique<Schema>(metadata_->schema()->CreateKeyProjection());
CHECK(schema()->has_column_ids());
LOG_WITH_PREFIX(INFO) << "Schema version for " << metadata_->table_name() << " is "
<< metadata_->schema_version();

auto table_info = metadata_->primary_table_info();
if (table_metrics_entity_) {
table_metrics_entity_->SetAttribute("table_name", table_info->table_name);
table_metrics_entity_->SetAttribute("namespace_name", table_info->namespace_name);
}
if (tablet_metrics_entity_) {
tablet_metrics_entity_->SetAttribute("table_name", table_info->table_name);
tablet_metrics_entity_->SetAttribute("namespace_name", table_info->namespace_name);
}

bool has_index = !table_info->index_map->empty();

// Create index table metadata cache for secondary index update.
if (has_index) {
CreateNewYBMetaDataCache();
}

// If this is a unique index tablet, set up the index primary key schema.
if (table_info->index_info && table_info->index_info->is_unique()) {
unique_index_key_schema_ = std::make_unique<Schema>();
const auto ids = table_info->index_info->index_key_column_ids();
CHECK_OK(table_info->schema().CreateProjectionByIdsIgnoreMissing(
ids, unique_index_key_schema_.get()));
}
}

Tablet::Tablet(const TabletInitData& data)
: key_schema_(std::make_unique<Schema>(data.metadata->schema()->CreateKeyProjection())),
metadata_(data.metadata),
: metadata_(data.metadata),
table_type_(data.metadata->table_type()),
log_anchor_registry_(data.log_anchor_registry),
mem_tracker_(MemTracker::CreateTracker(
Expand All @@ -462,16 +494,10 @@ Tablet::Tablet(const TabletInitData& data)
clock_, data.allowed_history_cutoff_provider, metadata_.get())),
full_compaction_pool_(data.full_compaction_pool),
ts_post_split_compaction_added_(std::move(data.post_split_compaction_added)) {
CHECK(schema()->has_column_ids());
LOG_WITH_PREFIX(INFO) << "Schema version for " << metadata_->table_name() << " is "
<< metadata_->schema_version();

if (data.metric_registry) {
MetricEntity::AttributeMap attrs;
// TODO(KUDU-745): table_id is apparently not set in the metadata.
attrs["table_id"] = metadata_->table_id();
attrs["table_name"] = metadata_->table_name();
attrs["namespace_name"] = metadata_->namespace_name();
table_metrics_entity_ =
METRIC_ENTITY_table.Instantiate(data.metric_registry, metadata_->table_id(), attrs);
tablet_metrics_entity_ =
Expand All @@ -489,9 +515,7 @@ Tablet::Tablet(const TabletInitData& data)
mem_tracker_->SetMetricEntity(tablet_metrics_entity_);
}

auto table_info = metadata_->primary_table_info();
bool has_index = !table_info->index_map->empty();
bool transactional = data.metadata->schema()->table_properties().is_transactional();
bool transactional = data.metadata->is_transactional();
if (transactional) {
server::HybridClock::EnableClockSkewControl();
}
Expand All @@ -508,21 +532,8 @@ Tablet::Tablet(const TabletInitData& data)
}
}

// Create index table metadata cache for secondary index update.
if (has_index) {
CreateNewYBMetaDataCache();
}

// If this is a unique index tablet, set up the index primary key schema.
if (table_info->index_info && table_info->index_info->is_unique()) {
unique_index_key_schema_ = std::make_unique<Schema>();
const auto ids = table_info->index_info->index_key_column_ids();
CHECK_OK(table_info->schema().CreateProjectionByIdsIgnoreMissing(
ids, unique_index_key_schema_.get()));
}

if (data.transaction_coordinator_context &&
table_info->table_type == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
table_type_ == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
transaction_coordinator_ = std::make_unique<TransactionCoordinator>(
metadata_->fs_manager()->uuid(),
data.transaction_coordinator_context,
Expand Down Expand Up @@ -567,7 +578,6 @@ Status Tablet::Open() {
TRACE_EVENT0("tablet", "Tablet::Open");
std::lock_guard<rw_spinlock> lock(component_lock_);
CHECK_EQ(state_, kInitialized) << "already open";
CHECK(schema()->has_column_ids());

switch (table_type_) {
case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
Expand Down Expand Up @@ -716,7 +726,7 @@ Status Tablet::OpenKeyValueTablet() {
static const std::string kIntentsDB = "IntentsDB"s;

rocksdb::BlockBasedTableOptions table_options;
if (!metadata()->primary_table_info()->index_info || metadata()->colocated()) {
if (!metadata()->is_index_table() || metadata()->colocated()) {
// This tablet is not dedicated to the index table, so it should be effective to use
// advanced key-value encoding algorithm optimized for docdb keys structure.
table_options.use_delta_encoding = true;
Expand Down Expand Up @@ -3551,14 +3561,18 @@ void Tablet::TEST_DocDBDumpToLog(IncludeIntents include_intents) {
}
}

size_t Tablet::TEST_CountRegularDBRecords() {
size_t Tablet::TEST_CountRegularDBRecords(bool skip_metadata_entries) {
if (!regular_db_) return 0;
rocksdb::ReadOptions read_opts;
read_opts.query_id = rocksdb::kDefaultQueryId;
docdb::BoundedRocksDbIterator iter(regular_db_.get(), read_opts, &key_bounds_);

size_t result = 0;
for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
if (skip_metadata_entries &&
iter.key().starts_with(docdb::KeyEntryTypeAsChar::kTabletMetadata)) {
continue;
}
++result;
}
return result;
Expand Down Expand Up @@ -3657,7 +3671,7 @@ Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext(

if (transaction_id.is_initialized()) {
txn_id = transaction_id.get_ptr();
} else if (metadata_->schema()->table_properties().is_transactional() || is_ysql_catalog_table) {
} else if (metadata_->is_transactional() || is_ysql_catalog_table) {
// deadbeef-dead-beef-dead-beef00000075
static const TransactionId kArbitraryTxnIdForNonTxnReads(
17275436393656397278ULL, 8430738506459819486ULL);
Expand Down
12 changes: 7 additions & 5 deletions src/yb/tablet/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ class Tablet : public AbstractTablet,

~Tablet();

void Init();

// Open the tablet.
// Upon completion, the tablet enters the kBootstrapping state.
Status Open();
Expand Down Expand Up @@ -621,7 +623,7 @@ class Tablet : public AbstractTablet,
// Dumps DocDB contents to log, every record as a separate log message, with the given prefix.
void TEST_DocDBDumpToLog(IncludeIntents include_intents);

size_t TEST_CountRegularDBRecords();
size_t TEST_CountRegularDBRecords(bool skip_metadata_entries = true);

Status CreateReadIntents(
const TransactionMetadataPB& transaction_metadata,
Expand Down Expand Up @@ -816,6 +818,10 @@ class Tablet : public AbstractTablet,
// critical failures.
Status ApplyAutoFlagsConfig(const AutoFlagsConfigPB& config);

Status UpsertMetadataDocOperation(
const std::vector<TableInfoPtr>& table_infos, Operation* operation = nullptr,
AlreadyAppliedToRegularDB already_applied_to_regular_db = AlreadyAppliedToRegularDB::kFalse);

std::string LogPrefix() const;

private:
Expand Down Expand Up @@ -921,10 +927,6 @@ class Tablet : public AbstractTablet,
Operation* operation,
AlreadyAppliedToRegularDB already_applied_to_regular_db = AlreadyAppliedToRegularDB::kFalse);

Status UpsertMetadataDocOperation(
const std::vector<TableInfoPtr>& table_infos, Operation* operation = nullptr,
AlreadyAppliedToRegularDB already_applied_to_regular_db = AlreadyAppliedToRegularDB::kFalse);

std::unique_ptr<const Schema> key_schema_;

RaftGroupMetadataPtr metadata_;
Expand Down
11 changes: 5 additions & 6 deletions src/yb/tablet/tablet_bootstrap-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,11 @@ class BootstrapTest : public LogTestBase {
"TEST: ", Primary::kTrue, log::kTestTable, log::kTestNamespace, log::kTestTable, kTableType,
schema, IndexMap(), boost::none /* index_info */, 0 /* schema_version */, partition.first);
auto result = VERIFY_RESULT(RaftGroupMetadata::TEST_LoadOrCreate(RaftGroupMetadataData {
.fs_manager = fs_manager_.get(),
.table_info = table_info,
.raft_group_id = log::kTestTablet,
.partition = partition.second,
.tablet_data_state = TABLET_DATA_READY,
.snapshot_schedules = {},
fs_manager_.get(),
table_info,
log::kTestTablet,
partition.second,
TABLET_DATA_READY
}));
RETURN_NOT_OK(result->Flush());
return result;
Expand Down
13 changes: 9 additions & 4 deletions src/yb/tablet/tablet_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ class TabletBootstrap {
TabletPtr* rebuilt_tablet,
scoped_refptr<log::Log>* rebuilt_log,
consensus::ConsensusBootstrapInfo* consensus_info) {
LOG_WITH_FUNC(INFO) << "Starting bootstrap function";
const string tablet_id = meta_->raft_group_id();

// Replay requires a valid Consensus metadata file to exist in order to compare the committed
Expand Down Expand Up @@ -566,6 +567,7 @@ class TabletBootstrap {
if (!has_blocks && !needs_recovery) {
LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log.";
RETURN_NOT_OK_PREPEND(OpenLog(CreateNewSegment::kTrue), "Failed to open new log");
tablet_->Init();
RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log",
rebuilt_log,
rebuilt_tablet));
Expand All @@ -588,6 +590,7 @@ class TabletBootstrap {
tablet_id));
}

RETURN_NOT_OK(tablet_->metadata()->LoadTablesFromRocksDB(tablet_));
RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason");

if (cmeta_->current_term() < consensus_info->last_id.term()) {
Expand All @@ -613,9 +616,9 @@ class TabletBootstrap {
RETURN_NOT_OK(tablet_->ModifyFlushedFrontier(
new_consensus_frontier, rocksdb::FrontierModificationMode::kForce));
}

tablet_->Init();
RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet));

LOG_WITH_FUNC(INFO) << "Ending bootstrap function";
return Status::OK();
}

Expand Down Expand Up @@ -1487,10 +1490,11 @@ class TabletBootstrap {
consensus::LWReplicateMsg* replicate_msg,
AlreadyAppliedToRegularDB already_applied_to_regular_db) {
auto* request = replicate_msg->mutable_change_metadata_request();

LOG_WITH_FUNC(INFO) << "Inside PlayChangeMetadataRequest";
ChangeMetadataOperation operation(tablet_, log_.get(), request);

// If table id isn't in metadata, ignore the replay as the table might've been dropped.
// TODO: Once table metadata is stored in RocksDB, this check should not be required.
// If table id isn't in metadata, ignore the replay as the table might've been dropped.
auto table_info = meta_->GetTableInfo(operation.table_id().ToBuffer());
if (!table_info.ok()) {
LOG_WITH_PREFIX(WARNING) << "Table ID " << operation.table_id()
Expand All @@ -1501,6 +1505,7 @@ class TabletBootstrap {
RETURN_NOT_OK(operation.Prepare(IsLeaderSide::kTrue));

if (tablet_->metadata()->IsTableMetadataInRocksDB()) {
LOG_WITH_FUNC(INFO) << "Playing ChangeMetadataRequest ";
operation.set_op_id(OpId::FromPB(replicate_msg->id()));
HybridTime hybrid_time(replicate_msg->hybrid_time());
operation.set_hybrid_time(hybrid_time);
Expand Down
Loading