Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tianmu): support volcano framework #1554

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions storage/tianmu/core/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ static int setup_sig_handler() {

fs::path Engine::GetNextDataDir() {
std::scoped_lock guard(v_mtx);
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

if (tianmu_data_dirs.empty()) {
// fall back to use MySQL data directory
auto p = ha_tianmu_engine_->tianmu_data_dir / TIANMU_DATA_DIR;
auto p = eng->tianmu_data_dir / TIANMU_DATA_DIR;
if (!fs::is_directory(p))
fs::create_directory(p);
return p;
Expand Down Expand Up @@ -117,6 +118,7 @@ fs::path Engine::GetNextDataDir() {
}
return v[std::rand() % v.size()];
}

// round-robin
static int idx = 0;
return tianmu_data_dirs[idx++ % sz];
Expand All @@ -127,35 +129,43 @@ static int has_pack(const LEX_STRING &comment) {
std::string str(comment.str, comment.length);
boost::to_upper(str);
std::string val;

auto pos = str.find("PACK");
if (pos == std::string::npos)
return ret;

size_t val_pos = str.find(':', pos);
if (val_pos == std::string::npos)
return ret;

size_t term_pos = str.find(';', val_pos);
if (term_pos == std::string::npos) {
val = str.substr(val_pos + 1);
} else {
val = str.substr(val_pos + 1, term_pos - val_pos - 1);
}

boost::trim(val);
ret = atoi(val.c_str());
if (ret > common::DFT_PSS || ret <= 0)
ret = common::DFT_PSS;

return ret;
}

static std::string has_mem_name(const LEX_STRING &comment) {
std::string name = "";
std::string str(comment.str, comment.length);
boost::to_upper(str);

auto pos = str.find("ROWSTORE");
if (pos == std::string::npos)
return name;

size_t val_pos = str.find(':', pos);
if (val_pos == std::string::npos)
return name;

size_t term_pos = str.find(';', val_pos);
if (term_pos == std::string::npos) {
name = str.substr(val_pos + 1);
Expand Down Expand Up @@ -1374,6 +1384,7 @@ static void HandleDelayedLoad(uint32_t table_id, std::vector<std::unique_ptr<cha
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->get_stmt_da()->set_overwrite_status(false);
close_thread_tables(thd);

if (thd->transaction_rollback_request) {
trans_rollback_implicit(thd);
thd->mdl_context.release_transactional_locks();
Expand All @@ -1386,17 +1397,22 @@ static void HandleDelayedLoad(uint32_t table_id, std::vector<std::unique_ptr<cha
if (thd->is_fatal_error) {
TIANMU_LOG(LogCtl_Level::ERROR, "LOAD DATA failed on table '%s'", tab_name.c_str());
}

thd->release_resources();
thd_manager->remove_thd(thd);
delete thd;

my_thread_end();
}

void DistributeLoad(std::unordered_map<uint32_t, std::vector<std::unique_ptr<char[]>>> &tm) {
utils::result_set<void> res;
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

for (auto &it : tm) {
res.insert(ha_tianmu_engine_->bg_load_thread_pool.add_task(HandleDelayedLoad, it.first, std::ref(it.second)));
res.insert(eng->bg_load_thread_pool.add_task(HandleDelayedLoad, it.first, std::ref(it.second)));
}

res.get_all();
tm.clear();
}
Expand Down Expand Up @@ -1462,6 +1478,7 @@ void Engine::ProcessDeltaStoreMerge() {
while (!mysqld_server_started) {
struct timespec abstime;
set_timespec(&abstime, 1);

mysql_cond_timedwait(&COND_server_started, &LOCK_server_started, &abstime);
if (exiting) {
mysql_mutex_unlock(&LOCK_server_started);
Expand All @@ -1472,6 +1489,8 @@ void Engine::ProcessDeltaStoreMerge() {

std::map<std::string, uint> sleep_cnts;
TIANMU_LOG(LogCtl_Level::INFO, "Tianmu merge delta store thread start...");
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

while (!exiting) {
if (!tianmu_sysvar_enable_rowstore) {
std::unique_lock<std::mutex> lk(cv_merge_mtx);
Expand All @@ -1487,16 +1506,21 @@ void Engine::ProcessDeltaStoreMerge() {
uint64_t record_count = delta_table->CountRecords();
if ((record_count >= tianmu_sysvar_insert_numthreshold ||
(sleep_cnts.count(name) && sleep_cnts[name] > tianmu_sysvar_insert_cntthreshold))) {
auto share = ha_tianmu_engine_->getTableShare(name);
auto share = eng->getTableShare(name);
auto table_id = share->TabID();

utils::BitSet null_mask(share->NumOfCols());

std::unique_ptr<char[]> buf(new char[sizeof(uint32_t) + name.size() + 1 + null_mask.data_size()]);
char *ptr = buf.get();
*(uint32_t *)ptr = table_id; // table id
ptr += sizeof(uint32_t);

std::memcpy(ptr, name.c_str(), name.size());
ptr += name.size();

*ptr++ = 0; // end with NUL

std::memcpy(ptr, null_mask.data(), null_mask.data_size());
need_merge_table[table_id].emplace_back(std::move(buf));
sleep_cnts[name] = 0;
Expand All @@ -1519,13 +1543,15 @@ void Engine::ProcessDeltaStoreMerge() {
continue;
}
}

if (!need_merge_table.empty())
DistributeLoad(need_merge_table);
else {
std::unique_lock<std::mutex> lk(cv_merge_mtx);
cv_merge.wait_for(lk, std::chrono::milliseconds(tianmu_sysvar_insert_wait_ms));
}
}

TIANMU_LOG(LogCtl_Level::INFO, "Tianmu merge delta store thread exiting...");
}

Expand Down Expand Up @@ -1984,12 +2010,13 @@ std::unique_ptr<system::IOParameters> Engine::CreateIOParameters(const std::stri

std::string data_dir;
std::string data_path;
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

if (fs::path(path).is_absolute()) {
data_dir = "";
data_path = path;
} else {
data_dir = ha_tianmu_engine_->tianmu_data_dir;
data_dir = eng->tianmu_data_dir;
std::string db_name, tab_name;
std::tie(db_name, tab_name) = GetNames(path);
data_path += db_name;
Expand Down Expand Up @@ -2024,6 +2051,7 @@ void Engine::ComputeTimeZoneDiffInMinutes(THD *thd, short &sign, short &minutes)
minutes = common::NULL_VALUE_SH;
return;
}

MYSQL_TIME client_zone, utc;
utc.year = 1970;
utc.month = 1;
Expand All @@ -2039,8 +2067,10 @@ void Engine::ComputeTimeZoneDiffInMinutes(THD *thd, short &sign, short &minutes)
long msecs;
sign = 1;
minutes = 0;

if (calc_time_diff(&utc, &client_zone, 1, &secs, &msecs))
sign = -1;

minutes = (short)(secs / 60);
}

Expand Down Expand Up @@ -2098,6 +2128,7 @@ common::TianmuError Engine::GetIOP(std::unique_ptr<system::IOParameters> &io_par
tdb = (char *)thd.db().str;

io_params = CreateIOParameters(&thd, table, arg);

short sign, minutes;
ComputeTimeZoneDiffInMinutes(&thd, sign, minutes);
io_params->SetTimeZone(sign, minutes);
Expand Down Expand Up @@ -2256,6 +2287,7 @@ std::shared_ptr<TableShare> Engine::GetTableShare(const TABLE_SHARE *table_share
table_share_map[name] = share;
return share;
}

return it->second;
} catch (common::Exception &e) {
TIANMU_LOG(LogCtl_Level::ERROR, "Failed to create table share: %s", e.what());
Expand Down Expand Up @@ -2288,6 +2320,7 @@ bool Engine::DeleteTableIndex(const std::string &table_path, [[maybe_unused]] TH
if (index::TianmuTableIndex::FindIndexTable(table_path)) {
index::TianmuTableIndex::DropIndexTable(table_path);
}

if (m_table_keys.find(table_path) != m_table_keys.end()) {
m_table_keys.erase(table_path);
}
Expand Down
4 changes: 3 additions & 1 deletion storage/tianmu/core/engine_execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ QueryRouteTo Engine::HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulo
// at this point all tables are in RCBase engine, so we can proceed with the
// query and we know that if the result goes to the file, the TIANMU_DATAFORMAT is
// one of TIANMU formats
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
QueryRouteTo route = QueryRouteTo::kToTianmu;
SELECT_LEX *save_current_select = lex->current_select();
List<st_select_lex_unit> derived_optimized; // collection to remember derived
Expand All @@ -127,6 +128,7 @@ QueryRouteTo Engine::HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulo
res = FALSE;
int tianmu_free_join = FALSE;
lex->thd->derived_tables_processing = TRUE;

for (SELECT_LEX *sl = lex->all_selects_list; sl; sl = sl->next_select_in_list()) // for all selects
for (TABLE_LIST *cursor = sl->get_table_list(); cursor; cursor = cursor->next_local) // for all tables
if (cursor->table && cursor->is_view_or_derived()) { // data source (view or FROM subselect)
Expand Down Expand Up @@ -191,7 +193,7 @@ QueryRouteTo Engine::HandleSelect(THD *thd, LEX *lex, Query_result *&result, ulo
is_optimize_after_tianmu = TRUE;
if (!res) {
try {
route = ha_tianmu_engine_->Execute(unit->thd, unit->thd->lex, result, unit);
route = eng->Execute(unit->thd, unit->thd->lex, result, unit);
if (route == QueryRouteTo::kToMySQL) {
if (in_case_of_failure_can_go_to_mysql)
if (old_executed)
Expand Down
19 changes: 15 additions & 4 deletions storage/tianmu/core/parallel_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,9 @@ int64_t ParallelHashJoiner::TraverseDim(MIIterator &mit, int64_t *outer_tuples)
int64_t traversed_rows = 0;
bool no_except = true;
utils::result_set<int64_t> res;

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);

try {
for (MITaskIterator *iter : task_iterators) {
auto &ht = traversed_hash_tables_.emplace_back(hash_table_key_size_, hash_table_tuple_size_,
Expand All @@ -559,7 +562,7 @@ int64_t ParallelHashJoiner::TraverseDim(MIIterator &mit, int64_t *outer_tuples)
params.build_item = multi_index_builder_->CreateBuildItem();
params.task_miter = iter;

res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParallelHashJoiner::AsyncTraverseDim, this, &params));
res.insert(eng->query_thread_pool.add_task(&ParallelHashJoiner::AsyncTraverseDim, this, &params));
}
} catch (std::exception &e) {
res.get_all_with_except();
Expand Down Expand Up @@ -793,6 +796,10 @@ int64_t ParallelHashJoiner::MatchDim(MIIterator &mit) {
std::vector<MatchTaskParams> match_task_params;
match_task_params.reserve(task_iterators.size());
int64_t matched_rows = 0;

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
assert(eng);

if (task_iterators.size() > 1) {
bool no_except = true;
utils::result_set<int64_t> res;
Expand All @@ -803,7 +810,7 @@ int64_t ParallelHashJoiner::MatchDim(MIIterator &mit) {
params.build_item = multi_index_builder_->CreateBuildItem();
params.task_miter = iter;

res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParallelHashJoiner::AsyncMatchDim, this, &params));
res.insert(eng->query_thread_pool.add_task(&ParallelHashJoiner::AsyncMatchDim, this, &params));
}
} catch (std::exception &e) {
res.get_all_with_except();
Expand Down Expand Up @@ -1173,14 +1180,18 @@ int64_t ParallelHashJoiner::SubmitOuterMatched(MIIterator &miter) {
std::vector<OuterMatchedParams> outer_matched_params;
outer_matched_params.reserve(task_iterators.size());
utils::result_set<void> res;

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
assert(eng);

try {
for (MITaskIterator *iter : task_iterators) {
auto &params = outer_matched_params.emplace_back();
params.task_iter = iter;
params.build_item = multi_index_builder_->CreateBuildItem();

res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParallelHashJoiner::AsyncSubmitOuterMatched, this,
&params, outer_matched_filter_.get()));
res.insert(eng->query_thread_pool.add_task(&ParallelHashJoiner::AsyncSubmitOuterMatched, this, &params,
outer_matched_filter_.get()));
}
} catch (std::exception &e) {
res.get_all_with_except();
Expand Down
10 changes: 6 additions & 4 deletions storage/tianmu/core/parameterized_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1529,13 +1529,16 @@ void ParameterizedFilter::ApplyDescriptor(int desc_number, int64_t limit)
pack_some++;
}

core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
assert(eng);

MIUpdatingIterator mit(mind_, dims);
desc.CopyDesCond(mit);
if (desc.EvaluateOnIndex(mit, limit) == common::ErrorCode::SUCCESS) {
tianmu_control_.lock(mind_->m_conn->GetThreadID())
<< "EvaluateOnIndex done, desc number " << desc_number << system::unlock;
} else {
int poolsize = ha_tianmu_engine_->query_thread_pool.size();
int poolsize = eng->query_thread_pool.size();
if ((tianmu_sysvar_threadpoolsize > 0) && (packs_no / poolsize > 0) && !desc.IsType_Subquery() &&
!desc.ExsitTmpTable()) {
int step = 0;
Expand Down Expand Up @@ -1573,9 +1576,8 @@ void ParameterizedFilter::ApplyDescriptor(int desc_number, int64_t limit)

utils::result_set<void> res;
for (int i = 0; i < task_num; ++i) {
res.insert(ha_tianmu_engine_->query_thread_pool.add_task(&ParameterizedFilter::TaskProcessPacks, this,
&taskIterator[i], current_txn_, rf, &dims, desc_number,
limit, one_dim));
res.insert(eng->query_thread_pool.add_task(&ParameterizedFilter::TaskProcessPacks, this, &taskIterator[i],
current_txn_, rf, &dims, desc_number, limit, one_dim));
}
res.get_all_with_except();

Expand Down
Loading