Skip to content

Commit

Permalink
PageStorage: Fix BlobStore read with FieldReadInfos (#4181)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
JaySon-Huang authored Mar 7, 2022
1 parent 5229725 commit 7c1a78e
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 98 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/Exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
{
try
{
LOG_ERROR(logger, start_of_message << (start_of_message.empty() ? "" : ": ") << getCurrentExceptionMessage(true));
LOG_FMT_ERROR(logger, "{}{}{}", start_of_message, (start_of_message.empty() ? "" : ": "), getCurrentExceptionMessage(true));
}
catch (...)
{
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/DeltaMerge/tests/stress/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
include_directories (${CMAKE_CURRENT_BINARY_DIR})

add_executable(dm_stress main.cpp DMStressProxy.cpp ${TiFlash_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp)
add_executable(dm_stress
main.cpp
DMStressProxy.cpp
${TiFlash_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp
${TiFlash_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestEnv.cpp
)
target_link_libraries(dm_stress dbms gtest clickhouse_functions clickhouse-server-lib)
133 changes: 91 additions & 42 deletions dbms/src/Storages/DeltaMerge/tests/stress/DMStressProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,42 @@ void DMStressProxy::genMultiThread()

void DMStressProxy::genData(UInt32 id, UInt64 rows)
{
std::string thread_name = "dm_gen_" + std::to_string(id);
setThreadName(thread_name.c_str());
UInt64 generated_count = 0;
while (generated_count < rows)
try
{
auto c = std::min(opts.gen_rows_per_block, rows - generated_count);
auto ids = pk.get(c);
write(ids);
generated_count += c;
std::string thread_name = "dm_gen_" + std::to_string(id);
setThreadName(thread_name.c_str());
UInt64 generated_count = 0;
while (generated_count < rows)
{
auto c = std::min(opts.gen_rows_per_block, rows - generated_count);
auto ids = pk.get(c);
write(ids);
generated_count += c;
}
}
catch (...)
{
DB::tryLogCurrentException("genData fail");
}
}

void DMStressProxy::write(const std::vector<Int64> & ids)
{
Block block;
genBlock(block, ids);
try
{
Block block;
genBlock(block, ids);

auto locks = key_lock.getLocks(ids);
store->write(*context, context->getSettingsRef(), std::move(block));
if (opts.verify)
auto locks = key_lock.getLocks(ids);
store->write(*context, context->getSettingsRef(), block);
if (opts.verify)
{
pks.insert(ids);
}
}
catch (...)
{
pks.insert(ids);
DB::tryLogCurrentException("runProxy fail");
}
}

Expand All @@ -132,19 +146,26 @@ void DMStressProxy::genBlock(Block & block, const std::vector<Int64> & ids)
insertColumn<UInt64>(block, TAG_COLUMN_TYPE, TAG_COLUMN_NAME, TAG_COLUMN_ID, v_tag);
std::vector<UInt64> v_balance(ids.size(), 1024);
insertColumn<UInt64>(block, col_balance_define.type, col_balance_define.name, col_balance_define.id, v_balance);
std::string s("C", 128);
std::string s(128, 'C');
std::vector<String> v_s(ids.size(), s);
insertColumn<String>(block, col_random_define.type, col_random_define.name, col_random_define.id, v_s);
}

void DMStressProxy::readMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_read_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
std::string thread_name = "dm_read_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
countRows(rnd() % 100);
}
}
catch (...)
{
countRows(rnd() % 100);
DB::tryLogCurrentException("readMultiThread fail");
}
};

Expand Down Expand Up @@ -188,16 +209,23 @@ UInt64 DMStressProxy::countRows(UInt32 rnd_break_prob)
void DMStressProxy::insertMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_insert_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
insert();
if (opts.write_sleep_us > 0)
std::string thread_name = "dm_insert_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
insert();
if (opts.write_sleep_us > 0)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
}
}
}
catch (...)
{
DB::tryLogCurrentException("insertMultiThread fail");
}
};

insert_threads.reserve(opts.insert_concurrency);
Expand All @@ -210,16 +238,23 @@ void DMStressProxy::insertMultiThread()
void DMStressProxy::updateMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_update_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
update();
if (opts.write_sleep_us > 0)
std::string thread_name = "dm_update_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
update();
if (opts.write_sleep_us > 0)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
}
}
}
catch (...)
{
DB::tryLogCurrentException("updateMultiThread fail");
}
};

update_threads.reserve(opts.update_concurrency);
Expand All @@ -232,16 +267,23 @@ void DMStressProxy::updateMultiThread()
void DMStressProxy::deleteMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_delete_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
deleteRange();
if (opts.write_sleep_us > 0)
std::string thread_name = "dm_delete_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
deleteRange();
if (opts.write_sleep_us > 0)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
}
}
}
catch (...)
{
DB::tryLogCurrentException("deleteMultiThread fail");
}
};

delete_threads.reserve(opts.delete_concurrency);
Expand Down Expand Up @@ -338,11 +380,18 @@ void DMStressProxy::joinThreads(std::vector<std::thread> & threads)
void DMStressProxy::verifySingleThread()
{
auto work = [&]() {
setThreadName("verify");
while (!stop)
try
{
setThreadName("verify");
while (!stop)
{
verify();
sleep(opts.verify_sleep_sec);
}
}
catch (...)
{
verify();
sleep(opts.verify_sleep_sec);
DB::tryLogCurrentException("verifySingleThread fail");
}
};
verify_thread = std::thread(work);
Expand Down Expand Up @@ -411,7 +460,7 @@ void DMStressProxy::verify()

void DMStressProxy::run()
{
genMultiThread(); // Run the gennerate data threads with other read-write thread concurrently
genMultiThread(); // Run the generate data threads with other read-write thread concurrently
readMultiThread();
insertMultiThread();
updateMultiThread();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/tests/stress/DMStressProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class DMStressProxy
void write(const std::vector<Int64> & ids);
UInt64 countRows(UInt32 rnd_break_prob);
void genBlock(Block & block, const std::vector<Int64> & ids);
void joinThreads(std::vector<std::thread> & threads);
static void joinThreads(std::vector<std::thread> & threads);
void insert();
void update();
void deleteRange();
Expand Down
49 changes: 30 additions & 19 deletions dbms/src/Storages/DeltaMerge/tests/stress/main.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Common/Exception.h>
#include <Storages/DeltaMerge/tests/stress/DMStressProxy.h>
#include <fmt/format.h>
#include <signal.h>
Expand Down Expand Up @@ -99,16 +100,12 @@ void runProxy(const StressOptions & opts, Poco::Logger * log)
DB::DM::tests::DMStressProxy store_proxy(opts);
store_proxy.run();
auto end = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
LOG_INFO(log, "run_count: " << run_count << " start: " << start << " end: " << end << " use time: " << end - start);
LOG_FMT_INFO(log, "run_count: {} start: {} end: {} use time: {}", run_count, start, end, (end - start));
}
}
catch (std::exception & e)
{
LOG_INFO(log, e.what());
}
catch (...)
{
LOG_INFO(log, "Unknow exception");
DB::tryLogCurrentException("runProxy fail");
}
DB::tests::TiFlashTestEnv::shutdown();
}
Expand All @@ -121,36 +118,50 @@ int main(int argc, char * argv[])
{
DB::FailPointHelper::enableFailPoint(fp);
}
auto log = &Poco::Logger::get("DMStressProxy");
auto * log = &Poco::Logger::get("DMStressProxy");
UInt64 run_count = 0;
static std::uniform_int_distribution<unsigned> dist(opts.min_restart_sec, opts.max_restart_sec);
std::default_random_engine generator;
generator.seed(::time(nullptr));
for (;;)
{
run_count++;
LOG_INFO(log, "main loop run count: " << run_count);
LOG_FMT_INFO(log, "main loop run count: {}", run_count);
auto fpid = fork();
if (fpid < 0)
{
LOG_INFO(log, "fork error in run count " << run_count);
LOG_FMT_INFO(log, "fork error in run count {}", run_count);
}
else if (fpid == 0)
{
runProxy(opts, log);
exit(0);
try
{
runProxy(opts, log);
exit(0);
}
catch (...)
{
DB::tryLogCurrentException("runProxy fail");
}
}
else
{
sleep(dist(generator));
// sleep(300);
kill(fpid, SIGKILL);
int status;
wait(&status);
if (WIFEXITED(status))
try
{
sleep(dist(generator));
// sleep(300);
kill(fpid, SIGKILL);
int status;
wait(&status);
if (WIFEXITED(status))
{
LOG_FMT_INFO(log, "child pid {} exit normally", fpid);
break;
}
}
catch (...)
{
LOG_INFO(log, "child pid " << fpid << "exit normally");
break;
DB::tryLogCurrentException("runProxy fail");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Page/Page.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct Page
{
auto iter = field_offsets.find(FieldOffset(index));
if (unlikely(iter == field_offsets.end()))
throw Exception("Try to getFieldData of Page" + DB::toString(page_id) + " with invalid field index: " + DB::toString(index),
throw Exception(fmt::format("Try to getFieldData with invalid field index [page_id={}] [field_index={}]", page_id, index),
ErrorCodes::LOGICAL_ERROR);

PageFieldOffset beg = iter->offset;
Expand Down Expand Up @@ -112,9 +112,9 @@ struct PageEntry
std::pair<size_t, size_t> getFieldOffsets(size_t index) const
{
if (unlikely(index >= field_offsets.size()))
throw Exception("Try to getFieldData with invalid index: " + DB::toString(index)
+ ", fields size: " + DB::toString(field_offsets.size()),
ErrorCodes::LOGICAL_ERROR);
throw Exception(
fmt::format("Try to getFieldOffsets with invalid index [index={}] [fields_size={}]", index, field_offsets.size()),
ErrorCodes::LOGICAL_ERROR);
else if (index == field_offsets.size() - 1)
return {field_offsets.back().first, size};
else
Expand Down
Loading

0 comments on commit 7c1a78e

Please sign in to comment.