Skip to content

Commit

Permalink
[feat](cloud) Add injection point http api for ms
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei committed Oct 21, 2024
1 parent f65223f commit 85c351b
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 13 deletions.
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ if [[ "${BUILD_CLOUD}" -eq 1 ]]; then
-DCMAKE_MAKE_PROGRAM="${MAKE_PROGRAM}" \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \
-DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \
-DMAKE_TEST=OFF \
"${CMAKE_USE_CCACHE}" \
-DUSE_LIBCPP="${USE_LIBCPP}" \
Expand Down
4 changes: 4 additions & 0 deletions cloud/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ if (${MAKE_TEST} STREQUAL "ON")
add_definitions(-DBE_TEST)
endif ()

# When the ENABLE_INJECTION_POINT CMake variable evaluates to true, define the
# ENABLE_INJECTION_POINT preprocessor macro for the targets configured below.
if (ENABLE_INJECTION_POINT)
add_definitions(-DENABLE_INJECTION_POINT)
endif()

# Add libs if needed, download to current dir -- ${BUILD_DIR}
set(FDB_LIB "fdb_lib_7_1_23.tar.xz")
file(GLOB RELEASE_FILE_LIST LIST_DIRECTORIES false "/etc/*release*")
Expand Down
1 change: 1 addition & 0 deletions cloud/src/meta-service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ add_library(MetaService
meta_server.cpp
meta_service.cpp
meta_service_http.cpp
injection_point_http.cpp
meta_service_job.cpp
meta_service_resource.cpp
meta_service_schema.cpp
Expand Down
226 changes: 226 additions & 0 deletions cloud/src/meta-service/injection_point_http.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>

#include <any>
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <map>
#include <mutex>
#include <random>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "meta_service.h"
#include "meta_service_http.h"

namespace doris::cloud {

// Named injection suites: suite name -> function that installs the suite's callbacks.
// Filled in by register_suites() and looked up by handle_apply_suite().
std::map<std::string, std::function<void()>> suite_map;
// Ensures register_suites() runs only once; used with std::call_once in handle_apply_suite().
std::once_flag register_suites_once;

// Builds a std::default_random_engine seeded from the current steady_clock tick count
// (truncated to 32 bits), so successive calls generally start from different seeds.
inline std::default_random_engine make_random_engine() {
    const auto ticks = std::chrono::steady_clock::now().time_since_epoch().count();
    const auto seed = static_cast<uint32_t>(ticks);
    return std::default_random_engine(seed);
}

static void register_suites() {
suite_map.emplace("test_txn_lazy_commit", [] {
auto sp = SyncPoint::get_instance();

sp->set_call_back("commit_txn_immediately::advance_last_pending_txn_id", [&](auto&& args) {
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(100, 1000);
uint32_t duration_ms = u(rng);
LOG(INFO) << "commit_txn_immediately::advance_last_pending_txn_id sleep " << duration_ms
<< " ms";
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
});

sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) {
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(100, 1000);
uint32_t duration_ms = u(rng);
LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_submit sleep " << duration_ms
<< " ms";
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
});

sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) {
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(100, 1000);
uint32_t duration_ms = u(rng);
LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_wait sleep " << duration_ms
<< " ms";
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
});

sp->set_call_back("convert_tmp_rowsets::before_commit", [&](auto&& args) {
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(1, 50);
uint32_t duration_ms = u(rng);
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
LOG(INFO) << "convert_tmp_rowsets::before_commit sleep " << duration_ms << " ms";
if (duration_ms <= 25) {
MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[0]);
*code = MetaServiceCode::KV_TXN_CONFLICT;
bool* pred = try_any_cast<bool*>(args.back());
*pred = true;
LOG(INFO) << "convert_tmp_rowsets::before_commit random_value=" << duration_ms
<< " inject kv txn conflict";
}
});
});
}

// Installs a callback on injection point `point` that sleeps for a fixed time on every hit.
//
// The sleep time, in milliseconds, is read from the `duration` query parameter of `uri`.
// The whole parameter must parse as a non-negative integer; an empty, non-numeric,
// out-of-range or negative value, or one with trailing characters, is rejected with
// INVALID_ARGUMENT and no callback is installed.
//
// Returns an OK reply once the callback has been set.
HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) {
    std::string duration_str(http_query(uri, "duration"));
    int64_t duration = 0;
    bool valid = false;
    try {
        size_t parsed_len = 0;
        duration = std::stol(duration_str, &parsed_len);
        // std::stol stops at the first character it cannot use, so without this check
        // "10abc" would be silently accepted as 10.
        valid = parsed_len == duration_str.size() && duration >= 0;
    } catch (const std::exception&) {
        // std::invalid_argument (no digits) or std::out_of_range; reported below.
    }
    if (!valid) {
        auto msg = fmt::format("invalid duration:{}", duration_str);
        LOG(WARNING) << msg;
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
    }

    auto sp = SyncPoint::get_instance();
    // Capture by value: the callback runs later, when the injection point is hit.
    sp->set_call_back(point, [point, duration](auto&& args) {
        LOG(INFO) << "injection point hit, point=" << point << " sleep ms=" << duration;
        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
    });
    return http_json_reply(MetaServiceCode::OK, "OK");
}

// Installs a callback on injection point `point` that sets the bool pointed to by the last
// sync-point argument to true. A std::bad_any_cast raised while extracting that argument is
// logged and swallowed. `uri` is not consulted. Always replies OK.
HttpResponse set_return(const std::string& point, const brpc::URI& uri) {
    auto on_hit = [point](auto&& args) {
        try {
            LOG(INFO) << "injection point hit, point=" << point << " return void";
            bool* should_return = try_any_cast<bool*>(args.back());
            *should_return = true;
        } catch (const std::bad_any_cast& err) {
            LOG(ERROR) << "failed to process `return` e:" << err.what();
        }
    };
    SyncPoint::get_instance()->set_call_back(point, on_hit);

    return http_json_reply(MetaServiceCode::OK, "OK");
}

// Handles `op=set`: reads the `name` and `behavior` query parameters and forwards to
// set_sleep() or set_return(). Replies INVALID_ARGUMENT when either parameter is empty or the
// behavior is neither "sleep" nor "return".
HttpResponse handle_set(const brpc::URI& uri) {
    const std::string point_name(http_query(uri, "name"));
    if (point_name.empty()) {
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty point name");
    }

    const std::string action(http_query(uri, "behavior"));
    if (action.empty()) {
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty behavior");
    }

    if (action == "sleep") {
        return set_sleep(point_name, uri);
    }
    if (action == "return") {
        return set_return(point_name, uri);
    }
    return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown behavior: " + action);
}

// Handles `op=clear`: removes the callback of the injection point given by the `name` query
// parameter, or every callback when `name` is empty. Always replies OK.
HttpResponse handle_clear(const brpc::URI& uri) {
    const std::string target(http_query(uri, "name"));
    const bool clear_everything = target.empty();
    auto* sync_point = SyncPoint::get_instance();
    LOG(INFO) << "clear injection point : " << (clear_everything ? "(all points)" : target);
    if (clear_everything) {
        // An empty point name means "clear all".
        sync_point->clear_all_call_backs();
    } else {
        sync_point->clear_call_back(target);
    }
    return http_json_reply(MetaServiceCode::OK, "OK");
}

// Handles `op=apply_suite`: looks up the suite named by the `name` query parameter in
// `suite_map` (registering the built-in suites on first use) and runs it to install its
// callbacks. Replies INVALID_ARGUMENT for an empty or unknown suite name.
HttpResponse handle_apply_suite(const brpc::URI& uri) {
    const std::string suite_name(http_query(uri, "name"));
    if (suite_name.empty()) {
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty suite name");
    }

    std::call_once(register_suites_once, register_suites);

    const auto entry = suite_map.find(suite_name);
    if (entry == suite_map.end()) {
        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
                               "unknown suite: " + suite_name + "\n");
    }

    entry->second(); // set injection callbacks
    return http_json_reply(MetaServiceCode::OK, "OK apply suite " + suite_name + "\n");
}

// Handles `op=enable`: calls enable_processing() on the SyncPoint singleton and replies OK.
// `uri` is not consulted.
HttpResponse handle_enable(const brpc::URI& uri) {
    auto* sync_point = SyncPoint::get_instance();
    sync_point->enable_processing();
    return http_json_reply(MetaServiceCode::OK, "OK");
}

// Handles `op=disable`: calls disable_processing() on the SyncPoint singleton and replies OK.
// `uri` is not consulted.
HttpResponse handle_disable(const brpc::URI& uri) {
    auto* sync_point = SyncPoint::get_instance();
    sync_point->disable_processing();
    return http_json_reply(MetaServiceCode::OK, "OK");
}

//
// enable/disable injection point
// ```
// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=enable"
// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=disable"
// ```

// clear all injection points
// ```
// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=clear"
// ```

// apply/activate specific suite with registered action, see `register_suites()` for more details
// ```
// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=apply_suite&name=${suite_name}"
// ```

// set the behavior of a single injection point (`sleep` or `return`)
// ```
// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
// &name=${injection_point_name}&behavior=sleep&duration=${x_millisec}" # sleep x milliseconds

// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
// &name=${injection_point_name}&behavior=return" # return void
// ```

// HTTP entry point for the injection-point API. Reads the `op` query parameter from the
// request URI and dispatches to the matching handle_* function; an unrecognized (or missing)
// op yields INVALID_ARGUMENT. `service` is not used.
HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) {
    using OpHandler = HttpResponse (*)(const brpc::URI&);
    static const std::map<std::string, OpHandler> op_handlers = {
            {"set", handle_set},
            {"clear", handle_clear},
            {"apply_suite", handle_apply_suite},
            {"enable", handle_enable},
            {"disable", handle_disable},
    };

    auto& uri = ctrl->http_request().uri();
    LOG(INFO) << "handle InjectionPointAction uri:" << uri;
    const std::string op(http_query(uri, "op"));

    if (const auto found = op_handlers.find(op); found != op_handlers.end()) {
        return found->second(uri);
    }
    return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" + op);
}
} // namespace doris::cloud
2 changes: 1 addition & 1 deletion cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "advance_last_txn failed last_txn="
<< version_pb.pending_txn_ids(0) << " code=" << code
<< "msg=" << msg;
<< " msg=" << msg;
return;
}
continue;
Expand Down
4 changes: 4 additions & 0 deletions cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ static std::string_view remove_version_prefix(std::string_view path) {
return path;
}

HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl);

static HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) {
static std::unordered_map<std::string_view, AlterClusterRequest::Operation> operations {
{"add_cluster", AlterClusterRequest::ADD_CLUSTER},
Expand Down Expand Up @@ -575,11 +577,13 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"get_value", process_get_value},
{"show_meta_ranges", process_show_meta_ranges},
{"txn_lazy_commit", process_txn_lazy_commit},
{"injection_point", process_injection_point},
{"v1/decode_key", process_decode_key},
{"v1/encode_key", process_encode_key},
{"v1/get_value", process_get_value},
{"v1/show_meta_ranges", process_show_meta_ranges},
{"v1/txn_lazy_commit", process_txn_lazy_commit},
{"v1/injection_point", process_injection_point},
// for get
{"get_instance", process_get_instance_info},
{"get_obj_store_info", process_get_obj_store_info},
Expand Down
2 changes: 1 addition & 1 deletion cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1893,7 +1893,7 @@ void commit_txn_eventually(
std::pair<MetaServiceCode, std::string> ret = task->wait();
if (ret.first != MetaServiceCode::OK) {
LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " code=" << ret.first
<< "msg=" << ret.second;
<< " msg=" << ret.second;
}

std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
Expand Down
5 changes: 4 additions & 1 deletion cloud/src/meta-service/txn_lazy_committer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
Expand Down Expand Up @@ -189,6 +190,7 @@ void convert_tmp_rowsets(
if (code != MetaServiceCode::OK) return;
}

TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit", &code);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
Expand Down Expand Up @@ -453,7 +455,8 @@ std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::wait() {
sw.pause();
if (sw.elapsed_us() > 1000000) {
LOG(INFO) << "txn_lazy_commit task wait more than 1000ms, cost=" << sw.elapsed_us() / 1000
<< " ms";
<< " ms"
<< " txn_id=" << txn_id_;
}
return std::make_pair(this->code_, this->msg_);
}
Expand Down
2 changes: 1 addition & 1 deletion common/cpp/sync_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) {

// TEST_SYNC_POINT is no op in release build.
// Turn on this feature by defining the macro
#ifndef BE_TEST
#if !defined(BE_TEST) && !defined(ENABLE_INJECTION_POINT)
# define TEST_SYNC_POINT(x)
# define TEST_IDX_SYNC_POINT(x, index)
# define TEST_SYNC_POINT_CALLBACK(x, ...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,19 +253,15 @@ public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, List<Long>
LOG.debug("get version from meta service, partitions: {}, versions: {}", partitionIds, versions);
}

if (isEmptyPartitionPruneDisabled()) {
ArrayList<Long> news = new ArrayList<>();
for (Long v : versions) {
news.add(v == -1 ? 1 : v);
}
return news;
}

if (versionUpdateTimesMs != null) {
versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList());
}

return versions;
ArrayList<Long> news = new ArrayList<>();
for (Long v : versions) {
news.add(v == -1 ? Partition.PARTITION_INIT_VERSION : v);
}
return news;
}

@Override
Expand Down

0 comments on commit 85c351b

Please sign in to comment.