From b09e4cc9c70a92b72564e2341e04a4a96d1861b1 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 27 May 2021 20:44:46 +0800 Subject: [PATCH] Revert Lazily Init Store (#2011) * Revert "Init store in background task. (#1843,#1896) (#1874)" This reverts commit 088246182c860db1d901028216b6680706bb29bb. Conflicts: dbms/src/Storages/StorageDeltaMerge.cpp * format code * Revert "Lazily initializing DeltaMergeStore (#1423) (#1751) (#1868)" This reverts commit bbce05022cbb1ec6e83be9e7b8ae679be937f073. Conflicts: dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp dbms/src/Storages/StorageDeltaMerge.cpp dbms/src/Storages/StorageDeltaMerge.h Co-authored-by: JinheLin Co-authored-by: Flowyi --- dbms/src/Server/Server.cpp | 29 -- .../tests/gtest_dm_storage_delta_merge.cpp | 173 --------- dbms/src/Storages/IManageableStorage.h | 3 - dbms/src/Storages/StorageDeltaMerge.cpp | 340 ++++-------------- dbms/src/Storages/StorageDeltaMerge.h | 33 +- 5 files changed, 78 insertions(+), 500 deletions(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 39a7f2d8079..099c685f9d4 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -347,33 +347,6 @@ struct RaftStoreProxyRunner : boost::noncopyable Logger * log; }; -// We only need this task run once. -void backgroundInitStores(Context & global_context, Logger * log) -{ - auto initStores = [&global_context, log]() { - auto storages = global_context.getTMTContext().getStorages().getAllStorage(); - int init_cnt = 0; - int err_cnt = 0; - for (auto & [table_id, storage] : storages) - { - try - { - init_cnt += storage->initStoreIfDataDirExist() ? 1 : 0; - LOG_INFO(log, "Storage inited done [table_id=" << table_id << "]"); - } - catch (...) - { - err_cnt++; - tryLogCurrentException(log, "Storage inited fail, [table_id=" + DB::toString(table_id) + "]"); - } - } - LOG_INFO(log, - "Storage inited finish. [total_count=" << storages.size() << "] [init_count=" << init_cnt << "] [error_count=" << err_cnt - << "]"); - }; - std::thread(initStores).detach(); -} - int Server::main(const std::vector & /*args*/) { setThreadName("TiFlashMain"); @@ -780,8 +753,6 @@ int Server::main(const std::vector & /*args*/) } LOG_DEBUG(log, "Sync schemas done."); - backgroundInitStores(*global_context, log); - // After schema synced, set current database. global_context->setCurrentDatabase(default_database); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index d052669bb7f..f18b1e0ac95 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -15,11 +15,8 @@ #include #include #include -#include #include -#define private public #include -#undef private #include #include #include @@ -160,176 +157,6 @@ try } CATCH -TEST(StorageDeltaMerge_test, Rename) -try -{ - Context ctx = DMTestEnv::getContext(); - std::shared_ptr storage; - DataTypes data_types; - Names column_names; - const String path_name = DB::tests::TiFlashTestEnv::getTemporaryPath(); - const String table_name = "tmp_table"; - const String db_name = "default"; - // create table - { - NamesAndTypesList names_and_types_list { - //{"col1", std::make_shared()}, - {"col1", std::make_shared()}, - {"col2", std::make_shared()}, - }; - for (const auto & name_type : names_and_types_list) - { - data_types.push_back(name_type.type); - column_names.push_back(name_type.name); - } - - Poco::File path(path_name); - if (path.exists()) - { - path.remove(true); - } - - // primary_expr_ast - ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table)); - astptr->children.emplace_back(new ASTIdentifier("col1")); - - storage = StorageDeltaMerge::create("TiFlash", - db_name, - table_name, - std::nullopt, - ColumnsDescription{names_and_types_list}, - astptr, - 0, - ctx); - storage->startup(); - } - - ASSERT_FALSE(storage->storeInited()); - ASSERT_EQ(storage->getTableName(), table_name); - ASSERT_FALSE(storage->storeInited()); - ASSERT_EQ(storage->getDatabaseName(), db_name); - ASSERT_FALSE(storage->storeInited()); - - // Rename database name before store object is created. - const String new_db_name = "new_" + storage->getDatabaseName(); - storage->rename(path_name, new_db_name, table_name, table_name); - ASSERT_FALSE(storage->storeInited()); - ASSERT_EQ(storage->getTableName(), table_name); - ASSERT_EQ(storage->getDatabaseName(), new_db_name); - - // prepare block data - Block sample; - { - ColumnWithTypeAndName col1; - col1.name = "col1"; - col1.type = std::make_shared(); - { - IColumn::MutablePtr m_col = col1.type->createColumn(); - // insert form large to small - for (int i = 0; i < 100; i++) - { - Field field = Int64(99 - i); - m_col->insert(field); - } - col1.column = std::move(m_col); - } - sample.insert(col1); - - ColumnWithTypeAndName col2; - col2.name = "col2"; - col2.type = std::make_shared(); - { - IColumn::MutablePtr m_col2 = col2.type->createColumn(); - for (int i = 0; i < 100; i++) - { - Field field("a", 1); - m_col2->insert(field); - } - col2.column = std::move(m_col2); - } - sample.insert(col2); - } - // Writing will create store object. - { - ASTPtr insertptr(new ASTInsertQuery()); - BlockOutputStreamPtr output = storage->write(insertptr, ctx.getSettingsRef()); - output->writePrefix(); - output->write(sample); - output->writeSuffix(); - ASSERT_TRUE(storage->storeInited()); - } - - // Rename table name - String new_table_name = "new_" + storage->getTableName(); - storage->rename(path_name, new_db_name, new_table_name, new_table_name); - ASSERT_EQ(storage->getTableName(), new_table_name); - ASSERT_EQ(storage->getDatabaseName(), new_db_name); - -} -CATCH - -TEST(StorageDeltaMerge_test, HandleCol) -try -{ - Context ctx = DMTestEnv::getContext(); - std::shared_ptr storage; - DataTypes data_types; - Names column_names; - const String path_name = DB::tests::TiFlashTestEnv::getTemporaryPath(); - const String table_name = "tmp_table"; - const String db_name = "default"; - // create table - { - NamesAndTypesList names_and_types_list { - //{"col1", std::make_shared()}, - {"col1", std::make_shared()}, - {"col2", std::make_shared()}, - }; - for (const auto & name_type : names_and_types_list) - { - data_types.push_back(name_type.type); - column_names.push_back(name_type.name); - } - - Poco::File path(path_name); - if (path.exists()) - { - path.remove(true); - } - - // primary_expr_ast - ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table)); - astptr->children.emplace_back(new ASTIdentifier("col1")); - - storage = StorageDeltaMerge::create("TiFlash", - db_name, - table_name, - std::nullopt, - ColumnsDescription{names_and_types_list}, - astptr, - 0, - ctx); - storage->startup(); - } - - ASSERT_FALSE(storage->storeInited()); - auto pk_type = storage->getPKTypeImpl(); - auto sort_desc = storage->getPrimarySortDescription(); - ASSERT_FALSE(storage->storeInited()); - - auto& store = storage->getStore(); - ASSERT_TRUE(storage->storeInited()); - auto pk_type2 = store->getPKDataType(); - auto sort_desc2 = store->getPrimarySortDescription(); - - ASSERT_EQ(pk_type->getTypeId(), pk_type2->getTypeId()); - ASSERT_EQ(sort_desc.size(), 1u); - ASSERT_EQ(sort_desc2.size(), 1u); - ASSERT_EQ(sort_desc.front().column_name, sort_desc2.front().column_name); - ASSERT_EQ(sort_desc.front().direction, sort_desc2.front().direction); - ASSERT_EQ(sort_desc.front().nulls_direction, sort_desc2.front().nulls_direction); -} -CATCH TEST(StorageDeltaMerge_internal_test, GetMergedQueryRanges) { diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index ccc1af8e96c..1510aab1c26 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -57,9 +57,6 @@ class IManageableStorage : public IStorage // `limit` is the max number of segments to gc, return value is the number of segments gced virtual UInt64 onSyncGc(Int64 /*limit*/) { throw Exception("Unsupported"); } - - // Return true is data dir exist - virtual bool initStoreIfDataDirExist() { throw Exception("Unsupported"); } virtual void mergeDelta(const Context &) { throw Exception("Unsupported"); } diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 839a59908e7..b109db31f7a 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -61,7 +61,6 @@ StorageDeltaMerge::StorageDeltaMerge( // Context & global_context_) : IManageableStorage{columns_, tombstone}, data_path_contains_database_name(db_engine != "TiFlash"), - store_inited(false), max_column_id_used(0), global_context(global_context_.getGlobalContext()), log(&Logger::get("StorageDeltaMerge")) @@ -79,137 +78,84 @@ StorageDeltaMerge::StorageDeltaMerge( // pk_is_handle = tidb_table_info.pk_is_handle; } - table_column_info = std::make_unique(db_name_, table_name_, primary_expr_ast_); - - updateTableColumnInfo(); -} - -void StorageDeltaMerge::updateTableColumnInfo() -{ - const ColumnsDescription & columns = getColumns(); - - LOG_INFO(log, - __FILE__ << " " << __func__ << " TableName " << getTableName() << " ordinary " << columns.ordinary.toString() << " materialized " - << columns.materialized.toString()); - - auto & pk_expr_ast = table_column_info->pk_expr_ast; - auto & handle_column_define = table_column_info->handle_column_define; - auto & table_column_defines = table_column_info->table_column_defines; - handle_column_define.name.clear(); - table_column_defines.clear(); - pk_column_names.clear(); std::unordered_set pks; - if (!tidb_table_info.columns.empty()) + for (size_t i = 0; i < primary_expr_ast_->children.size(); ++i) { - if (pk_is_handle) - { - for (const auto & col : tidb_table_info.columns) - { - if (col.hasPriKeyFlag()) - { - pks.emplace(col.name); - pk_column_names.emplace_back(col.name); - } - } - } - else - { - pks.emplace(EXTRA_HANDLE_COLUMN_NAME); - pk_column_names.emplace_back(EXTRA_HANDLE_COLUMN_NAME); - } - } - else - { - for (size_t i = 0; i < pk_expr_ast->children.size(); ++i) - { - auto col_name = pk_expr_ast->children[i]->getColumnName(); - pks.emplace(col_name); - pk_column_names.emplace_back(col_name); - } + auto col_name = primary_expr_ast_->children[i]->getColumnName(); + pks.emplace(col_name); + pk_column_names.emplace_back(col_name); } - ColumnsDescription new_columns(columns.ordinary, columns.materialized, columns.aliases, columns.defaults); - size_t pks_combined_bytes = 0; - auto all_columns = columns.getAllPhysical(); + ColumnsDescription new_columns(columns_.ordinary, columns_.materialized, columns_.materialized, columns_.defaults); + size_t pks_combined_bytes = 0; + auto all_columns = getColumns().getAllPhysical(); + ColumnDefines table_column_defines; // column defines used in DeltaMergeStore + ColumnDefine handle_column_define; /// rowkey_column_defines is the columns used to generate rowkey in TiDB /// if is_common_handle = true || pk_is_handle = true, it is the primary keys in TiDB table's definition /// otherwise, it is _tidb_rowid ColumnDefines rowkey_column_defines; for (const auto & col : all_columns) { - ColumnDefine col_def(0, col.name, col.type); - if (!tidb_table_info.columns.empty()) + ColumnDefine column_define(0, col.name, col.type); + if (table_info_) { /// If TableInfo from TiDB is not empty, we get column id and default value from TiDB - auto & columns = tidb_table_info.columns; - col_def.id = tidb_table_info.getColumnID(col_def.name); - auto itr = std::find_if(columns.begin(), columns.end(), [&](const ColumnInfo & v) { return v.id == col_def.id; }); - if (itr != columns.end()) - { - col_def.default_value = itr->defaultValueToField(); - } - - if (col_def.id != TiDBPkColumnID && col_def.id != VersionColumnID && col_def.id != DelMarkColumnID - && tidb_table_info.getColumnInfo(col_def.id).hasPriKeyFlag()) + auto & columns = table_info_->get().columns; + column_define.id = table_info_->get().getColumnID(column_define.name); + auto column + = std::find_if(columns.begin(), columns.end(), [&](const ColumnInfo & v) -> bool { return v.id == column_define.id; }); + + if (column != columns.end()) + column_define.default_value = column->defaultValueToField(); + if (column_define.id != TiDBPkColumnID && table_info_->get().getColumnInfo(column_define.id).hasPriKeyFlag()) { - rowkey_column_defines.push_back(col_def); + rowkey_column_defines.push_back(column_define); } } else { // in test cases, we allocate column_id here - col_def.id = max_column_id_used++; + column_define.id = max_column_id_used++; } + if (pks.count(col.name)) { if (!is_common_handle) { if (!col.type->isValueRepresentedByInteger()) - { throw Exception("pk column " + col.name + " is not representable by integer"); - } + pks_combined_bytes += col.type->getSizeOfValueInMemory(); if (pks_combined_bytes > sizeof(Handle)) - { throw Exception("pk columns exceeds size limit :" + DB::toString(sizeof(Handle))); - } } + if (pks.size() == 1) - { - handle_column_define = col_def; - } + handle_column_define = column_define; } - table_column_defines.push_back(col_def); - } - if (!new_columns.materialized.contains(VERSION_COLUMN_NAME)) - { - hidden_columns.emplace_back(VERSION_COLUMN_NAME); - new_columns.materialized.emplace_back(VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE); - } - if (!new_columns.materialized.contains(TAG_COLUMN_NAME)) - { - hidden_columns.emplace_back(TAG_COLUMN_NAME); - new_columns.materialized.emplace_back(TAG_COLUMN_NAME, TAG_COLUMN_TYPE); + table_column_defines.push_back(column_define); } + hidden_columns.emplace_back(VERSION_COLUMN_NAME); + hidden_columns.emplace_back(TAG_COLUMN_NAME); + new_columns.materialized.emplace_back(VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE); + new_columns.materialized.emplace_back(TAG_COLUMN_NAME, TAG_COLUMN_TYPE); + if (pks.size() > 1) { if (unlikely(is_common_handle)) - { throw Exception("Should not reach here: common handle with pk size > 1", ErrorCodes::LOGICAL_ERROR); - } handle_column_define.id = EXTRA_HANDLE_COLUMN_ID; handle_column_define.name = EXTRA_HANDLE_COLUMN_NAME; handle_column_define.type = EXTRA_HANDLE_COLUMN_INT_TYPE; - if (!new_columns.materialized.contains(EXTRA_HANDLE_COLUMN_NAME)) - { - hidden_columns.emplace_back(EXTRA_HANDLE_COLUMN_NAME); - new_columns.materialized.emplace_back(EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_INT_TYPE); - } + + hidden_columns.emplace_back(EXTRA_HANDLE_COLUMN_NAME); + new_columns.materialized.emplace_back(EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_INT_TYPE); } setColumns(new_columns); @@ -223,10 +169,10 @@ void StorageDeltaMerge::updateTableColumnInfo() // create table statement. // Here we throw a PrimaryKeyNotMatchException, caller (`DatabaseLoading::loadTable`) is responsible for correcting // the statement and retry. - if (pks.size() == 1 && !tidb_table_info.columns.empty() && !is_common_handle) + if (pks.size() == 1 && table_info_ && !is_common_handle) { std::vector actual_pri_keys; - for (const auto & col : tidb_table_info.columns) + for (const auto & col : table_info_->get().columns) { if (col.hasPriKeyFlag()) { @@ -239,6 +185,7 @@ void StorageDeltaMerge::updateTableColumnInfo() } // fallover } + // Unknown bug, throw an exception. std::stringstream ss; ss << "["; @@ -261,15 +208,15 @@ void StorageDeltaMerge::updateTableColumnInfo() rowkey_column_defines.push_back(handle_column_define); } rowkey_column_size = rowkey_column_defines.size(); + store = std::make_shared(global_context, data_path_contains_database_name, db_name_, table_name_, + std::move(table_column_defines), std::move(handle_column_define), is_common_handle, rowkey_column_size, + DeltaMergeStore::Settings()); } void StorageDeltaMerge::drop() { shutdown(); - if (store_inited.load(std::memory_order_acquire)) - { - _store->drop(); - } + store->drop(); } Block StorageDeltaMerge::buildInsertBlock(bool is_import, bool is_delete, const Block & old_block) @@ -287,7 +234,6 @@ Block StorageDeltaMerge::buildInsertBlock(bool is_import, bool is_delete, const block.erase(TAG_COLUMN_NAME); } - auto & store = getAndMaybeInitStore(); const size_t rows = block.rows(); if (!block.has(store->getHandle().name)) { @@ -421,12 +367,11 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin auto decorator = [&](const Block & block) { // return this->buildInsertBlock(insert_query.is_import, insert_query.is_delete, block); }; - return std::make_shared(getAndMaybeInitStore(), decorator, global_context, settings); + return std::make_shared(store, decorator, global_context, settings); } void StorageDeltaMerge::write(Block && block, const Settings & settings) { - auto & store = getAndMaybeInitStore(); #ifndef NDEBUG { // Do some check under DEBUG mode to ensure all block are written with column id properly set. @@ -550,7 +495,6 @@ BlockInputStreams StorageDeltaMerge::read( // size_t max_block_size, unsigned num_streams) { - auto & store = getAndMaybeInitStore(); // Note that `columns_to_read` should keep the same sequence as ColumnRef // in `Coprocessor.TableScan.columns`, or rough set filter could be // failed to parsed. @@ -651,7 +595,7 @@ BlockInputStreams StorageDeltaMerge::read( // { /// Query from TiDB / TiSpark auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr { - const ColumnDefines & defines = this->getAndMaybeInitStore()->getTableColumns(); + const ColumnDefines & defines = this->store->getTableColumns(); auto iter = std::find_if( defines.begin(), defines.end(), [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; }); if (iter != defines.end()) @@ -666,7 +610,7 @@ BlockInputStreams StorageDeltaMerge::read( // { // Query from ch client auto create_attr_by_column_id = [this](const String & col_name) -> Attr { - const ColumnDefines & defines = this->getAndMaybeInitStore()->getTableColumns(); + const ColumnDefines & defines = this->store->getTableColumns(); auto iter = std::find_if( defines.begin(), defines.end(), [&col_name](const ColumnDefine & d) -> bool { return d.name == col_name; }); if (iter != defines.end()) @@ -695,7 +639,7 @@ BlockInputStreams StorageDeltaMerge::read( // } } -void StorageDeltaMerge::checkStatus(const Context & context) { getAndMaybeInitStore()->check(context); } +void StorageDeltaMerge::checkStatus(const Context & context) { store->check(context); } void StorageDeltaMerge::flushCache(const Context & context) { @@ -704,16 +648,16 @@ void StorageDeltaMerge::flushCache(const Context & context) void StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) { - getAndMaybeInitStore()->flushCache(context, range_to_flush); + store->flushCache(context, range_to_flush); } -void StorageDeltaMerge::mergeDelta(const Context & context) { getAndMaybeInitStore()->mergeDeltaAll(context); } +void StorageDeltaMerge::mergeDelta(const Context & context) { store->mergeDeltaAll(context); } void StorageDeltaMerge::deleteRange(const DM::RowKeyRange & range_to_delete, const Settings & settings) { auto metrics = global_context.getTiFlashMetrics(); GET_METRIC(metrics, tiflash_storage_command_count, type_delete_range).Increment(); - return getAndMaybeInitStore()->deleteRange(global_context, settings, range_to_delete); + return store->deleteRange(global_context, settings, range_to_delete); } void StorageDeltaMerge::ingestFiles( @@ -721,15 +665,12 @@ void StorageDeltaMerge::ingestFiles( { auto metrics = global_context.getTiFlashMetrics(); GET_METRIC(metrics, tiflash_storage_command_count, type_ingest).Increment(); - return getAndMaybeInitStore()->ingestFiles(global_context, settings, range, file_ids, clear_data_in_range); + return store->ingestFiles(global_context, settings, range, file_ids, clear_data_in_range); } UInt64 StorageDeltaMerge::onSyncGc(Int64 limit) { - if (store_inited.load(std::memory_order_acquire)) - { - return _store->onSyncGc(limit); - } + store->onSyncGc(limit); return 0; } @@ -780,7 +721,6 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context void StorageDeltaMerge::deleteRows(const Context & context, size_t delete_rows) { - auto & store = getAndMaybeInitStore(); size_t total_rows = getRows(store, context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size)); delete_rows = std::min(total_rows, delete_rows); auto delete_range = getRange(store, context, total_rows, delete_rows); @@ -882,6 +822,7 @@ try // update the metadata in database, so that we can read the new schema using TiFlash's client ColumnsDescription new_columns = getColumns(); + for (const auto & command : commands) { if (command.type == AlterCommand::MODIFY_COLUMN) @@ -915,29 +856,19 @@ try } commands.apply(new_columns); // apply AlterCommands to `new_columns` - setColumns(std::move(new_columns)); - if (table_info) - { - tidb_table_info = table_info.value(); - } - - if (store_inited.load(std::memory_order_acquire)) - { - _store->applyAlters(commands, table_info, max_column_id_used, context); - } - else - { - updateTableColumnInfo(); - } + // apply alter to store's table column in DeltaMergeStore + store->applyAlters(commands, table_info, max_column_id_used, context); - SortDescription pk_desc = getPrimarySortDescription(); - ColumnDefines store_columns = getStoreColumnDefines(); + SortDescription pk_desc = store->getPrimarySortDescription(); TiDB::TableInfo table_info_from_store; table_info_from_store.name = table_name_; // after update `new_columns` and store's table columns, we need to update create table statement, // so that we can restore table next time. - updateDeltaMergeTableCreateStatement(database_name, table_name_, pk_desc, getColumns(), hidden_columns, - getTableInfoForCreateStatement(table_info, table_info_from_store, store_columns, hidden_columns), tombstone, context); + updateDeltaMergeTableCreateStatement(database_name, table_name_, pk_desc, new_columns, hidden_columns, + getTableInfoForCreateStatement(table_info, table_info_from_store, store->getTableColumns(), hidden_columns), tombstone, context); + setColumns(std::move(new_columns)); + if (table_info) + tidb_table_info = table_info.value(); setTombstone(tombstone); } catch (Exception & e) @@ -951,26 +882,6 @@ catch (Exception & e) throw; } -ColumnDefines StorageDeltaMerge::getStoreColumnDefines() const -{ - if (store_inited.load(std::memory_order_acquire)) - { - return _store->getTableColumns(); - } - ColumnDefines cols; - cols.emplace_back(table_column_info->handle_column_define); - cols.emplace_back(getVersionColumnDefine()); - cols.emplace_back(getTagColumnDefine()); - for (const auto & col : table_column_info->table_column_defines) - { - if (col.id != table_column_info->handle_column_define.id && col.id != VERSION_COLUMN_ID && col.id != TAG_COLUMN_ID) - { - cols.emplace_back(col); - } - } - return cols; -} - String StorageDeltaMerge::getName() const { return MutableSupport::delta_tree_storage_name; } void StorageDeltaMerge::rename( @@ -979,23 +890,13 @@ void StorageDeltaMerge::rename( tidb_table_info.name = new_display_table_name; // update name in table info // For DatabaseTiFlash, simply update store's database is OK. // `store->getTableName() == new_table_name` only keep for mock test. - bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; + bool clean_rename = !data_path_contains_database_name && store->getTableName() == new_table_name; if (likely(clean_rename)) { - if (store_inited.load(std::memory_order_acquire)) - { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); - } - else - { - table_column_info->db_name = new_database_name; - table_column_info->table_name = new_table_name; - } + store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); return; } - /// Note that this routine is only left for CI tests. `clean_rename` should always be true in production env. - auto & store = getAndMaybeInitStore(); // For DatabaseOrdinary, we need to rename data path, then recreate a new store. const String new_path = new_path_to_db + "/" + new_table_name; @@ -1014,29 +915,18 @@ void StorageDeltaMerge::rename( store->shutdown(); // rename directories for multi disks store->rename(new_path, clean_rename, new_database_name, new_table_name); + + store = {}; // reset store object + // generate a new store store = std::make_shared(global_context, // data_path_contains_database_name, new_database_name, new_table_name, // std::move(table_column_defines), std::move(handle_column_define), is_common_handle, rowkey_column_size, settings); } -String StorageDeltaMerge::getTableName() const -{ - if (store_inited.load(std::memory_order_acquire)) - { - return _store->getTableName(); - } - return table_column_info->table_name; -} +String StorageDeltaMerge::getTableName() const { return store->getTableName(); } -String StorageDeltaMerge::getDatabaseName() const -{ - if (store_inited.load(std::memory_order_acquire)) - { - return _store->getDatabaseName(); - } - return table_column_info->db_name; -} +String StorageDeltaMerge::getDatabaseName() const { return store->getDatabaseName(); } void updateDeltaMergeTableCreateStatement( // const String & database_name, const String & table_name, // @@ -1129,7 +1019,7 @@ void StorageDeltaMerge::modifyASTStorage(ASTStorage * storage_ast, const TiDB::T args->children.at(1) = literal; else throw Exception( - "Wrong arguments num: " + DB::toString(args->children.size()) + " in table: " + this->getTableName() + " in modifyASTStorage", + "Wrong arguments num: " + DB::toString(args->children.size()) + " in table: " + store->getTableName() + " in modifyASTStorage", ErrorCodes::BAD_ARGUMENTS); } @@ -1144,11 +1034,7 @@ BlockInputStreamPtr StorageDeltaMerge::status() auto & name_col = columns[0]; auto & value_col = columns[1]; - DeltaMergeStoreStat stat; - if (store_inited.load(std::memory_order_acquire)) - { - stat = _store->getStat(); - } + DeltaMergeStoreStat stat = store->getStat(); #define INSERT_INT(NAME) \ name_col->insert(String(#NAME)); \ @@ -1242,10 +1128,8 @@ void StorageDeltaMerge::shutdown() bool v = false; if (!shutdown_called.compare_exchange_strong(v, true)) return; - if (store_inited.load(std::memory_order_acquire)) - { - _store->shutdown(); - } + + store->shutdown(); } void StorageDeltaMerge::removeFromTMTContext() @@ -1258,88 +1142,8 @@ void StorageDeltaMerge::removeFromTMTContext() StorageDeltaMerge::~StorageDeltaMerge() { shutdown(); } -DataTypePtr StorageDeltaMerge::getPKTypeImpl() const -{ - if (store_inited.load(std::memory_order_acquire)) - { - return _store->getPKDataType(); - } - return table_column_info->handle_column_define.type; -} +DataTypePtr StorageDeltaMerge::getPKTypeImpl() const { return store->getPKDataType(); } -SortDescription StorageDeltaMerge::getPrimarySortDescription() const -{ - if (store_inited.load(std::memory_order_acquire)) - { - return _store->getPrimarySortDescription(); - } - - SortDescription desc; - desc.emplace_back(table_column_info->handle_column_define.name, /* direction_= */ 1, /* nulls_direction_= */ 1); - return desc; -} - -DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore() -{ - if (store_inited.load(std::memory_order_acquire)) - { - return _store; - } - std::lock_guard lock(store_mutex); - if (_store == nullptr) - { - _store = std::make_shared(global_context, data_path_contains_database_name, table_column_info->db_name, - table_column_info->table_name, std::move(table_column_info->table_column_defines), - std::move(table_column_info->handle_column_define), is_common_handle, rowkey_column_size, DeltaMergeStore::Settings()); - table_column_info.reset(nullptr); - store_inited.store(true, std::memory_order_release); - } - return _store; -} - -bool StorageDeltaMerge::initStoreIfDataDirExist() -{ - if (shutdown_called.load(std::memory_order_relaxed) || isTombstone()) - { - return false; - } - // If store is inited, we don't need to check data dir. - if (store_inited.load(std::memory_order_relaxed)) - { - return true; - } - if (!dataDirExist()) - { - return false; - } - getAndMaybeInitStore(); - return true; -} - -bool StorageDeltaMerge::dataDirExist() -{ - String db_name, table_name; - { - std::lock_guard lock(store_mutex); - // store is inited after lock acquired. - if (store_inited.load(std::memory_order_acquire)) - { - return true; - } - db_name = table_column_info->db_name; - table_name = table_column_info->table_name; - } +SortDescription StorageDeltaMerge::getPrimarySortDescription() const { return store->getPrimarySortDescription(); } - auto path_pool = global_context.getPathPool().withTable(db_name, table_name, data_path_contains_database_name); - auto path_delegate = path_pool.getStableDiskDelegator(); - for (const auto & root_path : path_delegate.listPaths()) - { - int r = ::access(root_path.c_str(), F_OK); - if (r == 0) - { - return true; - } - } - return false; -} } // namespace DB diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 006e73e6b7e..ce3a1e72635 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -104,16 +104,12 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ void checkStatus(const Context & context) override; void deleteRows(const Context &, size_t rows) override; - const DM::DeltaMergeStorePtr & getStore() - { - return getAndMaybeInitStore(); - } + const DM::DeltaMergeStorePtr & getStore() { return store; } bool isCommonHandle() const override { return is_common_handle; } size_t getRowKeyColumnSize() const override { return rowkey_column_size; } - - bool initStoreIfDataDirExist() override; + protected: StorageDeltaMerge( // @@ -137,30 +133,12 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ DataTypePtr getPKTypeImpl() const override; - DM::DeltaMergeStorePtr& getAndMaybeInitStore(); - bool storeInited() const { return store_inited.load(); } - void updateTableColumnInfo(); - DM::ColumnDefines getStoreColumnDefines() const; - - bool dataDirExist(); private: - struct TableColumnInfo - { - TableColumnInfo(const String& db, const String& table, const ASTPtr& pk) - : db_name(db), table_name(table), pk_expr_ast(pk) {} - String db_name; - String table_name; - ASTPtr pk_expr_ast; - DM::ColumnDefines table_column_defines; - DM::ColumnDefine handle_column_define; - }; + using ColumnIdMap = std::unordered_map; + const bool data_path_contains_database_name = false; - std::mutex store_mutex; - - std::unique_ptr table_column_info; // After create DeltaMergeStore object, it is deprecated. - std::atomic store_inited; - DM::DeltaMergeStorePtr _store; + DM::DeltaMergeStorePtr store; Strings pk_column_names; // TODO: remove it. Only use for debug from ch-client. bool is_common_handle; @@ -179,6 +157,7 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ std::atomic next_version = 1; //TODO: remove this!!! Context & global_context; + Logger * log; };