Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve memtracker, add missed check & remove unnecessary thenError&tryCatch check #5199

Merged
merged 13 commits into from
Jan 6, 2023
1 change: 1 addition & 0 deletions cmake/nebula/SanitizerConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ if(ENABLE_ASAN)
add_compile_options(-fsanitize=address)
add_compile_options(-g)
add_compile_options(-fno-omit-frame-pointer)
add_definitions(-DENABLE_ASAN)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
endif()

Expand Down
50 changes: 29 additions & 21 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,21 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
return folly::collectAll(respFutures)
.deferValue([this, requests = std::move(requests), totalLatencies, hosts](
std::vector<folly::Try<StatusOr<Response>>>&& resps) {
// throw in MemoryCheckGuard verified
memory::MemoryCheckGuard guard;
StorageRpcResponse<Response> rpcResp(resps.size());
for (size_t i = 0; i < resps.size(); i++) {
auto& host = hosts->at(i);
auto& tryResp = resps[i];
std::optional<std::string> errMsg;
folly::Try<StatusOr<Response>>& tryResp = resps[i];
if (tryResp.hasException()) {
errMsg = std::string(tryResp.exception().what().c_str());
std::string errMsg = tryResp.exception().what().toStdString();
rpcResp.markFailure();
LOG(ERROR) << "There some RPC errors: " << errMsg;
auto req = requests.at(host);
auto parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
} else {
auto status = std::move(tryResp).value();
StatusOr<Response> status = std::move(tryResp).value();
if (status.ok()) {
auto resp = std::move(status).value();
auto result = resp.get_result();
Expand All @@ -128,17 +133,18 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
// Keep the response
rpcResp.addResponse(std::move(resp));
} else {
errMsg = std::move(status).status().message();
rpcResp.markFailure();
Status s = std::move(status).status();
nebula::cpp2::ErrorCode errorCode =
s.code() == Status::Code::kGraphMemoryExceeded
? nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED
: nebula::cpp2::ErrorCode::E_RPC_FAILURE;
LOG(ERROR) << "There some RPC errors: " << s.message();
auto req = requests.at(host);
auto parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, errorCode);
}
}

if (errMsg) {
rpcResp.markFailure();
LOG(ERROR) << "There some RPC errors: " << errMsg.value();
auto req = requests.at(host);
auto parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
}
}

return rpcResp;
Expand All @@ -160,12 +166,16 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerTyp
auto spaceId = request.get_space_id();
return folly::via(evb)
.thenValue([remoteFunc = std::move(remoteFunc), request, evb, host, this](auto&&) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
// NOTE: Create new channel on each thread to avoid TIMEOUT RPC error
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
// Encoding invoke Cpp2Ops::write the request to protocol is in current thread,
// do not need to turn on in Cpp2Ops::write
return remoteFunc(client.get(), request);
})
.thenValue([spaceId, this](Response&& resp) mutable -> StatusOr<Response> {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
auto& result = resp.get_result();
for (auto& part : result.get_failed_parts()) {
Expand Down Expand Up @@ -196,14 +206,12 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerTyp
}
return std::move(resp);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<StatusOr<Response>>(std::bad_alloc());
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception& e) {
return folly::makeFuture<StatusOr<Response>>(std::runtime_error(e.what()));
})
.thenError(
folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<StatusOr<Response>>(Status::GraphMemoryExceeded(
"(%d)", static_cast<int32_t>(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED)));
})
.thenError([request, host, spaceId, this](
folly::exception_wrapper&& exWrapper) mutable -> StatusOr<Response> {
stats::StatsManager::addValue(kNumRpcSentToStoragedFailed);
Expand Down
2 changes: 2 additions & 0 deletions src/codec/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ set(CODEC_TEST_LIBS
$<TARGET_OBJECTS:file_based_cluster_id_man_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:stats_obj>
Expand Down
4 changes: 4 additions & 0 deletions src/common/base/Status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ const char *Status::toString(Code code) {
return "StatementEmpty: ";
case kSemanticError:
return "SemanticError: ";
case kGraphMemoryExceeded:
return "GraphMemoryExceeded: ";
case kKeyNotFound:
return "KeyNotFound: ";
case kPartialSuccess:
return "PartialSuccess: ";
case kStorageMemoryExceeded:
return "StorageMemoryExceeded: ";
case kSpaceNotFound:
return "SpaceNotFound: ";
case kHostNotFound:
Expand Down
5 changes: 5 additions & 0 deletions src/common/base/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,15 @@ class Status final {
// Graph engine errors
STATUS_GENERATOR(SyntaxError);
STATUS_GENERATOR(SemanticError);
STATUS_GENERATOR(GraphMemoryExceeded);

// Nothing is executed When command is comment
STATUS_GENERATOR(StatementEmpty);

// Storage engine errors
STATUS_GENERATOR(KeyNotFound);
STATUS_GENERATOR(PartialSuccess);
STATUS_GENERATOR(StorageMemoryExceeded);

// Meta engine errors
// TODO(dangleptr) we could use ErrorOr to replace SpaceNotFound here.
Expand Down Expand Up @@ -166,9 +169,11 @@ class Status final {
kSyntaxError = 201,
kStatementEmpty = 202,
kSemanticError = 203,
kGraphMemoryExceeded = 204,
// 3xx, for storage engine errors
kKeyNotFound = 301,
kPartialSuccess = 302,
kStorageMemoryExceeded = 303,
// 4xx, for meta service errors
kSpaceNotFound = 404,
kHostNotFound = 405,
Expand Down
2 changes: 2 additions & 0 deletions src/common/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ nebula_add_executable(
$<TARGET_OBJECTS:expression_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:ast_match_path_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand Down
13 changes: 13 additions & 0 deletions src/common/datatypes/DataSetOps-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "common/base/Base.h"
#include "common/datatypes/CommonCpp2Ops.h"
#include "common/datatypes/DataSet.h"
#include "common/memory/MemoryTracker.h"

namespace apache {
namespace thrift {
Expand Down Expand Up @@ -47,7 +48,10 @@ inline constexpr protocol::TType Cpp2Ops<nebula::DataSet>::thriftType() {

template <class Protocol>
uint32_t Cpp2Ops<nebula::DataSet>::write(Protocol* proto, nebula::DataSet const* obj) {
// we do not turn on memory tracker here, when the DataSet object is creating & inserting, it is
// in Processor::process(), where memory tracker is turned on. so we think that is enough.
uint32_t xfer = 0;

xfer += proto->writeStructBegin("DataSet");

xfer += proto->writeFieldBegin("column_names", protocol::T_LIST, 1);
Expand All @@ -62,11 +66,20 @@ uint32_t Cpp2Ops<nebula::DataSet>::write(Protocol* proto, nebula::DataSet const*

xfer += proto->writeFieldStop();
xfer += proto->writeStructEnd();

return xfer;
}

template <class Protocol>
void Cpp2Ops<nebula::DataSet>::read(Protocol* proto, nebula::DataSet* obj) {
// memory usage during decode a StorageResponse should be mostly occupied
// by DataSet (see interface/storage.thrift), turn on memory check here.
//
// MemoryTrackerVerified:
// throw std::bad_alloc has verified, can be captured in
// StorageClientBase::getResponse's onError
nebula::memory::MemoryCheckGuard guard;

apache::thrift::detail::ProtocolReaderStructReadState<Protocol> readState;

readState.readStructBegin(proto);
Expand Down
1 change: 1 addition & 0 deletions src/common/datatypes/ValueOps-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ template <class Protocol>
uint32_t Cpp2Ops<nebula::Value>::write(Protocol* proto, nebula::Value const* obj) {
uint32_t xfer = 0;
xfer += proto->writeStructBegin("Value");
// MemoryTrackerVerified: throw bad_alloc verified

switch (obj->type()) {
case nebula::Value::Type::NULLVALUE: {
Expand Down
2 changes: 2 additions & 0 deletions src/common/datatypes/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ nebula_add_test(
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand Down Expand Up @@ -115,6 +116,7 @@ nebula_add_test(
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
LIBRARIES
Expand Down
2 changes: 2 additions & 0 deletions src/common/expression/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:expr_ctx_mock_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand All @@ -153,6 +154,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand Down
3 changes: 3 additions & 0 deletions src/common/geo/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ nebula_add_test(
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:datatypes_obj>
$<TARGET_OBJECTS:geo_index_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
Expand Down
3 changes: 3 additions & 0 deletions src/common/graph/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ nebula_add_test(
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:datatypes_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
Expand Down
2 changes: 2 additions & 0 deletions src/common/id/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
Expand Down Expand Up @@ -67,6 +68,7 @@ nebula_add_test(
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
Expand Down
4 changes: 4 additions & 0 deletions src/common/memory/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ void MemoryTracker::free(int64_t size) {
MemoryStats::instance().free(size);
}

bool MemoryTracker::isOn() {
return MemoryStats::instance().throwOnMemoryExceeded();
}

void MemoryTracker::allocImpl(int64_t size, bool) {
MemoryStats::instance().alloc(size);
}
Expand Down
8 changes: 8 additions & 0 deletions src/common/memory/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class MemoryStats {
threadMemoryStats_.throwOnMemoryExceeded = false;
}

// return true if current thread's throwOnMemoryExceeded'
static bool throwOnMemoryExceeded() {
return threadMemoryStats_.throwOnMemoryExceeded;
}

private:
inline ALWAYS_INLINE void allocGlobal(int64_t size) {
int64_t willBe = size + used_.fetch_add(size, std::memory_order_relaxed);
Expand Down Expand Up @@ -182,6 +187,9 @@ struct MemoryTracker {
/// This function should be called after memory deallocation.
static void free(int64_t size);

/// Test state of memory tracker, return true if memory tracker is turned on, otherwise false.
static bool isOn();

private:
static void allocImpl(int64_t size, bool throw_if_memory_exceeded);
};
Expand Down
16 changes: 5 additions & 11 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
}

// MemoryStats depends on jemalloc
#if ENABLE_JEMALLOC
#ifdef ENABLE_JEMALLOC
#ifndef ENABLE_ASAN
// set MemoryStats limit (MemoryTracker track-able memory)
int64_t trackable = total - FLAGS_memory_tracker_untracked_reserved_memory_mb * MiB;
if (trackable > 0) {
Expand All @@ -134,16 +135,9 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
if (FLAGS_memory_purge_enabled) {
int64_t now = time::WallClock::fastNowInSec();
if (now - kLastPurge_ > FLAGS_memory_purge_interval_seconds) {
// mallctl seems has issue with address_sanitizer, do purge only when address_sanitizer is off
#if defined(__clang__)
#if defined(__has_feature)
#if not __has_feature(address_sanitizer)
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
#endif
#else // gcc
// jemalloc seems has issue with address_sanitizer, do purge only when address_sanitizer is
// off
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
kLastPurge_ = now;
}
}
Expand Down Expand Up @@ -173,7 +167,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
kLastPrintMemoryTrackerStats_ = now;
}
}

#endif
#endif

auto hits = (1 - available / total) > FLAGS_system_memory_high_watermark_ratio;
Expand Down
13 changes: 3 additions & 10 deletions src/common/memory/NewDelete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,16 @@
/// Two condition need check before MemoryTracker is on
/// 1. jemalloc is used
/// MemoryTracker need jemalloc API to get accurate size of alloc/free memory.
#if ENABLE_JEMALLOC
/// 2. address_sanitizer is off
/// sanitizer has already override the new/delete operator,
/// only override new/delete operator only when address_sanitizer is off
#if defined(__clang__)
#if defined(__has_feature)
#if not __has_feature(address_sanitizer)
#ifdef ENABLE_JEMALLOC
#ifndef ENABLE_ASAN
#define ENABLE_MEMORY_TRACKER
#endif
#endif

#else // gcc
#define ENABLE_MEMORY_TRACKER
#endif
#endif

#if defined(ENABLE_MEMORY_TRACKER)
#ifdef ENABLE_MEMORY_TRACKER
/// new
void *operator new(std::size_t size) {
nebula::memory::trackMemory(size);
Expand Down
Loading