Skip to content

Commit

Permalink
feat(tianmu): support volcano framework (#1554)
Browse files Browse the repository at this point in the history
Part3: To remove the `ha_tianmu_engine`, and gets it from hton's data. This makes
it behavior just like innodb.  MySQL gets innodb handler instance from table->s->file.
and it will make the code logic more concise.
  • Loading branch information
RingsC authored Apr 19, 2023
1 parent f79002e commit 6e56cef
Show file tree
Hide file tree
Showing 22 changed files with 591 additions and 195 deletions.
41 changes: 37 additions & 4 deletions storage/tianmu/core/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ static int setup_sig_handler() {

fs::path Engine::GetNextDataDir() {
std::scoped_lock guard(v_mtx);
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

if (tianmu_data_dirs.empty()) {
// fall back to use MySQL data directory
auto p = ha_tianmu_engine_->tianmu_data_dir / TIANMU_DATA_DIR;
auto p = eng->tianmu_data_dir / TIANMU_DATA_DIR;
if (!fs::is_directory(p))
fs::create_directory(p);
return p;
Expand Down Expand Up @@ -117,6 +118,7 @@ fs::path Engine::GetNextDataDir() {
}
return v[std::rand() % v.size()];
}

// round-robin
static int idx = 0;
return tianmu_data_dirs[idx++ % sz];
Expand All @@ -127,35 +129,43 @@ static int has_pack(const LEX_STRING &comment) {
std::string str(comment.str, comment.length);
boost::to_upper(str);
std::string val;

auto pos = str.find("PACK");
if (pos == std::string::npos)
return ret;

size_t val_pos = str.find(':', pos);
if (val_pos == std::string::npos)
return ret;

size_t term_pos = str.find(';', val_pos);
if (term_pos == std::string::npos) {
val = str.substr(val_pos + 1);
} else {
val = str.substr(val_pos + 1, term_pos - val_pos - 1);
}

boost::trim(val);
ret = atoi(val.c_str());
if (ret > common::DFT_PSS || ret <= 0)
ret = common::DFT_PSS;

return ret;
}

static std::string has_mem_name(const LEX_STRING &comment) {
std::string name = "";
std::string str(comment.str, comment.length);
boost::to_upper(str);

auto pos = str.find("ROWSTORE");
if (pos == std::string::npos)
return name;

size_t val_pos = str.find(':', pos);
if (val_pos == std::string::npos)
return name;

size_t term_pos = str.find(';', val_pos);
if (term_pos == std::string::npos) {
name = str.substr(val_pos + 1);
Expand Down Expand Up @@ -1374,6 +1384,7 @@ static void HandleDelayedLoad(uint32_t table_id, std::vector<std::unique_ptr<cha
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->get_stmt_da()->set_overwrite_status(false);
close_thread_tables(thd);

if (thd->transaction_rollback_request) {
trans_rollback_implicit(thd);
thd->mdl_context.release_transactional_locks();
Expand All @@ -1386,17 +1397,22 @@ static void HandleDelayedLoad(uint32_t table_id, std::vector<std::unique_ptr<cha
if (thd->is_fatal_error) {
TIANMU_LOG(LogCtl_Level::ERROR, "LOAD DATA failed on table '%s'", tab_name.c_str());
}

thd->release_resources();
thd_manager->remove_thd(thd);
delete thd;

my_thread_end();
}

void DistributeLoad(std::unordered_map<uint32_t, std::vector<std::unique_ptr<char[]>>> &tm) {
utils::result_set<void> res;
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

for (auto &it : tm) {
res.insert(ha_tianmu_engine_->bg_load_thread_pool.add_task(HandleDelayedLoad, it.first, std::ref(it.second)));
res.insert(eng->bg_load_thread_pool.add_task(HandleDelayedLoad, it.first, std::ref(it.second)));
}

res.get_all();
tm.clear();
}
Expand Down Expand Up @@ -1462,6 +1478,7 @@ void Engine::ProcessDeltaStoreMerge() {
while (!mysqld_server_started) {
struct timespec abstime;
set_timespec(&abstime, 1);

mysql_cond_timedwait(&COND_server_started, &LOCK_server_started, &abstime);
if (exiting) {
mysql_mutex_unlock(&LOCK_server_started);
Expand All @@ -1472,6 +1489,8 @@ void Engine::ProcessDeltaStoreMerge() {

std::map<std::string, uint> sleep_cnts;
TIANMU_LOG(LogCtl_Level::INFO, "Tianmu merge delta store thread start...");
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

while (!exiting) {
if (!tianmu_sysvar_enable_rowstore) {
std::unique_lock<std::mutex> lk(cv_merge_mtx);
Expand All @@ -1487,16 +1506,21 @@ void Engine::ProcessDeltaStoreMerge() {
uint64_t record_count = delta_table->CountRecords();
if ((record_count >= tianmu_sysvar_insert_numthreshold ||
(sleep_cnts.count(name) && sleep_cnts[name] > tianmu_sysvar_insert_cntthreshold))) {
auto share = ha_tianmu_engine_->getTableShare(name);
auto share = eng->getTableShare(name);
auto table_id = share->TabID();

utils::BitSet null_mask(share->NumOfCols());

std::unique_ptr<char[]> buf(new char[sizeof(uint32_t) + name.size() + 1 + null_mask.data_size()]);
char *ptr = buf.get();
*(uint32_t *)ptr = table_id; // table id
ptr += sizeof(uint32_t);

std::memcpy(ptr, name.c_str(), name.size());
ptr += name.size();

*ptr++ = 0; // end with NUL

std::memcpy(ptr, null_mask.data(), null_mask.data_size());
need_merge_table[table_id].emplace_back(std::move(buf));
sleep_cnts[name] = 0;
Expand All @@ -1519,13 +1543,15 @@ void Engine::ProcessDeltaStoreMerge() {
continue;
}
}

if (!need_merge_table.empty())
DistributeLoad(need_merge_table);
else {
std::unique_lock<std::mutex> lk(cv_merge_mtx);
cv_merge.wait_for(lk, std::chrono::milliseconds(tianmu_sysvar_insert_wait_ms));
}
}

TIANMU_LOG(LogCtl_Level::INFO, "Tianmu merge delta store thread exiting...");
}

Expand Down Expand Up @@ -1984,12 +2010,13 @@ std::unique_ptr<system::IOParameters> Engine::CreateIOParameters(const std::stri

std::string data_dir;
std::string data_path;
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

if (fs::path(path).is_absolute()) {
data_dir = "";
data_path = path;
} else {
data_dir = ha_tianmu_engine_->tianmu_data_dir;
data_dir = eng->tianmu_data_dir;
std::string db_name, tab_name;
std::tie(db_name, tab_name) = GetNames(path);
data_path += db_name;
Expand Down Expand Up @@ -2024,6 +2051,7 @@ void Engine::ComputeTimeZoneDiffInMinutes(THD *thd, short &sign, short &minutes)
minutes = common::NULL_VALUE_SH;
return;
}

MYSQL_TIME client_zone, utc;
utc.year = 1970;
utc.month = 1;
Expand All @@ -2039,8 +2067,10 @@ void Engine::ComputeTimeZoneDiffInMinutes(THD *thd, short &sign, short &minutes)
long msecs;
sign = 1;
minutes = 0;

if (calc_time_diff(&utc, &client_zone, 1, &secs, &msecs))
sign = -1;

minutes = (short)(secs / 60);
}

Expand Down Expand Up @@ -2098,6 +2128,7 @@ common::TianmuError Engine::GetIOP(std::unique_ptr<system::IOParameters> &io_par
tdb = (char *)thd.db().str;

io_params = CreateIOParameters(&thd, table, arg);

short sign, minutes;
ComputeTimeZoneDiffInMinutes(&thd, sign, minutes);
io_params->SetTimeZone(sign, minutes);
Expand Down Expand Up @@ -2256,6 +2287,7 @@ std::shared_ptr<TableShare> Engine::GetTableShare(const TABLE_SHARE *table_share
table_share_map[name] = share;
return share;
}

return it->second;
} catch (common::Exception &e) {
TIANMU_LOG(LogCtl_Level::ERROR, "Failed to create table share: %s", e.what());
Expand Down Expand Up @@ -2288,6 +2320,7 @@ bool Engine::DeleteTableIndex(const std::string &table_path, [[maybe_unused]] TH
if (index::TianmuTableIndex::FindIndexTable(table_path)) {
index::TianmuTableIndex::DropIndexTable(table_path);
}

if (m_table_keys.find(table_path) != m_table_keys.end()) {
m_table_keys.erase(table_path);
}
Expand Down
4 changes: 3 additions & 1 deletion storage/tianmu/core/engine_execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ QueryRouteTo Engine::HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulo
// at this point all tables are in RCBase engine, so we can proceed with the
// query and we know that if the result goes to the file, the TIANMU_DATAFORMAT is
// one of TIANMU formats
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
QueryRouteTo route = QueryRouteTo::kToTianmu;
SELECT_LEX *save_current_select = lex->current_select();
List<st_select_lex_unit> derived_optimized; // collection to remember derived
Expand All @@ -127,6 +128,7 @@ QueryRouteTo Engine::HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulo
res = FALSE;
int tianmu_free_join = FALSE;
lex->thd->derived_tables_processing = TRUE;

for (SELECT_LEX *sl = lex->all_selects_list; sl; sl = sl->next_select_in_list()) // for all selects
for (TABLE_LIST *cursor = sl->get_table_list(); cursor; cursor = cursor->next_local) // for all tables
if (cursor->table && cursor->is_view_or_derived()) { // data source (view or FROM subselect)
Expand Down Expand Up @@ -191,7 +193,7 @@ QueryRouteTo Engine::HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulo
is_optimize_after_tianmu = TRUE;
if (!res) {
try {
route = ha_tianmu_engine_->Execute(unit->thd, unit->thd->lex, result, unit);
route = eng->Execute(unit->thd, unit->thd->lex, result, unit);
if (route == QueryRouteTo::kToMySQL) {
if (in_case_of_failure_can_go_to_mysql)
if (old_executed)
Expand Down
19 changes: 15 additions & 4 deletions storage/tianmu/core/parallel_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,9 @@ int64_t ParallelHashJoiner::TraverseDim(MIIterator &mit, int64_t *outer_tuples)
int64_t traversed_rows = 0;
bool no_except = true;
utils::result_set<int64_t> res;

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

try {
for (MITaskIterator *iter : task_iterators) {
auto &ht = traversed_hash_tables_.emplace_back(hash_table_key_size_, hash_table_tuple_size_,
Expand All @@ -559,7 +562,7 @@ int64_t ParallelHashJoiner::TraverseDim(MIIterator &mit, int64_t *outer_tuples)
params.build_item = multi_index_builder_->CreateBuildItem();
params.task_miter = iter;

res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParallelHashJoiner::AsyncTraverseDim, this, &params));
res.insert(eng->query_thread_pool.add_task(&ParallelHashJoiner::AsyncTraverseDim, this, &params));
}
} catch (std::exception &e) {
res.get_all_with_except();
Expand Down Expand Up @@ -793,6 +796,10 @@ int64_t ParallelHashJoiner::MatchDim(MIIterator &mit) {
std::vector<MatchTaskParams> match_task_params;
match_task_params.reserve(task_iterators.size());
int64_t matched_rows = 0;

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
assert(eng);

if (task_iterators.size() > 1) {
bool no_except = true;
utils::result_set<int64_t> res;
Expand All @@ -803,7 +810,7 @@ int64_t ParallelHashJoiner::MatchDim(MIIterator &mit) {
params.build_item = multi_index_builder_->CreateBuildItem();
params.task_miter = iter;

res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParallelHashJoiner::AsyncMatchDim, this, &params));
res.insert(eng->query_thread_pool.add_task(&ParallelHashJoiner::AsyncMatchDim, this, &params));
}
} catch (std::exception &e) {
res.get_all_with_except();
Expand Down Expand Up @@ -1173,14 +1180,18 @@ int64_t ParallelHashJoiner::SubmitOuterMatched(MIIterator &miter) {
std::vector<OuterMatchedParams> outer_matched_params;
outer_matched_params.reserve(task_iterators.size());
utils::result_set<void> res;

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
assert(eng);

try {
for (MITaskIterator *iter : task_iterators) {
auto &params = outer_matched_params.emplace_back();
params.task_iter = iter;
params.build_item = multi_index_builder_->CreateBuildItem();

res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParallelHashJoiner::AsyncSubmitOuterMatched, this,
&params, outer_matched_filter_.get()));
res.insert(eng->query_thread_pool.add_task(&ParallelHashJoiner::AsyncSubmitOuterMatched, this, &params,
outer_matched_filter_.get()));
}
} catch (std::exception &e) {
res.get_all_with_except();
Expand Down
10 changes: 6 additions & 4 deletions storage/tianmu/core/parameterized_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1529,13 +1529,16 @@ void ParameterizedFilter::ApplyDescriptor(int desc_number, int64_t limit)
pack_some++;
}

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
assert(eng);

MIUpdatingIterator mit(mind_, dims);
desc.CopyDesCond(mit);
if (desc.EvaluateOnIndex(mit, limit) == common::ErrorCode::SUCCESS) {
tianmu_control_.lock(mind_->m_conn->GetThreadID())
<< "EvaluateOnIndex done, desc number " << desc_number << system::unlock;
} else {
int poolsize = ha_tianmu_engine_->query_thread_pool.size();
int poolsize = eng->query_thread_pool.size();
if ((tianmu_sysvar_threadpoolsize > 0) && (packs_no / poolsize > 0) && !desc.IsType_Subquery() &&
!desc.ExsitTmpTable()) {
int step = 0;
Expand Down Expand Up @@ -1573,9 +1576,8 @@ void ParameterizedFilter::ApplyDescriptor(int desc_number, int64_t limit)

utils::result_set<void> res;
for (int i = 0; i < task_num; ++i) {
res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParameterizedFilter::TaskProcessPacks, this,
&taskIterator[i], current_txn_, rf, &dims, desc_number,
limit, one_dim));
res.insert(eng->query_thread_pool.add_task(&ParameterizedFilter::TaskProcessPacks, this, &taskIterator[i],
current_txn_, rf, &dims, desc_number, limit, one_dim));
}
res.get_all_with_except();

Expand Down
Loading

0 comments on commit 6e56cef

Please sign in to comment.