Skip to content

Commit

Permalink
Merge branch 'master' into fix-not-gc-data
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu authored Apr 20, 2022
2 parents 11a9759 + 636fcd2 commit 2c71faa
Show file tree
Hide file tree
Showing 68 changed files with 2,466 additions and 358 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ target_link_libraries (dbms
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
)

if (NOT USE_INTERNAL_RE2_LIBRARY)
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Common/CPUAffinityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <common/defines.h>

#include <string>
#include <thread>
#include <unordered_map>
Expand Down Expand Up @@ -115,8 +117,9 @@ class CPUAffinityManager
#endif

// unused except Linux
[[maybe_unused]] int query_cpu_percent;
[[maybe_unused]] int cpu_cores;
MAYBE_UNUSED_MEMBER int query_cpu_percent;
MAYBE_UNUSED_MEMBER int cpu_cores;

std::vector<std::string> query_threads;
Poco::Logger * log;

Expand Down
34 changes: 19 additions & 15 deletions dbms/src/Common/DynamicThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,30 @@ void DynamicThreadPool::fixedWork(size_t index)

/// Entry point for a dynamically-spawned pool thread.
/// Executes `initial_task`, then parks on the idle list waiting for more work
/// until the cooldown expires without a task or the pool is being destructed.
///
/// NOTE(review): the scraped source showed two interleaved versions of this
/// function; this is the merged (post-fix) form. The extra brace scope exists
/// so the metric guard created by `UPDATE_CUR_AND_MAX_METRIC` is destroyed
/// BEFORE `alive_dynamic_threads` is decremented — see the comment on #4595.
void DynamicThreadPool::dynamicWork(TaskPtr initial_task)
{
    {
        UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_total_threads_of_thdpool, type_max_threads_of_thdpool);
        executeTask(initial_task);

        DynamicNode node;
        while (true)
        {
            {
                std::unique_lock lock(dynamic_mutex);
                if (in_destructing)
                    break;
                // attach to just after head to reuse hot threads so that cold threads have chance to exit
                node.appendTo(&dynamic_idle_head);
                node.cv.wait_for(lock, dynamic_auto_shrink_cooldown);
                node.detach();
            }

            if (!node.task) // may be timeout or cancelled
                break;
            executeTask(node.task);
        }
    }
    // must decrease counter after scope of `UPDATE_CUR_AND_MAX_METRIC`
    // to avoid potential data race (#4595)
    alive_dynamic_threads.fetch_sub(1);
}

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_set_dtfile_exist_when_acquire_id) \
M(force_no_local_region_for_mpp_task) \
M(force_remote_read_for_batch_cop) \
M(force_context_path)
M(force_context_path) \
M(force_slow_page_storage_snapshot_release)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ void submitLocalDeltaMemory()
local_delta = 0;
}

/// Returns the locally accumulated memory delta that has not yet been
/// submitted to the global memory tracker (submitLocalDeltaMemory() resets
/// `local_delta` to 0 when it flushes).
/// NOTE(review): `local_delta` is defined elsewhere in this file — presumably
/// thread-local, making this the calling thread's pending delta; confirm.
Int64 getLocalDeltaMemory()
{
return local_delta;
}

void alloc(Int64 size)
{
checkSubmitAndUpdateLocalDelta(local_delta + size);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ namespace CurrentMemoryTracker
{
void disableThreshold();
void submitLocalDeltaMemory();
Int64 getLocalDeltaMemory();
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/CreatingSetsBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Common/MemoryTracker.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
#include <Interpreters/SubqueryForSet.h>


namespace DB
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
parent.file_provider,
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].local_delta_memory,
parent.no_more_keys);

parent.threads_data[thread_num].src_rows += block.rows();
Expand Down Expand Up @@ -270,6 +271,7 @@ void ParallelAggregatingBlockInputStream::execute()
file_provider,
threads_data[0].key_columns,
threads_data[0].aggregate_columns,
threads_data[0].local_delta_memory,
no_more_keys);
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
size_t src_rows = 0;
size_t src_bytes = 0;
Int64 local_delta_memory = 0;

ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Expand Down
120 changes: 116 additions & 4 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class DatabaseTiFlashTest : public ::testing::Test
}
}

void recreateMetadataPath() const
static void recreateMetadataPath()
{
String path = TiFlashTestEnv::getContext().getPath();

Expand Down Expand Up @@ -652,6 +652,118 @@ try
}
CATCH

// Regression test for issue #4596: create a TiFlash database, then a
// DeltaMerge table whose schema is given as a raw TiDB table-info JSON
// string, and verify the table can be fetched back from the database with
// the expected engine, table name, and TableInfo name.
TEST_F(DatabaseTiFlashTest, ISSUE4596)
try
{
const String db_name = "db_1";
auto ctx = TiFlashTestEnv::getContext();

{
// Create database
const String statement = "CREATE DATABASE " + db_name + " ENGINE=TiFlash";
ASTPtr ast = parseCreateStatement(statement);
InterpreterCreateQuery interpreter(ast, ctx);
interpreter.setInternal(true);
interpreter.setForceRestoreData(false);
interpreter.execute();
}

auto db = ctx.getDatabase(db_name);

const String tbl_name = "t_111";
{
/// Create table
// The raw string below embeds the serialized TiDB table info (columns
// `id` Int32 pk and `b` String) — it must stay byte-exact to parse.
ParserCreateQuery parser;
const String stmt = fmt::format("CREATE TABLE `{}`.`{}` ", db_name, tbl_name) +
R"stmt(
(`id` Int32,`b` String) Engine = DeltaMerge((`id`),
'{
"cols":[{
"comment":"",
"default":null,
"default_bit":null,
"id":1,
"name":{
"L":"id",
"O":"id"
},
"offset":0,
"origin_default":null,
"state":5,
"type":{
"Charset":"binary",
"Collate":"binary",
"Decimal":0,
"Elems":null,
"Flag":515,
"Flen":16,
"Tp":3
}
},
{
"comment":"",
"default":"",
"default_bit":null,
"id":15,
"name":{
"L":"b",
"O":"b"
},
"offset":12,
"origin_default":"",
"state":5,
"type":{
"Charset":"binary",
"Collate":"binary",
"Decimal":0,
"Elems":null,
"Flag":4225,
"Flen":-1,
"Tp":251
}
}],
"comment":"",
"id":330,
"index_info":[],
"is_common_handle":false,
"name":{
"L":"test",
"O":"test"
},
"partition":null,
"pk_is_handle":true,
"schema_version":465,
"state":5,
"update_timestamp":99999
}'
)
)stmt";
ASTPtr ast = parseQuery(parser, stmt, 0);

InterpreterCreateQuery interpreter(ast, ctx);
interpreter.setInternal(true);
interpreter.setForceRestoreData(false);
interpreter.execute();
}

EXPECT_FALSE(db->empty(ctx));
EXPECT_TRUE(db->isTableExist(ctx, tbl_name));

{
// Get storage from database
auto storage = db->tryGetTable(ctx, tbl_name);
ASSERT_NE(storage, nullptr);

// The storage name comes from the DDL (`t_111`), while the TableInfo
// name comes from the embedded JSON (`test`).
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
EXPECT_EQ(managed_storage->getDatabaseName(), db_name);
EXPECT_EQ(managed_storage->getTableInfo().name, "test");
}
}
CATCH

TEST_F(DatabaseTiFlashTest, ISSUE1055)
try
{
Expand Down Expand Up @@ -688,7 +800,7 @@ try
DatabaseLoading::loadTable(ctx, *db, meta_path, db_name, db_data_path, "TiFlash", "t_45.sql", false);

// Get storage from database
const auto tbl_name = "t_45";
const auto * tbl_name = "t_45";
auto storage = db->tryGetTable(ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
Expand Down Expand Up @@ -776,7 +888,7 @@ try
auto db = ctx.getDatabase(name_mapper.mapDatabaseName(*db_info));
ASSERT_NE(db, nullptr);
EXPECT_EQ(db->getEngineName(), "TiFlash");
auto flash_db = typeid_cast<DatabaseTiFlash *>(db.get());
auto * flash_db = typeid_cast<DatabaseTiFlash *>(db.get());
auto & db_info_get = flash_db->getDatabaseInfo();
ASSERT_EQ(db_info_get.name, expect_name);
}
Expand Down Expand Up @@ -841,7 +953,7 @@ try
)",
};

for (auto & statement : statements)
for (const auto & statement : statements)
{
{
// Cleanup: Drop database if exists
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
#include <Poco/StringTokenizer.h>
#include <common/logger_useful.h>

#include <cstddef>
#include <memory>

namespace DB
{
namespace
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,15 @@ struct MPPInfo
Timestamp start_ts;
Int64 partition_id;
Int64 task_id;
const std::vector<Int64> & sender_target_task_ids;
const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map;
MPPInfo(Timestamp start_ts_, Int64 partition_id_, Int64 task_id_, const std::vector<Int64> & sender_target_task_ids_, const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map_)
const std::vector<Int64> sender_target_task_ids;
const std::unordered_map<String, std::vector<Int64>> receiver_source_task_ids_map;

MPPInfo(
Timestamp start_ts_,
Int64 partition_id_,
Int64 task_id_,
const std::vector<Int64> & sender_target_task_ids_,
const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map_)
: start_ts(start_ts_)
, partition_id(partition_id_)
, task_id(task_id_)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ void IORateLimiter::setBackgroundThreadIds(std::vector<pid_t> thread_ids)
{
std::lock_guard lock(bg_thread_ids_mtx);
bg_thread_ids.swap(thread_ids);
LOG_FMT_INFO(log, "bg_thread_ids {} => {}", bg_thread_ids.size(), bg_thread_ids);
}

std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fname [[maybe_unused]])
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
Expand All @@ -33,6 +35,24 @@ bool strictSqlMode(UInt64 sql_mode)
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
}

/// Initializes the output schema of this DAG request:
/// - collects the full output field types from the request,
/// - validates and records the requested output offsets and the resulting
///   per-result field types,
/// - derives the encode type and whether session timezone info must be kept
///   (chunk / CHBlock encodings carry their own timezone handling).
/// Throws TiFlashException(BadRequest) on an out-of-range output offset.
void DAGContext::initOutputInfo()
{
    output_field_types = collectOutputFieldTypes(*dag_request);
    output_offsets.clear();
    result_field_types.clear();
    for (UInt32 i : dag_request->output_offsets())
    {
        output_offsets.push_back(i);
        if (unlikely(i >= output_field_types.size()))
            throw TiFlashException(
                // Fixed: the message previously lacked the closing parenthesis.
                fmt::format("{}: Invalid output offset(schema has {} columns, access index {})", __PRETTY_FUNCTION__, output_field_types.size(), i),
                Errors::Coprocessor::BadRequest);
        result_field_types.push_back(output_field_types[i]);
    }
    encode_type = analyzeDAGEncodeType(*this);
    keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

bool DAGContext::allowZeroInDate() const
{
return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE;
Expand All @@ -43,6 +63,13 @@ bool DAGContext::allowInvalidDate() const
return sql_mode & TiDBSQLMode::ALLOW_INVALID_DATES;
}

void DAGContext::addSubquery(const String & subquery_id, SubqueryForSet && subquery)
{
SubqueriesForSets subqueries_for_sets;
subqueries_for_sets[subquery_id] = std::move(subquery);
subqueries.push_back(std::move(subqueries_for_sets));
}

std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap()
{
return profile_streams_map;
Expand Down
Loading

0 comments on commit 2c71faa

Please sign in to comment.