Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage: Fix BlobStore read with FieldReadInfos #4181

Merged
merged 5 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Common/Exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
{
try
{
LOG_ERROR(logger, start_of_message << (start_of_message.empty() ? "" : ": ") << getCurrentExceptionMessage(true));
LOG_FMT_ERROR(logger, "{}{}{}", start_of_message, (start_of_message.empty() ? "" : ": "), getCurrentExceptionMessage(true));
}
catch (...)
{
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/DeltaMerge/tests/stress/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
include_directories (${CMAKE_CURRENT_BINARY_DIR})

add_executable(dm_stress main.cpp DMStressProxy.cpp ${TiFlash_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp)
add_executable(dm_stress
main.cpp
DMStressProxy.cpp
${TiFlash_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp
${TiFlash_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestEnv.cpp
)
target_link_libraries(dm_stress dbms gtest clickhouse_functions clickhouse-server-lib)
133 changes: 91 additions & 42 deletions dbms/src/Storages/DeltaMerge/tests/stress/DMStressProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,42 @@ void DMStressProxy::genMultiThread()

void DMStressProxy::genData(UInt32 id, UInt64 rows)
{
std::string thread_name = "dm_gen_" + std::to_string(id);
setThreadName(thread_name.c_str());
UInt64 generated_count = 0;
while (generated_count < rows)
try
{
auto c = std::min(opts.gen_rows_per_block, rows - generated_count);
auto ids = pk.get(c);
write(ids);
generated_count += c;
std::string thread_name = "dm_gen_" + std::to_string(id);
setThreadName(thread_name.c_str());
UInt64 generated_count = 0;
while (generated_count < rows)
{
auto c = std::min(opts.gen_rows_per_block, rows - generated_count);
auto ids = pk.get(c);
write(ids);
generated_count += c;
}
}
catch (...)
{
DB::tryLogCurrentException("genData fail");
}
}

void DMStressProxy::write(const std::vector<Int64> & ids)
{
Block block;
genBlock(block, ids);
try
{
Block block;
genBlock(block, ids);

auto locks = key_lock.getLocks(ids);
store->write(*context, context->getSettingsRef(), std::move(block));
if (opts.verify)
auto locks = key_lock.getLocks(ids);
store->write(*context, context->getSettingsRef(), block);
if (opts.verify)
{
pks.insert(ids);
}
}
catch (...)
{
pks.insert(ids);
DB::tryLogCurrentException("runProxy fail");
}
}

Expand All @@ -132,19 +146,26 @@ void DMStressProxy::genBlock(Block & block, const std::vector<Int64> & ids)
insertColumn<UInt64>(block, TAG_COLUMN_TYPE, TAG_COLUMN_NAME, TAG_COLUMN_ID, v_tag);
std::vector<UInt64> v_balance(ids.size(), 1024);
insertColumn<UInt64>(block, col_balance_define.type, col_balance_define.name, col_balance_define.id, v_balance);
std::string s("C", 128);
std::string s(128, 'C');
std::vector<String> v_s(ids.size(), s);
insertColumn<String>(block, col_random_define.type, col_random_define.name, col_random_define.id, v_s);
}

void DMStressProxy::readMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_read_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
std::string thread_name = "dm_read_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
countRows(rnd() % 100);
}
}
catch (...)
{
countRows(rnd() % 100);
DB::tryLogCurrentException("readMultiThread fail");
}
};

Expand Down Expand Up @@ -188,16 +209,23 @@ UInt64 DMStressProxy::countRows(UInt32 rnd_break_prob)
void DMStressProxy::insertMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_insert_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
insert();
if (opts.write_sleep_us > 0)
std::string thread_name = "dm_insert_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
insert();
if (opts.write_sleep_us > 0)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
}
}
}
catch (...)
{
DB::tryLogCurrentException("insertMultiThread fail");
}
};

insert_threads.reserve(opts.insert_concurrency);
Expand All @@ -210,16 +238,23 @@ void DMStressProxy::insertMultiThread()
void DMStressProxy::updateMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_update_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
update();
if (opts.write_sleep_us > 0)
std::string thread_name = "dm_update_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
update();
if (opts.write_sleep_us > 0)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
}
}
}
catch (...)
{
DB::tryLogCurrentException("updateMultiThread fail");
}
};

update_threads.reserve(opts.update_concurrency);
Expand All @@ -232,16 +267,23 @@ void DMStressProxy::updateMultiThread()
void DMStressProxy::deleteMultiThread()
{
auto work = [&](UInt32 id) {
std::string thread_name = "dm_delete_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
try
{
deleteRange();
if (opts.write_sleep_us > 0)
std::string thread_name = "dm_delete_" + std::to_string(id);
setThreadName(thread_name.c_str());
while (!stop)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
deleteRange();
if (opts.write_sleep_us > 0)
{
std::this_thread::sleep_for(std::chrono::microseconds(opts.write_sleep_us));
}
}
}
catch (...)
{
DB::tryLogCurrentException("deleteMultiThread fail");
}
};

delete_threads.reserve(opts.delete_concurrency);
Expand Down Expand Up @@ -338,11 +380,18 @@ void DMStressProxy::joinThreads(std::vector<std::thread> & threads)
void DMStressProxy::verifySingleThread()
{
auto work = [&]() {
setThreadName("verify");
while (!stop)
try
{
setThreadName("verify");
while (!stop)
{
verify();
sleep(opts.verify_sleep_sec);
}
}
catch (...)
{
verify();
sleep(opts.verify_sleep_sec);
DB::tryLogCurrentException("verifySingleThread fail");
}
};
verify_thread = std::thread(work);
Expand Down Expand Up @@ -411,7 +460,7 @@ void DMStressProxy::verify()

void DMStressProxy::run()
{
genMultiThread(); // Run the gennerate data threads with other read-write thread concurrently
genMultiThread(); // Run the generate data threads with other read-write thread concurrently
readMultiThread();
insertMultiThread();
updateMultiThread();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/tests/stress/DMStressProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class DMStressProxy
void write(const std::vector<Int64> & ids);
UInt64 countRows(UInt32 rnd_break_prob);
void genBlock(Block & block, const std::vector<Int64> & ids);
void joinThreads(std::vector<std::thread> & threads);
static void joinThreads(std::vector<std::thread> & threads);
void insert();
void update();
void deleteRange();
Expand Down
49 changes: 30 additions & 19 deletions dbms/src/Storages/DeltaMerge/tests/stress/main.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Common/Exception.h>
#include <Storages/DeltaMerge/tests/stress/DMStressProxy.h>
#include <fmt/format.h>
#include <signal.h>
Expand Down Expand Up @@ -99,16 +100,12 @@ void runProxy(const StressOptions & opts, Poco::Logger * log)
DB::DM::tests::DMStressProxy store_proxy(opts);
store_proxy.run();
auto end = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
LOG_INFO(log, "run_count: " << run_count << " start: " << start << " end: " << end << " use time: " << end - start);
LOG_FMT_INFO(log, "run_count: {} start: {} end: {} use time: {}", run_count, start, end, (end - start));
}
}
catch (std::exception & e)
{
LOG_INFO(log, e.what());
}
catch (...)
{
LOG_INFO(log, "Unknow exception");
DB::tryLogCurrentException("runProxy fail");
}
DB::tests::TiFlashTestEnv::shutdown();
}
Expand All @@ -121,36 +118,50 @@ int main(int argc, char * argv[])
{
DB::FailPointHelper::enableFailPoint(fp);
}
auto log = &Poco::Logger::get("DMStressProxy");
auto * log = &Poco::Logger::get("DMStressProxy");
UInt64 run_count = 0;
static std::uniform_int_distribution<unsigned> dist(opts.min_restart_sec, opts.max_restart_sec);
std::default_random_engine generator;
generator.seed(::time(nullptr));
for (;;)
{
run_count++;
LOG_INFO(log, "main loop run count: " << run_count);
LOG_FMT_INFO(log, "main loop run count: {}", run_count);
auto fpid = fork();
if (fpid < 0)
{
LOG_INFO(log, "fork error in run count " << run_count);
LOG_FMT_INFO(log, "fork error in run count {}", run_count);
}
else if (fpid == 0)
{
runProxy(opts, log);
exit(0);
try
{
runProxy(opts, log);
exit(0);
}
catch (...)
{
DB::tryLogCurrentException("runProxy fail");
}
}
else
{
sleep(dist(generator));
// sleep(300);
kill(fpid, SIGKILL);
int status;
wait(&status);
if (WIFEXITED(status))
try
{
sleep(dist(generator));
// sleep(300);
kill(fpid, SIGKILL);
int status;
wait(&status);
if (WIFEXITED(status))
{
LOG_FMT_INFO(log, "child pid {} exit normally", fpid);
break;
}
}
catch (...)
{
LOG_INFO(log, "child pid " << fpid << "exit normally");
break;
DB::tryLogCurrentException("runProxy fail");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Page/Page.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct Page
{
auto iter = field_offsets.find(FieldOffset(index));
if (unlikely(iter == field_offsets.end()))
throw Exception("Try to getFieldData of Page" + DB::toString(page_id) + " with invalid field index: " + DB::toString(index),
throw Exception(fmt::format("Try to getFieldData with invalid field index [page_id={}] [field_index={}]", page_id, index),
ErrorCodes::LOGICAL_ERROR);

PageFieldOffset beg = iter->offset;
Expand Down Expand Up @@ -112,9 +112,9 @@ struct PageEntry
std::pair<size_t, size_t> getFieldOffsets(size_t index) const
{
if (unlikely(index >= field_offsets.size()))
throw Exception("Try to getFieldData with invalid index: " + DB::toString(index)
+ ", fields size: " + DB::toString(field_offsets.size()),
ErrorCodes::LOGICAL_ERROR);
throw Exception(
fmt::format("Try to getFieldOffsets with invalid index [index={}] [fields_size={}]", index, field_offsets.size()),
ErrorCodes::LOGICAL_ERROR);
else if (index == field_offsets.size() - 1)
return {field_offsets.back().first, size};
else
Expand Down
Loading