Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema: allow loading empty schema diff when the version grows up. #5245

Merged
merged 15 commits into from
Jul 4, 2022
8 changes: 6 additions & 2 deletions dbms/src/Debug/MockSchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
#include <Debug/MockTiDB.h>
#include <TiDB/Schema/SchemaGetter.h>

#include <optional>

namespace DB
{

struct MockSchemaGetter
{
TiDB::DBInfoPtr getDatabase(DatabaseID db_id) { return MockTiDB::instance().getDBInfoByID(db_id); }

Int64 getVersion() { return MockTiDB::instance().getVersion(); }

SchemaDiff getSchemaDiff(Int64 version) { return MockTiDB::instance().getSchemaDiff(version); }
std::optional<SchemaDiff> getSchemaDiff(Int64 version)
{
return MockTiDB::instance().getSchemaDiff(version);
}

TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id) { return MockTiDB::instance().getTableInfoByID(table_id); }

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ std::pair<bool, DatabaseID> MockTiDB::getDBIDByName(const String & database_name
return std::make_pair(false, -1);
}

SchemaDiff MockTiDB::getSchemaDiff(Int64 version_)
std::optional<SchemaDiff> MockTiDB::getSchemaDiff(Int64 version_)
{
return version_diff[version_];
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class MockTiDB : public ext::Singleton<MockTiDB>

std::pair<bool, DatabaseID> getDBIDByName(const String & database_name);

SchemaDiff getSchemaDiff(Int64 version);
std::optional<SchemaDiff> getSchemaDiff(Int64 version);

std::unordered_map<String, DatabaseID> getDatabases() { return databases; }

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/ReadIndexWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ BatchReadIndexRes ReadIndexWorkerManager::batchReadIndex(
}
}
{ // if meet timeout, which means part of regions can not get response from leader, try to poll rest tasks
TEST_LOG_FMT("rest {}, poll rest tasks onece", tasks.size());
TEST_LOG_FMT("rest {}, poll rest tasks once", tasks.size());

while (!tasks.empty())
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/TiDB/Schema/SchemaGetter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

namespace DB
{

namespace ErrorCodes
{
extern const int SCHEMA_SYNC_ERROR;
Expand Down Expand Up @@ -193,13 +192,14 @@ String SchemaGetter::getSchemaDiffKey(Int64 ver)
return std::string(schemaDiffPrefix) + ":" + std::to_string(ver);
}

SchemaDiff SchemaGetter::getSchemaDiff(Int64 ver)
std::optional<SchemaDiff> SchemaGetter::getSchemaDiff(Int64 ver)
{
String key = getSchemaDiffKey(ver);
String data = TxnStructure::get(snap, key);
if (data.empty())
{
throw TiFlashException("cannot find schema diff for version: " + std::to_string(ver), Errors::Table::SyncError);
LOG_FMT_WARNING(log, "The schema diff for version {}, key {} is empty.", ver, key);
return std::nullopt;
}
SchemaDiff diff;
diff.deserialize(data);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/TiDB/Schema/SchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#include <common/logger_useful.h>

#include <optional>

namespace DB
{
// The enum values are exactly the same as the DDL actions listed in "parser/model/ddl.go" of the TiDB codebase, and must be kept in sync with it.
Expand Down Expand Up @@ -138,7 +140,7 @@ struct SchemaGetter

Int64 getVersion();

SchemaDiff getSchemaDiff(Int64 ver);
std::optional<SchemaDiff> getSchemaDiff(Int64 ver);

static String getSchemaDiffKey(Int64 ver);

Expand Down
101 changes: 60 additions & 41 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ struct TiDBSchemaSyncer : public SchemaSyncer
SCOPE_EXIT({ GET_METRIC(tiflash_schema_applying).Set(0.0); });

GET_METRIC(tiflash_schema_apply_count, type_diff).Increment();
if (!tryLoadSchemaDiffs(getter, version, context))
Int64 version_after_load_diff = 0;
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
if (version_after_load_diff = tryLoadSchemaDiffs(getter, version, context); version_after_load_diff == -1)
{
GET_METRIC(tiflash_schema_apply_count, type_full).Increment();
loadAllSchema(getter, version, context);
}
cur_version = version;
cur_version = version_after_load_diff;
GET_METRIC(tiflash_schema_version).Set(cur_version);
LOG_FMT_INFO(log, "end sync schema, version has been updated to {}", cur_version);
return true;
Expand All @@ -144,64 +145,82 @@ struct TiDBSchemaSyncer : public SchemaSyncer
return it->second;
}

bool tryLoadSchemaDiffs(Getter & getter, Int64 version, Context & context)
Int64 tryLoadSchemaDiffs(Getter & getter, Int64 lastest_version, Context & context)
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
if (isTooOldSchema(cur_version, version))
if (isTooOldSchema(cur_version, lastest_version))
{
return false;
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
}

LOG_FMT_DEBUG(log, "try load schema diffs.");

SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, version);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, lastest_version);

Int64 used_version = cur_version;
std::vector<SchemaDiff> diffs;
while (used_version < version)
while (used_version < lastest_version)
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
used_version++;
diffs.push_back(getter.getSchemaDiff(used_version));
}
LOG_FMT_DEBUG(log, "end load schema diffs with total {} entries.", diffs.size());
try
{
for (const auto & diff : diffs)
auto schema_diff = getter.getSchemaDiff(used_version);
if (!schema_diff)
{
builder.applyDiff(diff);
// If the schema diff for `lastest_version` itself is empty, we do not advance to
// `lastest_version`; we stop at `lastest_version - 1` instead.
// If the schema diff for any version in (`cur_version`, `lastest_version - 1`] is empty,
// we simply skip that version.
//
// Example:
// - `cur_version` is 1 and `lastest_version` is 10.
// - The schema diffs of versions 2, 4 and 6 are empty, so we just skip them.
// - The schema diff of version 10 is empty, so we only advance the version to 9.
if (used_version != lastest_version)
{
LOG_FMT_WARNING(log, "Skip the schema diff from version {}. ", used_version);
continue;
} // else (used_version == lastest_version)

LOG_FMT_DEBUG(log, "End load a part of schema diffs, current version is {} ", used_version);
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
return used_version - 1;
}
}
catch (TiFlashException & e)
{
if (!e.getError().is(Errors::DDL::StaleSchema))

try
{
builder.applyDiff(*schema_diff);
}
catch (TiFlashException & e)
{
if (!e.getError().is(Errors::DDL::StaleSchema))
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
}
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return -1;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
{
throw;
}
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return -1;
}
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
catch (Poco::Exception & e)
{
throw;
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.displayText());
return -1;
}
catch (std::exception & e)
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.what());
return -1;
}
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
}
catch (Poco::Exception & e)
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.displayText());
return false;
}
catch (std::exception & e)
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.what());
return false;
}
return true;

LOG_FMT_DEBUG(log, "End load all of schema diffs, current version is {} ", used_version);
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
return used_version;
}

void loadAllSchema(Getter & getter, Int64 version, Context & context)
Expand Down