From 85c351b2c1e19dfe46c0aa62ddc54222753d4ccc Mon Sep 17 00:00:00 2001 From: Lei Zhang <27994433+SWJTU-ZhangLei@users.noreply.github.com> Date: Fri, 27 Sep 2024 11:02:49 +0800 Subject: [PATCH] [feat](cloud) Add injection point http api for ms --- build.sh | 1 + cloud/CMakeLists.txt | 4 + cloud/src/meta-service/CMakeLists.txt | 1 + .../src/meta-service/injection_point_http.cpp | 226 ++++++++++++++++++ cloud/src/meta-service/meta_service.cpp | 2 +- cloud/src/meta-service/meta_service_http.cpp | 4 + cloud/src/meta-service/meta_service_txn.cpp | 2 +- cloud/src/meta-service/txn_lazy_committer.cpp | 5 +- common/cpp/sync_point.h | 2 +- .../doris/cloud/catalog/CloudPartition.java | 14 +- 10 files changed, 248 insertions(+), 13 deletions(-) create mode 100644 cloud/src/meta-service/injection_point_http.cpp diff --git a/build.sh b/build.sh index 1da5df76bb2fdd..032988298238be 100755 --- a/build.sh +++ b/build.sh @@ -631,6 +631,7 @@ if [[ "${BUILD_CLOUD}" -eq 1 ]]; then -DCMAKE_MAKE_PROGRAM="${MAKE_PROGRAM}" \ -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \ -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \ + -DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \ -DMAKE_TEST=OFF \ "${CMAKE_USE_CCACHE}" \ -DUSE_LIBCPP="${USE_LIBCPP}" \ diff --git a/cloud/CMakeLists.txt b/cloud/CMakeLists.txt index 32e60f7bfb5467..f8084acf77bf1c 100644 --- a/cloud/CMakeLists.txt +++ b/cloud/CMakeLists.txt @@ -411,6 +411,10 @@ if (${MAKE_TEST} STREQUAL "ON") add_definitions(-DBE_TEST) endif () +if (ENABLE_INJECTION_POINT) + add_definitions(-DENABLE_INJECTION_POINT) +endif() + # Add libs if needed, download to current dir -- ${BUILD_DIR} set(FDB_LIB "fdb_lib_7_1_23.tar.xz") file(GLOB RELEASE_FILE_LIST LIST_DIRECTORIES false "/etc/*release*") diff --git a/cloud/src/meta-service/CMakeLists.txt b/cloud/src/meta-service/CMakeLists.txt index c7c4887a0686c1..d11f87e7fa23d4 100644 --- a/cloud/src/meta-service/CMakeLists.txt +++ b/cloud/src/meta-service/CMakeLists.txt @@ -12,6 +12,7 @@ add_library(MetaService meta_server.cpp meta_service.cpp meta_service_http.cpp + injection_point_http.cpp meta_service_job.cpp meta_service_resource.cpp meta_service_schema.cpp diff --git a/cloud/src/meta-service/injection_point_http.cpp b/cloud/src/meta-service/injection_point_http.cpp new file mode 100644 index 00000000000000..80d1bcfdf2e4d8 --- /dev/null +++ b/cloud/src/meta-service/injection_point_http.cpp @@ -0,0 +1,226 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "common/config.h" +#include "common/logging.h" +#include "cpp/sync_point.h" +#include "meta-service/keys.h" +#include "meta-service/meta_service_helper.h" +#include "meta-service/txn_kv.h" +#include "meta-service/txn_kv_error.h" +#include "meta_service.h" +#include "meta_service_http.h" + +namespace doris::cloud { + +std::map> suite_map; +std::once_flag register_suites_once; + +inline std::default_random_engine make_random_engine() { + return std::default_random_engine( + static_cast(std::chrono::steady_clock::now().time_since_epoch().count())); +} + +static void register_suites() { + suite_map.emplace("test_txn_lazy_commit", [] { + auto sp = SyncPoint::get_instance(); + + sp->set_call_back("commit_txn_immediately::advance_last_pending_txn_id", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution u(100, 1000); + uint32_t duration_ms = u(rng); + LOG(INFO) << "commit_txn_immediately::advance_last_pending_txn_id sleep " << duration_ms + << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + }); + + sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution u(100, 1000); + uint32_t duration_ms = u(rng); + LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_submit sleep " << duration_ms + << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + }); + + sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution u(100, 1000); + uint32_t duration_ms = u(rng); + LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_wait sleep " << duration_ms + << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + }); + + sp->set_call_back("convert_tmp_rowsets::before_commit", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution u(1, 50); + uint32_t duration_ms = u(rng); + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + LOG(INFO) << "convert_tmp_rowsets::before_commit sleep " << duration_ms << " ms"; + if (duration_ms <= 25) { + MetaServiceCode* code = try_any_cast(args[0]); + *code = MetaServiceCode::KV_TXN_CONFLICT; + bool* pred = try_any_cast(args.back()); + *pred = true; + LOG(INFO) << "convert_tmp_rowsets::before_commit random_value=" << duration_ms + << " inject kv txn conflict"; + } + }); + }); +} + +HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) { + std::string duration_str(http_query(uri, "duration")); + int64_t duration = 0; + try { + duration = std::stol(duration_str); + } catch (const std::exception& e) { + auto msg = fmt::format("invalid duration:{}", duration_str); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + auto sp = SyncPoint::get_instance(); + sp->set_call_back(point, [point, duration](auto&& args) { + LOG(INFO) << "injection point hit, point=" << point << " sleep ms=" << duration; + std::this_thread::sleep_for(std::chrono::milliseconds(duration)); + }); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse set_return(const std::string& point, const brpc::URI& uri) { + auto sp = SyncPoint::get_instance(); + sp->set_call_back(point, [point](auto&& args) { + try { + LOG(INFO) << "injection point hit, point=" << point << " return void"; + auto pred = try_any_cast(args.back()); + *pred = true; + } catch (const std::bad_any_cast& e) { + LOG(ERROR) << "failed to process `return` e:" << e.what(); + } + }); + + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse handle_set(const brpc::URI& uri) { + const std::string point(http_query(uri, "name")); + if (point.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty point name"); + } + + const std::string behavior(http_query(uri, "behavior")); + if (behavior.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty behavior"); + } + if (behavior == "sleep") { + return set_sleep(point, uri); + } else if (behavior == "return") { + return set_return(point, uri); + } + + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown behavior: " + behavior); +} + +HttpResponse handle_clear(const brpc::URI& uri) { + const std::string point(http_query(uri, "name")); + auto* sp = SyncPoint::get_instance(); + LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" : point); + if (point.empty()) { + // If point name is emtpy, clear all + sp->clear_all_call_backs(); + return http_json_reply(MetaServiceCode::OK, "OK"); + } + sp->clear_call_back(point); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse handle_apply_suite(const brpc::URI& uri) { + const std::string suite(http_query(uri, "name")); + if (suite.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty suite name"); + } + + std::call_once(register_suites_once, register_suites); + if (auto it = suite_map.find(suite); it != suite_map.end()) { + it->second(); // set injection callbacks + return http_json_reply(MetaServiceCode::OK, "OK apply suite " + suite + "\n"); + } + + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown suite: " + suite + "\n"); +} + +HttpResponse handle_enable(const brpc::URI& uri) { + SyncPoint::get_instance()->enable_processing(); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse handle_disable(const brpc::URI& uri) { + SyncPoint::get_instance()->disable_processing(); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +// +// enable/disable injection point +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=enable" +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=disable" +// ``` + +// clear all injection points +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=clear" +// ``` + +// apply/activate specific suite with registered action, see `register_suites()` for more details +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=apply_suite&name=${suite_name}" +// ``` + +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set +// &name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs + +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set +// &name=${injection_point_name}&behavior=return" # return void +// ``` + +HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + LOG(INFO) << "handle InjectionPointAction uri:" << uri; + const std::string op(http_query(uri, "op")); + + if (op == "set") { + return handle_set(uri); + } else if (op == "clear") { + return handle_clear(uri); + } else if (op == "apply_suite") { + return handle_apply_suite(uri); + } else if (op == "enable") { + return handle_enable(uri); + } else if (op == "disable") { + return handle_disable(uri); + } + + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" + op); +} +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 7adbc8ccf12aab..018c30316fef8c 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1498,7 +1498,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, if (code != MetaServiceCode::OK) { LOG(WARNING) << "advance_last_txn failed last_txn=" << version_pb.pending_txn_ids(0) << " code=" << code - << "msg=" << msg; + << " msg=" << msg; return; } continue; diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index 9a9f6de97cc4dd..4395bb98190eb5 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -170,6 +170,8 @@ static std::string_view remove_version_prefix(std::string_view path) { return path; } +HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl); + static HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { static std::unordered_map operations { {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, @@ -575,11 +577,13 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"get_value", process_get_value}, {"show_meta_ranges", process_show_meta_ranges}, {"txn_lazy_commit", process_txn_lazy_commit}, + {"injection_point", process_injection_point}, {"v1/decode_key", process_decode_key}, {"v1/encode_key", process_encode_key}, {"v1/get_value", process_get_value}, {"v1/show_meta_ranges", process_show_meta_ranges}, {"v1/txn_lazy_commit", process_txn_lazy_commit}, + {"v1/injection_point", process_injection_point}, // for get {"get_instance", process_get_instance_info}, {"get_obj_store_info", process_get_obj_store_info}, diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index b372c360289926..ee95ad0bd232fe 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1893,7 +1893,7 @@ void commit_txn_eventually( std::pair ret = task->wait(); if (ret.first != MetaServiceCode::OK) { LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " code=" << ret.first - << "msg=" << ret.second; + << " msg=" << ret.second; } std::unordered_map tablet_stats; // tablet_id -> stats diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp index 94baa217024b58..218d2c0686dc76 100644 --- a/cloud/src/meta-service/txn_lazy_committer.cpp +++ b/cloud/src/meta-service/txn_lazy_committer.cpp @@ -21,6 +21,7 @@ #include "common/logging.h" #include "common/util.h" +#include "cpp/sync_point.h" #include "meta-service/keys.h" #include "meta-service/meta_service_helper.h" #include "meta-service/meta_service_tablet_stats.h" @@ -189,6 +190,7 @@ void convert_tmp_rowsets( if (code != MetaServiceCode::OK) return; } + TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit", &code); err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -453,7 +455,8 @@ std::pair TxnLazyCommitTask::wait() { sw.pause(); if (sw.elapsed_us() > 1000000) { LOG(INFO) << "txn_lazy_commit task wait more than 1000ms, cost=" << sw.elapsed_us() / 1000 - << " ms"; + << " ms" + << " txn_id=" << txn_id_; } return std::make_pair(this->code_, this->msg_); } diff --git a/common/cpp/sync_point.h b/common/cpp/sync_point.h index f26e64fe7c3575..0378918f62753e 100644 --- a/common/cpp/sync_point.h +++ b/common/cpp/sync_point.h @@ -205,7 +205,7 @@ auto try_any_cast_ret(std::vector& any) { // TEST_SYNC_POINT is no op in release build. // Turn on this feature by defining the macro -#ifndef BE_TEST +#if !defined(BE_TEST) && !defined(ENABLE_INJECTION_POINT) # define TEST_SYNC_POINT(x) # define TEST_IDX_SYNC_POINT(x, index) # define TEST_SYNC_POINT_CALLBACK(x, ...) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index b2a9751394f2d8..a075680e47643b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -253,19 +253,15 @@ public static List getSnapshotVisibleVersion(List dbIds, List LOG.debug("get version from meta service, partitions: {}, versions: {}", partitionIds, versions); } - if (isEmptyPartitionPruneDisabled()) { - ArrayList news = new ArrayList<>(); - for (Long v : versions) { - news.add(v == -1 ? 1 : v); - } - return news; - } - if (versionUpdateTimesMs != null) { versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList()); } - return versions; + ArrayList news = new ArrayList<>(); + for (Long v : versions) { + news.add(v == -1 ? Partition.PARTITION_INIT_VERSION : v); + } + return news; } @Override