Skip to content

Commit

Permalink
[yugabyte#16380] Use thread safety analysis annotations to enforce th…
Browse files Browse the repository at this point in the history
…e reactor thread requirement

Summary:
Some functions in Connection, Reactor, Messenger, OutboundCall, and other classes of the RPC framework have to run only on a particular reactor thread. Also, some fields of those classes are not protected by a mutex but are only accessed in the reactor thread. We can use a "thread role" and Clang thread safety analysis annotations to further document and enforce these requirements during compilation time. Functions can now be annotated with REQUIRES(ReactorThreadRole::kReactor), with a convenience macro ON_REACTOR_THREAD. Functions that should not be run on the reactor thread can be annotated with EXCLUDES(ReactorThreadRole::kReactor), or EXCLUDES_REACTOR_THREAD.

Also removing some redundant checks that the current thread is the correct reactor thread. Functions that are executed as callbacks on the reactor thread should use the following construct. This will check for the current thread (can be turned off by setting the reactor_check_current_thread flag to false), and return a "guard" object:

  auto guard = reactor->CheckCurrentThread();

The guard object, acting as a lock being held, will ensure that the compiler allows calling functions that are marked with ON_REACTOR_THREAD from that point on, and those functions do not need to check that they are running on the reactor thread.

Concurrency issues fixed:
- Access to Reactor::cur_time_ was not being synchronized. It is now an atomic.
- Connection::last_activity_time_ had unclear locking semantics. Made it an atomic.
- Connection::shutdown_status_ was being assigned while holding outbound_data_queue_lock_. However, it was being read without that mutex. Now it is fully guarded by that mutex (which was renamed to outbound_data_queue_mtx_).
- Now we are only modifying Reactor::state_ while holding pending_tasks_mtx_, to match the documented behavior, even though state_ is an atomic.
- Access to Reactor::waiting_conns_ was synchronized in an unclear way. We were erasing connections from that set in a ListenIdle callback that could be called either from the reactor thread or from some other thread after the reactor has started closing, but only in a single-threaded fashion. Instead of this, introduced a special mutex to protect that data structure and got rid of the IsCurrentThreadOrStartedClosing logic.

Some other details of the changes:
- Reorganized fields in Reactor, DelayedTask, and Connection. Clearly split fields into categories:
  - Immutable fields initialized in the constructor
  - Fields protected by a particular mutex
  - Fields only accessed on the reactor thread
  - Atomic fields
- Renamed Reactor::Shutdown to StartShutdown, because it only initiates shutdown and does not wait for it to complete.
- Reactor::StartShutdown had a compare-and-set loop, which did not make much sense, as it usually did exactly two iterations. Replaced that with a single compare-exchange-strong operation.
- Removed Reactor::stop_start_time_. It was assigned but never used.
- Got rid of the Reactor::DrainTaskQueueAndCheckIfClosing method. It was only called from one place, and it looked like a random combination of things clumped together. Simply moved this code where it was being used.
- Moved various reactor task classes to reactor_task.{h,cc}.
- Moved DelayedTask to delayed_task.{h,cc}.
- Made Connection::rpc_metrics_ a reference instead of a const pointer that can never be null anyway. Same for Reactor::messenger_.
- Now consistently handling failures to queue a reactor task by issuing a DFATAL log message.
- Simplified the method Messenger::ScheduleOnReactor. It used to take a Messenger* argument, but it is already a method of Messenger, which was confusing. That argument could be nullptr in Reactor tests. Got rid of that argument and fixed the tests.
- In Reactor::QueueOutboundCall, if we failed to schedule the call due to the reactor being in the wrong state, do not simply log a warning, but call the Transferred method on the OutboundCall with the appropriate status instead.

General improvements of the codebase:
- Removed the UNIQUE_LOCK macro and replaced it with direct use of the UniqueLock wrapper class, utilizing CTAD (class template argument deduction).

Build infrastructure improvements:
- We still have some old macOS build worker nodes with Clang 13 installed. In those cases, use custom-built Clang 14 by specifying compiler type as `clang14`.
- In activate_virtualenv, detect target architecture first so that we don't create a virtualenv for the wrong target architecture (Rosetta 2 emulated x86_64 on an arm64 Apple machine).
- Made Java build output less verbose (ignoring messages about resolving dependencies, Maven plugins, etc.)
- Updated the is_clang function to return true for `clang<version>` values of YB_COMPILER_TYPE.
- We run a "process supervisor" script to look for stray processes after test termination. Temporarily ignore process_supervisor failures if they happen on a CentOS 7 machine with an absent psutil Python module, to work around an issue in a recently updated CentOS 7 test runner VM.
- Set PYTHONPATH at the top level of the run-test.sh test runner script.

Test Plan: Jenkins: run all tests

Reviewers: timur, sergei, dmitry

Reviewed By: sergei, dmitry

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D23468
  • Loading branch information
mbautin committed Apr 3, 2023
1 parent a586836 commit 4879cfb
Show file tree
Hide file tree
Showing 61 changed files with 1,658 additions and 1,148 deletions.
10 changes: 7 additions & 3 deletions build-support/common-build-env-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ test_compiler_detection_by_jenkins_job_name() {
unset YB_COMPILER_TYPE
JOB_NAME="$jenkins_job_name"
set_compiler_type_based_on_jenkins_job_name
assert_equals "$expected_compiler_type" "$YB_COMPILER_TYPE" "compiler type"
if [[ ${YB_COMPILER_TYPE_WAS_ADJUSTED:-false} == "false" ]]; then
assert_equals "$expected_compiler_type" "$YB_COMPILER_TYPE" "compiler type"
fi
)
}

Expand Down Expand Up @@ -141,8 +143,10 @@ test_set_cmake_build_type_and_compiler_type() {
set_cmake_build_type_and_compiler_type
assert_equals "$expected_cmake_build_type" "$cmake_build_type" "$test_case_details" \
"Note: comparing CMake build type."
assert_equals "$expected_compiler_type" "$YB_COMPILER_TYPE" "$test_case_details" \
"Note: comparing compiler type."
if [[ ${YB_COMPILER_TYPE_WAS_ADJUSTED:-false} == "false" ]]; then
assert_equals "$expected_compiler_type" "$YB_COMPILER_TYPE" "$test_case_details" \
"Note: comparing compiler type."
fi
)
local exit_code=$?
set -e
Expand Down
24 changes: 23 additions & 1 deletion build-support/common-build-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ readonly YB_DEFAULT_MVN_SETTINGS_PATH=$HOME/.m2/settings.xml
MVN_OUTPUT_FILTER_REGEX='^\[INFO\] (Download(ing|ed)( from [-a-z0-9.]+)?): '
MVN_OUTPUT_FILTER_REGEX+='|^\[INFO\] [^ ]+ already added, skipping$'
MVN_OUTPUT_FILTER_REGEX+='|^\[INFO\] Copying .*[.]jar to .*[.]jar$'
MVN_OUTPUT_FILTER_REGEX+='|^\[INFO\] Resolved: .*$'
MVN_OUTPUT_FILTER_REGEX+='|^\[INFO\] Resolved plugin: .*$'
MVN_OUTPUT_FILTER_REGEX+='|^\[INFO\] Resolved dependency: .*$'
MVN_OUTPUT_FILTER_REGEX+='|^\[INFO\] Installing .* to .*$'
MVN_OUTPUT_FILTER_REGEX+='|^Generating .*[.]html[.][.][.]$'
readonly MVN_OUTPUT_FILTER_REGEX

Expand Down Expand Up @@ -484,6 +488,7 @@ set_default_compiler_type() {
if [[ -z ${YB_COMPILER_TYPE:-} ]]; then
if is_mac; then
YB_COMPILER_TYPE=clang
adjust_compiler_type_on_mac
elif [[ $OSTYPE =~ ^linux ]]; then
detect_architecture
YB_COMPILER_TYPE=clang15
Expand All @@ -496,7 +501,7 @@ set_default_compiler_type() {
}

is_clang() {
if [[ $YB_COMPILER_TYPE == "clang" ]]; then
if [[ $YB_COMPILER_TYPE == clang* ]]; then
return 0
else
return 1
Expand Down Expand Up @@ -539,6 +544,7 @@ set_compiler_type_based_on_jenkins_job_name() {
return
fi
fi
adjust_compiler_type_on_mac
validate_compiler_type
readonly YB_COMPILER_TYPE
export YB_COMPILER_TYPE
Expand Down Expand Up @@ -2205,6 +2211,7 @@ run_shellcheck() {
}

activate_virtualenv() {
detect_architecture
local virtualenv_parent_dir=$YB_BUILD_PARENT_DIR
local virtualenv_dir=$virtualenv_parent_dir/$YB_VIRTUALENV_BASENAME

Expand Down Expand Up @@ -2651,6 +2658,21 @@ build_clangd_index() {
)
}

adjust_compiler_type_on_mac() {
# A workaround for old macOS build workers where the default Clang version is 13 or older.
if is_mac &&
! is_apple_silicon &&
[[ ${YB_COMPILER_TYPE:-clang} == "clang" ]] &&
[[ $(clang --version) =~ clang\ version\ ([0-9]+) ]]
then
clang_major_version=${BASH_REMATCH[1]}
if [[ ${clang_major_version} -lt 14 ]]; then
export YB_COMPILER_TYPE=clang14
# Used in common-build-env-test.sh to avoid failing when the compiler type is adjusted.
export YB_COMPILER_TYPE_WAS_ADJUSTED=true
fi
fi
}

# -------------------------------------------------------------------------------------------------
# Initialization
Expand Down
27 changes: 19 additions & 8 deletions build-support/common-test-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1703,13 +1703,23 @@ run_java_test() {
process_tree_supervisor_append_log_to_on_error=$test_log_path
stop_process_tree_supervisor

if ! "$process_supervisor_success"; then
log "Process tree supervisor script reported an error, marking the test as failed in" \
"$junit_xml_path"
"$YB_SRC_ROOT"/build-support/update_test_result_xml.py \
--result-xml "$junit_xml_path" \
--mark-as-failed true \
--extra-message "Process supervisor script reported errors (e.g. unterminated processes)."
if [[ ${process_supervisor_success} == "false" ]]; then
if grep -Eq 'CentOS Linux release 7[.]' /etc/centos-release &&
! python3 -c 'import psutil' &>/dev/null
then
log "Process tree supervisor script reported an error, but this is CentOS 7 and " \
"the psutil module is not available in the VM image that we are using. Ignoring " \
"the process supervisor for now. We will revert this temporary workaround after the " \
"CentOS 7 image is updated. JUnit XML path: $junit_xml_path"
process_supervisor_success=true
else
log "Process tree supervisor script reported an error, marking the test as failed in" \
"$junit_xml_path"
"$YB_SRC_ROOT"/build-support/update_test_result_xml.py \
--result-xml "$junit_xml_path" \
--mark-as-failed true \
--extra-message "Process supervisor script reported errors (e.g. unterminated processes)."
fi
fi

if is_jenkins ||
Expand Down Expand Up @@ -1752,7 +1762,8 @@ run_java_test() {
fi

declare -i java_test_exit_code=$mvn_exit_code
if [[ $java_test_exit_code -eq 0 ]] && ! "$process_supervisor_success"; then
if [[ $java_test_exit_code -eq 0 &&
${process_supervisor_success} == "false" ]]; then
java_test_exit_code=1
fi

Expand Down
1 change: 1 addition & 0 deletions build-support/run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ yb_readonly_virtualenv=true

detect_architecture
activate_virtualenv
set_pythonpath

if [[ -n ${YB_LIST_CTEST_TESTS_ONLY:-} ]]; then
# This has to match CTEST_TEST_PROGRAM_RE in run_tests_on_spark.py.
Expand Down
3 changes: 0 additions & 3 deletions python/yb/dependency_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import unittest
import pipes
import time
import shutil
import random
import string

from datetime import datetime
from enum import Enum
Expand Down
4 changes: 2 additions & 2 deletions python/yb/postprocess_test_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import sys
import os
import logging
import yugabyte_pycommon # type: ignore
import argparse
import xml.etree.ElementTree as ET
import json
Expand All @@ -30,7 +29,8 @@

from typing import Any, Dict, AnyStr

from yb.common_util import init_logging
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from yb.common_util import init_logging # noqa


# Example test failure (from C++)
Expand Down
5 changes: 2 additions & 3 deletions python/yb/rewrite_test_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@
import random
import atexit

from sys_detection import is_macos

from typing import List, Optional, Set, Any, Tuple, Dict

from yb.common_util import init_logging, shlex_join
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from yb.common_util import init_logging, shlex_join # noqa

UUID_RE_STR = '[0-9a-f]{32}'
TABLET_OR_PEER_ID_RE_STR = r'\b[T|P] [0-9a-f]{32}\b'
Expand Down
36 changes: 18 additions & 18 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
VTRACE_TO(2, trace_, "Preparing $0 ops", AsString(ops_info->groups));

{
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
if (!status_.ok()) {
auto status = status_;
lock.unlock();
Expand Down Expand Up @@ -526,7 +526,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
auto transaction = transaction_->shared_from_this();
TRACE_TO(trace_, __func__);
{
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
auto status = CheckCouldCommitUnlocked(seal_only);
if (!status.ok()) {
lock.unlock();
Expand Down Expand Up @@ -574,7 +574,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
VLOG_WITH_PREFIX(2) << "Abort";
TRACE_TO(trace_, __func__);
{
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
auto state = state_.load(std::memory_order_acquire);
if (state != TransactionState::kRunning) {
if (state != TransactionState::kAborted) {
Expand Down Expand Up @@ -673,7 +673,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {

Status PromoteToGlobal(const CoarseTimePoint& deadline) EXCLUDES(mutex_) {
{
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
RETURN_NOT_OK(StartPromotionToGlobal());
}
DoPromoteToGlobal(deadline);
Expand Down Expand Up @@ -708,7 +708,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {

std::shared_future<Result<TransactionMetadata>> GetMetadata(
CoarseTimePoint deadline) EXCLUDES(mutex_) {
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
if (metadata_future_.valid()) {
return metadata_future_;
}
Expand All @@ -718,7 +718,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
auto transaction = transaction_->shared_from_this();
waiters_.push_back([this, transaction](const Status& status) {
WARN_NOT_OK(status, "Transaction request failed");
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
if (status.ok()) {
metadata_promise_.set_value(metadata_);
} else {
Expand All @@ -740,7 +740,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
PrepareChildCallback callback) {
auto transaction = transaction_->shared_from_this();
TRACE_TO(trace_, __func__);
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
auto status = CheckRunningUnlocked();
if (!status.ok()) {
lock.unlock();
Expand Down Expand Up @@ -770,7 +770,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {

Result<ChildTransactionResultPB> FinishChild() {
TRACE_TO(trace_, __func__);
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
RETURN_NOT_OK(CheckRunningUnlocked());
if (!child_) {
return STATUS(IllegalState, "Finish child of non child transaction");
Expand Down Expand Up @@ -802,7 +802,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse,
CleanupType::kImmediate, cleanup_tablet_ids);
});
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
if (state_.load(std::memory_order_acquire) == TransactionState::kAborted) {
cleanup_tablet_ids.reserve(result.tablets().size());
for (const auto& tablet : result.tablets()) {
Expand Down Expand Up @@ -845,7 +845,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
}

Result<TransactionMetadata> Release() {
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
auto state = state_.load(std::memory_order_acquire);
if (state != TransactionState::kRunning) {
return STATUS_FORMAT(IllegalState, "Attempt to release transaction in the wrong state $0: $1",
Expand Down Expand Up @@ -1138,7 +1138,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
<< Format("Commit, seal_only: $0, tablets: $1, status: $2",
seal_only, tablets_, status);

UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);

if (!status.ok()) {
VLOG_WITH_PREFIX(4) << "Commit failed: " << status;
Expand Down Expand Up @@ -1587,13 +1587,13 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
// See NotifyWaitersAndRelease.
void NotifyWaiters(const Status& status, const char* operation,
SetReady set_ready = SetReady::kFalse) EXCLUDES(mutex_) {
UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
NotifyWaitersAndRelease(&lock, status, operation, set_ready);
}

// Notify all waiters. The transaction will be aborted if it is running and status is not OK.
// `lock` must be UNIQUE_LOCK(.., mutex_), and will be released in this function.
// If `set_ready` is true and status is OK, `ready_` must be false, and will be set to true.
// `lock` will be released in this function. If `set_ready` is true and status is OK, `ready_`
// must be false, and will be set to true.
void NotifyWaitersAndRelease(UniqueLock<std::shared_mutex>* lock,
Status status,
const char* operation,
Expand Down Expand Up @@ -1785,7 +1785,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
FATAL_INVALID_ENUM_VALUE(TransactionStatus, transaction_status);
} else {
auto state = state_.load(std::memory_order_acquire);
LOG_WITH_PREFIX(WARNING) << "Send heartbeat failed: " << status << ", state: " << state;
LOG_WITH_PREFIX(WARNING) << "Send heartbeat failed: " << status << ", txn state: " << state;

if (status.IsAborted() || status.IsExpired()) {
// IsAborted - Service is shutting down, no reason to retry.
Expand Down Expand Up @@ -1832,7 +1832,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(2) << "DoSendUpdateTransactionStatusLocationRpcs()";

UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);

if (transaction_status_move_handles_.empty()) {
auto old_status_tablet = old_status_tablet_;
Expand Down Expand Up @@ -1924,7 +1924,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
return;
}

UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);

auto handle = GetTransactionStatusMoveHandle(tablet_id);
auto rpc = PrepareUpdateTransactionStatusLocationRpc(
Expand All @@ -1951,7 +1951,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
VLOG_WITH_PREFIX(1) << "Transaction status update for participant tablet "
<< tablet_id << ": " << yb::ToString(status);

UNIQUE_LOCK(lock, mutex_);
UniqueLock lock(mutex_);
auto handle = GetTransactionStatusMoveHandle(tablet_id);
manager_->rpcs().Unregister(handle);

Expand Down
15 changes: 8 additions & 7 deletions src/yb/master/async_rpc_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,17 +402,18 @@ bool RetryingTSRpcTask::RescheduleWithBackoffDelay() {
LOG_WITH_PREFIX(WARNING) << "Unable to mark this task as MonitoredTaskState::kScheduling";
return false;
}
auto task_id = master_->messenger()->ScheduleOnReactor(
auto task_id_result = master_->messenger()->ScheduleOnReactor(
std::bind(&RetryingTSRpcTask::RunDelayedTask, shared_from(this), _1),
MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION(), master_->messenger());
VLOG_WITH_PREFIX_AND_FUNC(4) << "Task id: " << task_id;
reactor_task_id_.store(task_id, std::memory_order_release);

if (task_id == rpc::kInvalidTaskId) {
AbortTask(STATUS(Aborted, "Messenger closing"));
MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION());
if (!task_id_result.ok()) {
AbortTask(task_id_result.status());
UnregisterAsyncTask();
return false;
}
auto task_id = *task_id_result;

VLOG_WITH_PREFIX_AND_FUNC(4) << "Task id: " << task_id;
reactor_task_id_.store(task_id, std::memory_order_release);

return TransitionToWaitingState(MonitoredTaskState::kScheduling);
}
Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/catalog_loaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct TemporaryLoadingState {
private: \
Status Visit( \
const key_type& key, \
const entry_pb_name& metadata) override REQUIRES(mutex); \
const entry_pb_name& metadata) REQUIRES(mutex) override; \
\
CatalogManager *catalog_manager_; \
\
Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/xcluster/xcluster_safe_time_service-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class XClusterSafeTimeServiceMocked : public XClusterSafeTimeService {
return table_entries_;
}

Status RefreshProducerTabletToNamespaceMap() override REQUIRES(mutex_) {
Status RefreshProducerTabletToNamespaceMap() REQUIRES(mutex_) override {
if (producer_tablet_namespace_map_ != consumer_registry_) {
producer_tablet_namespace_map_ = consumer_registry_;
}
Expand Down
2 changes: 1 addition & 1 deletion src/yb/rocksdb/USERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ DNANexus is using RocksDB to speed up processing of genomics data.
You can learn more from this great blog post by Mike Lin: http://devblog.dnanexus.com/faster-bam-sorting-with-samtools-and-rocksdb/

## Iron.io
Iron.io is using RocksDB as a storage engine for their distributed queueing system.
Iron.io is using RocksDB as a storage engine for their distributed queuing system.
Learn more from Tech Talk by Reed Allman: http://www.youtube.com/watch?v=HTjt6oj-RL4

## Tango Me
Expand Down
2 changes: 2 additions & 0 deletions src/yb/rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ set(YRPC_SRCS
poller.cc
proxy.cc
reactor.cc
reactor_task.cc
delayed_task.cc
refined_stream.cc
remote_method.cc
rpc.cc
Expand Down
Loading

0 comments on commit 4879cfb

Please sign in to comment.