Skip to content

Commit

Permalink
support keyspace feature (#6816)
Browse files Browse the repository at this point in the history
close #6815
  • Loading branch information
iosmanthus authored Mar 12, 2023
1 parent 868de2c commit 835ad98
Show file tree
Hide file tree
Showing 77 changed files with 849 additions and 328 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/RedactHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void Redact::setRedactLog(bool v)
Redact::REDACT_LOG.store(v, std::memory_order_relaxed);
}

std::string Redact::handleToDebugString(DB::HandleID handle)
std::string Redact::handleToDebugString(int64_t handle)
{
if (Redact::REDACT_LOG.load(std::memory_order_relaxed))
return "?";
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Common/RedactHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include <Storages/Transaction/Types.h>

#include <atomic>
#include <ostream>

Expand All @@ -29,7 +27,7 @@ class Redact
public:
static void setRedactLog(bool v);

static std::string handleToDebugString(DB::HandleID handle);
static std::string handleToDebugString(int64_t handle);
static std::string keyToDebugString(const char * key, size_t size);

static std::string keyToHexString(const char * key, size_t size);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Common/tests/gtest_redact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ namespace DB
{
namespace tests
{
TEST(RedactLog_test, Basic)
TEST(RedactLogTest, Basic)
{
const char * test_key = "\x01\x0a\xff";
const size_t key_sz = strlen(test_key);

const DB::HandleID test_handle = 10009;
const /*DB::HandleID*/ Int64 test_handle = 10009;

Redact::setRedactLog(false);
EXPECT_EQ(Redact::keyToDebugString(test_key, key_sz), "010AFF");
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,14 +854,14 @@ try

for (const auto & [expect_name, json_str] : cases)
{
TiDB::DBInfoPtr db_info = std::make_shared<TiDB::DBInfo>(json_str);
TiDB::DBInfoPtr db_info = std::make_shared<TiDB::DBInfo>(json_str, NullspaceID);
ASSERT_NE(db_info, nullptr);
ASSERT_EQ(db_info->name, expect_name);

const auto seri = db_info->serialize();

{
auto deseri = std::make_shared<TiDB::DBInfo>(seri);
auto deseri = std::make_shared<TiDB::DBInfo>(seri, NullspaceID);
ASSERT_NE(deseri, nullptr);
ASSERT_EQ(deseri->name, expect_name);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ TableID MockRaftStoreProxy::bootstrap_table(
UInt64 table_id = MockTiDB::instance().newTable("d", "t" + toString(random()), columns, tso, "", "dt");

auto schema_syncer = tmt.getSchemaSyncer();
schema_syncer->syncSchemas(ctx);
schema_syncer->syncSchemas(ctx, NullspaceID);
this->table_id = table_id;
return table_id;
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockSchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct MockSchemaGetter
}
return res;
}

KeyspaceID getKeyspaceID() const { return NullspaceID; }
};

} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_
tables_by_id.erase(partition.id);
if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(partition.id))
for (auto & e : region_table.getRegionsByTable(NullspaceID, partition.id))
kvstore->mockRemoveRegion(e.first, region_table);
region_table.removeTable(partition.id);
region_table.removeTable(NullspaceID, partition.id);
}
}
}
Expand All @@ -94,9 +94,9 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_

if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(table->id()))
for (auto & e : region_table.getRegionsByTable(NullspaceID, table->id()))
kvstore->mockRemoveRegion(e.first, region_table);
region_table.removeTable(table->id());
region_table.removeTable(NullspaceID, table->id());
}

return table;
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ static GlobalRegionMap GLOBAL_REGION_MAP;
/// Pre-decode region data into block cache and remove committed data from `region`
RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & region, Context & context)
{
auto keyspace_id = region->getKeyspaceID();
const auto & tmt = context.getTMTContext();
{
Timestamp gc_safe_point = 0;
Expand Down Expand Up @@ -556,7 +557,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio

const auto atomic_decode = [&](bool force_decode) -> bool {
Stopwatch watch;
auto storage = tmt.getStorages().get(table_id);
auto storage = tmt.getStorages().get(keyspace_id, table_id);
if (storage == nullptr || storage->isTombstone())
{
if (!force_decode) // Need to update.
Expand Down Expand Up @@ -594,7 +595,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio

if (!atomic_decode(false))
{
tmt.getSchemaSyncer()->syncSchemas(context);
tmt.getSchemaSyncer()->syncSchemas(context, keyspace_id);

if (!atomic_decode(true))
throw Exception("Pre-decode " + region->toString() + " cache to table " + std::to_string(table_id) + " block failed",
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer
auto schema_syncer = tmt.getSchemaSyncer();
try
{
schema_syncer->syncSchemas(context);
schema_syncer->syncSchemas(context, NullspaceID);
}
catch (Exception & e)
{
Expand Down Expand Up @@ -95,7 +95,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
else
gc_safe_point = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[0]).value);
service->gc(gc_safe_point);
service->gc(gc_safe_point, NullspaceID);

output("schemas gc done");
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncSchemaName.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs
for (const auto & part_def : table_info.partition.definitions)
{
auto paritition_table_info = table_info.producePartitionTableInfo(part_def.id, name_mapper);
auto partition_storage = context.getTMTContext().getStorages().get(paritition_table_info->id);
auto partition_storage = context.getTMTContext().getStorages().get(NullspaceID, paritition_table_info->id);
fmt_buf.append((std::to_string(partition_storage->getTableInfo().replica_info.count)));
fmt_buf.append("/");
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/dbgNaturalDag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void NaturalDag::loadTables(const NaturalDag::JSONObjectPtr & obj)
table.id = id;
auto tbl_json = td_json->getObject(std::to_string(id));
auto meta_json = tbl_json->getObject(TABLE_META);
table.meta = TiDB::TableInfo(meta_json);
table.meta = TiDB::TableInfo(meta_json, NullspaceID);
auto regions_json = tbl_json->getArray(TABLE_REGIONS);
for (const auto & region_json : *regions_json)
{
Expand Down Expand Up @@ -208,7 +208,7 @@ void NaturalDag::buildTables(Context & context)
auto & table = it.second;
auto meta = table.meta;
MockTiDB::instance().addTable(db_name, std::move(meta));
schema_syncer->syncSchemas(context);
schema_syncer->syncSchemas(context, NullspaceID);
for (auto & region : table.regions)
{
metapb::Region region_pb;
Expand Down Expand Up @@ -243,7 +243,7 @@ void NaturalDag::buildDatabase(Context & context, SchemaSyncerPtr & schema_synce
MockTiDB::instance().dropDB(context, db_name, true);
}
MockTiDB::instance().newDataBase(db_name);
schema_syncer->syncSchemas(context);
schema_syncer->syncSchemas(context, NullspaceID);
}

void NaturalDag::build(Context & context)
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro
for (const auto & partition : table_info->partition.definitions)
{
const auto partition_id = partition.id;
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(partition_id);
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, partition_id);
for (size_t i = 0; i < regions.size(); ++i)
{
if ((current_region_size + i) % properties.mpp_partition_num != static_cast<size_t>(task.partition_id))
Expand All @@ -204,7 +204,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro
}
else
{
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id);
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, table_id);
if (regions.size() < static_cast<size_t>(properties.mpp_partition_num))
throw Exception("Not supported: table region num less than mpp partition num");
for (size_t i = 0; i < regions.size(); ++i)
Expand All @@ -231,7 +231,7 @@ BlockInputStreamPtr executeNonMPPQuery(Context & context, RegionID region_id, co
RegionPtr region;
if (region_id == InvalidRegionID)
{
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id);
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(NullspaceID, table_id);
if (regions.empty())
throw Exception("No region for table", ErrorCodes::BAD_ARGUMENTS);
region = regions[0].second;
Expand Down Expand Up @@ -309,7 +309,7 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest

table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log);
DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", false, log);
context.setDAGContext(&dag_context);

DAGDriver driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
Expand Down Expand Up @@ -337,7 +337,7 @@ bool runAndCompareDagReq(const coprocessor::Request & req, const coprocessor::Re
auto & table_regions_info = tables_regions_info.getSingleTableRegions();
table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log);
DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", false, log);
context.setDAGContext(&dag_context);
DAGDriver driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
driver.execute();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ Int64 concurrentRangeOperate(

{
TMTContext & tmt = context.getTMTContext();
for (auto && [_, r] : tmt.getRegionTable().getRegionsByTable(table_info.id))
for (auto && [_, r] : tmt.getRegionTable().getRegionsByTable(NullspaceID, table_info.id))
{
std::ignore = _;
if (r == nullptr)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/RequestUtils.h>
#include <Flash/ServiceUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
Expand Down Expand Up @@ -74,6 +75,7 @@ grpc::Status BatchCoprocessorHandler::execute()
DAGContext dag_context(
dag_request,
std::move(tables_regions_info),
RequestUtils::deriveKeyspaceID(cop_request->context()),
cop_context.db_context.getClientInfo().current_address.toString(),
/*is_batch_cop=*/true,
Logger::get("BatchCoprocessorHandler"));
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/RequestUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/traverseExecutors.h>
Expand All @@ -36,7 +37,7 @@ bool strictSqlMode(UInt64 sql_mode)
}

// for non-mpp(cop/batchCop)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
Expand All @@ -52,6 +53,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, keyspace_id(keyspace_id_)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
Expand Down Expand Up @@ -79,6 +81,7 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMet
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class DAGContext
{
public:
// for non-mpp(cop/batchCop)
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, KeyspaceID keyspace_id_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);
Expand Down Expand Up @@ -272,6 +272,7 @@ class DAGContext

void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getRootExecutorId();

const tipb::DAGRequest * dag_request;
Expand Down Expand Up @@ -362,6 +363,9 @@ class DAGContext
// In disaggregated tiflash mode, table_scan in tiflash_compute node will be converted ExchangeReceiver.
// Record here so we can add to receiver_set and cancel/close it.
std::optional<std::pair<String, ExchangeReceiverPtr>> disaggregated_compute_exchange_receiver;

// The keyspace that the DAG request from
const KeyspaceID keyspace_id = NullspaceID;
};

} // namespace DB
13 changes: 7 additions & 6 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ std::vector<pingcap::coprocessor::CopTask> DAGStorageInterpreter::buildCopTasks(
std::multimap<std::string, std::string> meta_data;
meta_data.emplace("is_remote_read", "true");

auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, &Poco::Logger::get("pingcap/coprocessor"), std::move(meta_data), [&] {
auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, dagContext().getKeyspaceID(), &Poco::Logger::get("pingcap/coprocessor"), std::move(meta_data), [&] {
GET_METRIC(tiflash_coprocessor_request_count, type_remote_read_sent).Increment();
});
all_tasks.insert(all_tasks.end(), tasks.begin(), tasks.end());
Expand Down Expand Up @@ -838,10 +838,11 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max

std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAGStorageInterpreter::getAndLockStorages(Int64 query_schema_version)
{
auto keyspace_id = context.getDAGContext()->getKeyspaceID();
std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> storages_with_lock;
if (unlikely(query_schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION))
{
auto logical_table_storage = tmt.getStorages().get(logical_table_id);
auto logical_table_storage = tmt.getStorages().get(keyspace_id, logical_table_id);
if (!logical_table_storage)
{
throw TiFlashException(fmt::format("Table {} doesn't exist.", logical_table_id), Errors::Table::NotExists);
Expand All @@ -851,7 +852,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
{
for (auto const physical_table_id : table_scan.getPhysicalTableIDs())
{
auto physical_table_storage = tmt.getStorages().get(physical_table_id);
auto physical_table_storage = tmt.getStorages().get(keyspace_id, physical_table_id);
if (!physical_table_storage)
{
throw TiFlashException(fmt::format("Table {} doesn't exist.", physical_table_id), Errors::Table::NotExists);
Expand All @@ -862,14 +863,14 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return storages_with_lock;
}

auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion();
auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion(keyspace_id);

/// Align schema version under the read lock.
/// Return: [storage, table_structure_lock, storage_schema_version, ok]
auto get_and_lock_storage = [&](bool schema_synced, TableID table_id) -> std::tuple<ManageableStoragePtr, TableStructureLockHolder, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto table_store = tmt.getStorages().get(table_id);
auto table_store = tmt.getStorages().get(keyspace_id, table_id);
if (!table_store)
{
if (schema_synced)
Expand Down Expand Up @@ -964,7 +965,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
auto sync_schema = [&] {
auto start_time = Clock::now();
GET_METRIC(tiflash_schema_trigger_count, type_cop_read).Increment();
tmt.getSchemaSyncer()->syncSchemas(context);
tmt.getSchemaSyncer()->syncSchemas(context, dagContext().getKeyspaceID());
auto schema_sync_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
LOG_INFO(log, "Table {} schema sync cost {}ms.", logical_table_id, schema_sync_cost);
};
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/RequestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,10 @@ setUpRegionInfos(const pingcap::coprocessor::BatchCopTask & batch_cop_task, cons
return region_ids;
}

template <typename Context>
pingcap::pd::KeyspaceID deriveKeyspaceID(const Context & ctx)
{
return ctx.api_version() == kvrpcpb::APIVersion::V1 ? pingcap::pd::NullspaceID : static_cast<pingcap::pd::KeyspaceID>(ctx.keyspace_id());
}

} // namespace DB::RequestUtils
Loading

0 comments on commit 835ad98

Please sign in to comment.