[#19143] Docdb/ActiveHistory: Instrument inbound calls for reads and write rpcs.

Summary:
Each inbound_call will maintain a WaitStateInfo which gets
updated at various points during execution, whenever we
expect that the rpc may end up waiting/stalling for a long
time.
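
As a rough illustration of that lifecycle (a sketch, not code from this diff; `HandleInboundCall` is a hypothetical stand-in for the rpc-layer wiring), the handler thread adopts the call's wait state and marks the call as running on CPU:

```cpp
#include "yb/ash/wait_state.h"

// Hypothetical handler sketch: adopt the inbound call's wait state for the
// duration of this thread's work on the call.
void HandleInboundCall(const yb::ash::WaitStateInfoPtr& call_wait_state) {
  // Makes call_wait_state the thread-local current wait state; the previous
  // state is restored when the scoped object goes out of scope.
  ADOPT_WAIT_STATE(call_wait_state);
  SET_WAIT_STATUS(OnCpu_Active);  // expands to set_code(WaitStateCode::kOnCpu_Active)
  // ... execute the read/write rpc; nested SCOPED_WAIT_STATUS scopes update the code ...
}
```

SET_WAIT_STATUS is a no-op when no wait state has been adopted, so un-instrumented paths are unaffected.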

Specifically, in this diff, we annotate the waits incurred
while waiting on a lock/condition variable or while doing I/O.
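
For example, a minimal sketch of the annotation pattern (the function and mutex below are hypothetical; real call sites in this diff include Log::DoSync and ConsensusMetadata::Flush):

```cpp
#include <mutex>

#include "yb/ash/wait_state.h"

std::mutex wal_mutex;  // hypothetical lock guarding the active WAL segment

void AppendToWal() {
  // While this scope is alive, the thread's adopted wait state (if any)
  // reports kWAL_Write; the previous code is restored when the scope exits.
  SCOPED_WAIT_STATUS(WAL_Write);
  std::lock_guard<std::mutex> lock(wal_mutex);  // time blocked on the lock is attributed too
  // ... perform the actual write I/O ...
}
```

Because the scope reverts the code on exit, these annotations can nest without clobbering each other.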

Sysbench read/write test runs show no performance impact,
with and without ASH enabled:
#19143 (comment)

**Upgrade/Rollback safety:**

Should be OK to roll back. The functionality to see the local calls will be unavailable
if rolled back; there should be no other effect on other features.

Test Plan: ybd --cxx-test 'TEST_F(PgMiniAsh, Ash) {'

Reviewers: arybochkin, esheng

Reviewed By: arybochkin, esheng

Subscribers: yql, rthallam, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D30484
amitanandaiyer committed Dec 20, 2023
1 parent dc3b1a6 commit ce31f43
Showing 53 changed files with 480 additions and 47 deletions.
31 changes: 18 additions & 13 deletions src/yb/ash/wait_state.cc
@@ -15,15 +15,21 @@

#include <arpa/inet.h>

#include "yb/util/debug-util.h"
#include "yb/util/tostring.h"
#include "yb/util/trace.h"

DEFINE_test_flag(bool, yb_enable_ash, false, "True to enable Active Session History");
DEFINE_test_flag(bool, export_wait_state_names, yb::IsDebug(), "Exports wait-state name as a "
"human understandable string.");


namespace yb::ash {

namespace {
// The current wait_state_ for this thread.
thread_local WaitStateInfoPtr threadlocal_wait_state_;
}
} // namespace

void AshMetadata::set_client_host_port(const HostPort &host_port) {
client_host_port = host_port;
@@ -115,15 +121,15 @@ void WaitStateInfo::SetCurrentWaitState(WaitStateInfoPtr wait_state) {
threadlocal_wait_state_ = std::move(wait_state);
}

WaitStateInfoPtr WaitStateInfo::CurrentWaitState() {
const WaitStateInfoPtr& WaitStateInfo::CurrentWaitState() {
if (!threadlocal_wait_state_) {
VLOG_WITH_FUNC(3) << " returning nullptr";
}
return threadlocal_wait_state_;
}

//
// ScopedWaitState
// ScopedAdoptWaitState
//
ScopedAdoptWaitState::ScopedAdoptWaitState(WaitStateInfoPtr wait_state)
: prev_state_(WaitStateInfo::CurrentWaitState()) {
@@ -137,24 +143,23 @@ ScopedAdoptWaitState::~ScopedAdoptWaitState() {
//
// ScopedWaitStatus
//
ScopedWaitStatus::ScopedWaitStatus(WaitStateInfoPtr wait_state, WaitStateCode code)
: wait_state_(std::move(wait_state)), code_(code) {
if (wait_state_) {
prev_code_ = wait_state_->code();
wait_state_->set_code(code_);
}
ScopedWaitStatus::ScopedWaitStatus(WaitStateCode code)
: code_(code),
prev_code_(
WaitStateInfo::CurrentWaitState()
? WaitStateInfo::CurrentWaitState()->mutable_code().exchange(code_)
: code_) {
}

ScopedWaitStatus::~ScopedWaitStatus() {
if (wait_state_) {
const auto &wait_state = WaitStateInfo::CurrentWaitState();
if (wait_state) {
auto expected = code_;
if (!wait_state_->mutable_code().compare_exchange_strong(expected, prev_code_)) {
if (!wait_state->mutable_code().compare_exchange_strong(expected, prev_code_)) {
VLOG(3) << __func__ << " not reverting to prev_code_: " << prev_code_ << " since "
<< " current_code: " << expected << " is not " << code_;
}
}
wait_state_ = nullptr;
prev_code_ = WaitStateCode::Unused;
}

} // namespace yb::ash
83 changes: 61 additions & 22 deletions src/yb/ash/wait_state.h
@@ -25,18 +25,18 @@
#include "yb/util/net/net_util.h"
#include "yb/util/uuid.h"

#define SET_WAIT_STATUS_TO(ptr, state) \
if ((ptr)) (ptr)->set_state(state)
#define SET_WAIT_STATUS(state) \
SET_WAIT_STATUS_TO(yb::ash::WaitStateInfo::CurrentWaitState(), (state))
DECLARE_bool(TEST_export_wait_state_names);

#define SET_WAIT_STATUS_TO(ptr, code) \
if ((ptr)) (ptr)->set_code(BOOST_PP_CAT(yb::ash::WaitStateCode::k, code))
#define SET_WAIT_STATUS(code) \
SET_WAIT_STATUS_TO(yb::ash::WaitStateInfo::CurrentWaitState(), code)

#define ADOPT_WAIT_STATE(ptr) \
yb::ash::ScopedAdoptWaitState _scoped_state { (ptr) }

#define SCOPED_WAIT_STATUS_FOR(ptr, state) \
yb::ash::ScopedWaitStatus _scoped_status { (ptr), (state) }
#define SCOPED_WAIT_STATUS(state) \
SCOPED_WAIT_STATUS_FOR(yb::ash::WaitStateInfo::CurrentWaitState(), (state))
#define SCOPED_WAIT_STATUS(code) \
yb::ash::ScopedWaitStatus _scoped_status(BOOST_PP_CAT(yb::ash::WaitStateCode::k, code))

// Wait components refer to which process the specific wait-event is part of.
// Generally, these are PG, TServer, YBClient/Perform layer, and PgGate.
@@ -75,6 +75,7 @@
#define YB_ASH_CLASS_CONSENSUS YB_ASH_MAKE_CLASS(YB_ASH_COMPONENT_TSERVER, 0xDU)
#define YB_ASH_CLASS_TABLET_WAIT YB_ASH_MAKE_CLASS(YB_ASH_COMPONENT_TSERVER, 0xCU)
#define YB_ASH_CLASS_ROCKSDB YB_ASH_MAKE_CLASS(YB_ASH_COMPONENT_TSERVER, 0xBU)
#define YB_ASH_CLASS_COMMON YB_ASH_MAKE_CLASS(YB_ASH_COMPONENT_TSERVER, 0xAU)

#define YB_ASH_CLASS_PG_CLIENT_SERVICE YB_ASH_MAKE_CLASS(YB_ASH_COMPONENT_YBC, 0xFU)
#define YB_ASH_CLASS_CQL_WAIT_STATE YB_ASH_MAKE_CLASS(YB_ASH_COMPONENT_YBC, 0xEU)
@@ -85,14 +86,47 @@
namespace yb::ash {

YB_DEFINE_TYPED_ENUM(WaitStateCode, uint32_t,
((Unused, 0)));
((kUnused, 0))
((kOnCpu_Active, YB_ASH_CLASS_COMMON))
(kOnCpu_Passive)
(kRpc_Done)
(kRpcs_WaitOnMutexInShutdown)
(kRetryableRequests_SaveToDisk)
((kMVCC_WaitForSafeTime, YB_ASH_CLASS_TABLET_WAIT))
(kLockedBatchEntry_Lock)
(kBackfillIndex_WaitForAFreeSlot)
(kCreatingNewTablet)
(kSaveRaftGroupMetadataToDisk)
(kTransactionStatusCache_DoGetCommitData)
(kWaitForYsqlBackendsCatalogVersion)
(kWriteAutoFlagsConfigToDisk)
(kWriteInstanceMetadataToDisk)
(kWriteSysCatalogSnapshotToDisk)
(kDumpRunningRpc_WaitOnReactor)
(kConflictResolution_ResolveConficts)
(kConflictResolution_WaitOnConflictingTxns)
((kWAL_Open, YB_ASH_CLASS_CONSENSUS)) // waiting for WALEdits to be persisted.
(kWAL_Close)
(kWAL_Write)
(kWAL_AllocateNewSegment)
(kWAL_Sync)
(kWAL_Wait)
(kWaitOnWAL)
(kRaft_WaitingForQuorum)
(kRaft_ApplyingEdits)
(kConsensusMeta_Flush)
(kReplicaState_TakeUpdateLock)
(kReplicaState_WaitForMajorityReplicatedHtLeaseExpiration)
((kRocksDB_OnCpu_Active, YB_ASH_CLASS_ROCKSDB))
(kRocksDB_ReadBlockFromFile)
(kRocksDB_ReadIO));

struct AshMetadata {
Uuid root_request_id = Uuid::Nil();
Uuid yql_endpoint_tserver_uuid = Uuid::Nil();
int64_t query_id = 0;
int64_t rpc_request_id = 0;
HostPort client_host_port;
HostPort client_host_port{};

void set_client_host_port(const HostPort& host_port);

@@ -174,9 +208,9 @@ struct AshMetadata {
};

struct AshAuxInfo {
TableId table_id;
TabletId tablet_id;
std::string method;
TableId table_id = "";
TabletId tablet_id = "";
std::string method = "";

std::string ToString() const;

@@ -214,36 +248,42 @@ class WaitStateInfo {
void set_rpc_request_id(int64_t id) EXCLUDES(mutex_);
void set_client_host_port(const HostPort& host_port) EXCLUDES(mutex_);

static WaitStateInfoPtr CurrentWaitState();
static const WaitStateInfoPtr& CurrentWaitState();
static void SetCurrentWaitState(WaitStateInfoPtr);

void UpdateMetadata(const AshMetadata& meta) EXCLUDES(mutex_);
void UpdateAuxInfo(const AshAuxInfo& aux) EXCLUDES(mutex_);

template <class PB>
static void UpdateMetadataFromPB(const PB& pb) {
auto wait_state = CurrentWaitState();
const auto& wait_state = CurrentWaitState();
if (wait_state) {
wait_state->UpdateMetadata(AshMetadata::FromPB(pb));
}
}

template <class PB>
void MetadataToPB(PB* pb) EXCLUDES(mutex_) {
std::lock_guard lock(mutex_);
metadata_.ToPB(pb);
}

template <class PB>
void ToPB(PB* pb) EXCLUDES(mutex_) {
std::lock_guard lock(mutex_);
metadata_.ToPB(pb->mutable_metadata());
WaitStateCode code = this->code();
pb->set_wait_status_code(yb::to_underlying(code));
#ifndef NDEBUG
pb->set_wait_status_code_as_string(yb::ToString(code));
#endif
if (FLAGS_TEST_export_wait_state_names) {
pb->set_wait_status_code_as_string(yb::ToString(code));
}
aux_info_.ToPB(pb->mutable_aux_info());
}

std::string ToString() const EXCLUDES(mutex_);

private:
std::atomic<WaitStateCode> code_{WaitStateCode::Unused};
std::atomic<WaitStateCode> code_{WaitStateCode::kUnused};

mutable simple_spinlock mutex_;
AshMetadata metadata_ GUARDED_BY(mutex_);
@@ -279,13 +319,12 @@ class ScopedAdoptWaitState {
// be reverted back to the previous state.
class ScopedWaitStatus {
public:
ScopedWaitStatus(WaitStateInfoPtr wait_state, WaitStateCode code);
explicit ScopedWaitStatus(WaitStateCode code);
~ScopedWaitStatus();

private:
WaitStateInfoPtr wait_state_;
const WaitStateCode code_;
WaitStateCode prev_code_;
const WaitStateCode prev_code_;

DISALLOW_COPY_AND_ASSIGN(ScopedWaitStatus);
};
1 change: 1 addition & 0 deletions src/yb/client/CMakeLists.txt
@@ -84,6 +84,7 @@ set(CLIENT_LIBS
test_echo_service_proto
tserver_util
yb_ql_expr
yb_ash
yb_util
gutil
yrpc
5 changes: 5 additions & 0 deletions src/yb/client/async_rpc.cc
@@ -13,6 +13,8 @@

#include "yb/client/async_rpc.h"

#include "yb/ash/wait_state.h"

#include "yb/client/batcher.h"
#include "yb/client/client_error.h"
#include "yb/client/in_flight_op.h"
@@ -374,6 +376,9 @@ AsyncRpcBase<Req, Resp>::AsyncRpcBase(
: AsyncRpc(data, consistency_level) {
req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id()));
req_.set_include_trace(IsTracingEnabled());
if (const auto& wait_state = ash::WaitStateInfo::CurrentWaitState()) {
wait_state->MetadataToPB(req_.mutable_ash_metadata());
}
const ConsistentReadPoint* read_point = batcher_->read_point();
bool has_read_time = false;
if (read_point) {
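The hunk above attaches the caller's current ASH metadata to the outgoing request. A rough sketch of the receiving side (a hypothetical helper; the real wiring is in the tserver service code changed elsewhere in this commit) would copy it into the thread's adopted wait state via WaitStateInfo::UpdateMetadataFromPB:

```cpp
#include "yb/ash/wait_state.h"

// Hypothetical helper: assumes the request proto carries an ash_metadata
// submessage, as populated by AsyncRpcBase above.
template <class ReqPB>
void AdoptAshMetadataFromRequest(const ReqPB& req) {
  if (req.has_ash_metadata()) {
    yb::ash::WaitStateInfo::UpdateMetadataFromPB(req.ash_metadata());
  }
}
```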
2 changes: 1 addition & 1 deletion src/yb/consensus/CMakeLists.txt
@@ -81,7 +81,7 @@ ADD_YB_LIBRARY(consensus_proto
ADD_YB_LIBRARY(
consensus_error
SRCS consensus_error.cc
DEPS consensus_proto yb_util)
DEPS consensus_proto yb_ash yb_util)

#########################################
# log_proto
3 changes: 3 additions & 0 deletions src/yb/consensus/consensus_meta.cc
@@ -32,6 +32,8 @@

#include "yb/consensus/consensus_meta.h"

#include "yb/ash/wait_state.h"

#include "yb/common/entity_ids_types.h"
#include "yb/common/wire_protocol.h"

@@ -280,6 +282,7 @@ Status ConsensusMetadata::Flush() {
"Invalid config in ConsensusMetadata, cannot flush to disk");

string meta_file_path = VERIFY_RESULT(fs_manager_->GetConsensusMetadataPath(tablet_id_));
SCOPED_WAIT_STATUS(ConsensusMeta_Flush);
RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath(
fs_manager_->encrypted_env(), meta_file_path, pb_,
pb_util::OVERWRITE,
9 changes: 9 additions & 0 deletions src/yb/consensus/log.cc
@@ -44,6 +44,8 @@

#include <boost/algorithm/string/predicate.hpp>

#include "yb/ash/wait_state.h"

#include "yb/common/schema_pbutil.h"
#include "yb/common/schema.h"

@@ -569,6 +571,7 @@ Status Log::Open(const LogOptions &options,
const PreLogRolloverCallback& pre_log_rollover_callback,
NewSegmentAllocationCallback callback,
CreateNewSegment create_new_segment) {
SCOPED_WAIT_STATUS(WAL_Open);

RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options.env, DirName(wal_dir)),
Substitute("Failed to create table wal dir $0", DirName(wal_dir)));
@@ -731,6 +734,7 @@ Status Log::CloseCurrentSegment() {
footer_builder_.set_close_timestamp_micros(close_timestamp_micros);
Status status;
{
SCOPED_WAIT_STATUS(WAL_Close);
std::lock_guard lock(active_segment_mutex_);
status = active_segment_->WriteIndexWithFooterAndClose(log_index_.get(),
&footer_builder_);
@@ -895,6 +899,7 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, SkipWalWrite skip_wal_write) {
int64_t start_offset = active_segment_->written_offset();

LOG_SLOW_EXECUTION(WARNING, 50, "Append to log took a long time") {
SCOPED_WAIT_STATUS(WAL_Write);
SCOPED_LATENCY_METRIC(metrics_, append_latency);
LongOperationTracker long_operation_tracker(
"Log append", FLAGS_consensus_log_scoped_watch_delay_append_threshold_ms * 1ms);
@@ -1099,6 +1104,7 @@ Status Log::EnsureSegmentInitializedUnlocked() {
// might not be necessary. We only call ::DoSync directly before we call ::CloseCurrentSegment
Status Log::DoSync() {
// Acquire the lock over active_segment_ to prevent segment rollover in the interim.
SCOPED_WAIT_STATUS(WAL_Sync);
std::lock_guard lock(active_segment_mutex_);
if (active_segment_->IsClosed()) {
return Status::OK();
@@ -1801,6 +1807,7 @@ WritableFileOptions Log::GetNewSegmentWritableFileOptions() {
}

Status Log::PreAllocateNewSegment() {
SCOPED_WAIT_STATUS(WAL_AllocateNewSegment);
TRACE_EVENT1("log", "PreAllocateNewSegment", "file", next_segment_path_);
CHECK_EQ(allocation_state(), SegmentAllocationState::kAllocationInProgress);

@@ -1864,6 +1871,7 @@ Status Log::SwitchToAllocatedSegment() {
RETURN_NOT_OK(new_segment->WriteHeader(header));
// Calling Sync() here is important because it ensures the file has a complete WAL header
// on disk before renaming the file.
SCOPED_WAIT_STATUS(WAL_Sync);
RETURN_NOT_OK(new_segment->Sync());

const auto new_segment_path =
@@ -1907,6 +1915,7 @@ Status Log::ReplaceSegmentInReaderUnlocked() {
}

Status Log::ReplaceSegmentInReaderUnlocked() {
SCOPED_WAIT_STATUS(WAL_Open);
// We should never switch to a new segment if we wrote nothing to the old one.
CHECK(active_segment_->IsClosed());
shared_ptr<RandomAccessFile> readable_file;