Skip to content

Commit

Permalink
[Profiler] Pay for what you use (v2) (pytorch#74484)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: pytorch#74484

In my first attempt at this in December I stamped out specializations using variadic templates. However I'm able to get comparable performance using simple conditionals since the branch is very predictable and AppendOnlyList::emplace_back is low enough overhead that multiple calls don't cause an issue.

This is also a chance to do some BE: rather than force ops and backend events to use the same fields (which in practice means setting a bunch of default values when reporting backend events), I just split them and use a variant.

Test Plan: The single threaded benchmark (with no extra options set) improved considerably from ~0.88 us to ~0.62 us. The stress test benchmark improved modestly from ~6.1 us to ~5.8 us. So the bottleneck for multi-threading is somewhere else, but doing less wasted work is still able to move the needle a little bit.

Reviewed By: swolchok

Differential Revision: D34779994

fbshipit-source-id: 392bc7c6f12797fa5e18777063aa21210d9d2067
(cherry picked from commit f0a49ff)
  • Loading branch information
Taylor Robie authored and pytorchmergebot committed Mar 24, 2022
1 parent 3f164e0 commit 2ecf743
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 160 deletions.
142 changes: 55 additions & 87 deletions torch/csrc/autograd/profiler_kineto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
#include <c10/macros/Export.h>
#include <c10/util/flat_hash_map.h>
#include <c10/util/irange.h>
#include <c10/util/overloaded.h>
#include <c10/util/variant.h>

#include <torch/csrc/jit/frontend/tracer.h>
#include <torch/csrc/jit/runtime/interpreter.h>
#include <torch/csrc/jit/runtime/operator.h>
#include <torch/csrc/profiler/api.h>
#include <torch/csrc/profiler/collection.h>
#include <torch/csrc/profiler/containers.h>
Expand Down Expand Up @@ -138,18 +137,14 @@ static inline uint64_t getForwardThreadKey(uint64_t tid, uint64_t seqNr) {
return (((tid) << 48) | ((seqNr) & (((uint64_t)1 << 48) - 1)));
}

struct KinetoObserverContext : public at::ObserverContext {
explicit KinetoObserverContext(torch::profiler::impl::OpEventData* data) : data_(data) {}
torch::profiler::impl::OpEventData* data_;
};

struct KinetoThreadLocalState : public ProfilerThreadLocalStateBase {
explicit KinetoThreadLocalState(
const ProfilerConfig& config,
std::set<torch::profiler::impl::ActivityType> activities)
: ProfilerThreadLocalStateBase(config),
start_time_(getTimeUs()),
activities_(std::move(activities)),
record_queue_(config),
cpu_trace_(start_time_, "PyTorch Profiler") {}
~KinetoThreadLocalState() override = default;

Expand Down Expand Up @@ -250,8 +245,8 @@ struct KinetoThreadLocalState : public ProfilerThreadLocalStateBase {

for (const auto& e : record_queue_.getRecords(converter)) {
// `take_data` handles time conversion.
int64_t start_us = e.start_time_.us_;
int64_t end_us = e.end_time_.us_;
int64_t start_us = e.start_time_us_;
int64_t end_us = e.end_time_us_;

if (end_us < start_us) {
// We initialize end_us_ to the smallest int64_t, so this means that
Expand All @@ -260,52 +255,65 @@ struct KinetoThreadLocalState : public ProfilerThreadLocalStateBase {
}

cpu_trace_.addCPUActivity(
e.name_,
e.name(),
e.kineto_info_,
e.correlation_id_,
e.correlation_id(),
start_us,
end_us);

kineto_events_.emplace_back();
kineto_events_.back()
.name(e.name_)
.name(e.name())
.startUs(start_us)
.durationUs(end_us - start_us)
.correlationId(e.correlation_id_)
.correlationId(e.correlation_id())
.deviceType(c10::DeviceType::CPU)
.startThreadId(e.start_thread_id_)
.endThreadId(e.end_thread_id_)
.sequenceNr(e.sequence_number_)
.fwdThreadId(e.forward_thread_id_)
.scope(e.record_function_scope_)
.setAsync(e.is_async_)
.debugHandle(e.debug_handle_);

if (!e.shapes_.empty()) {
kineto_events_.back().shapes(e.shapes_);
.startThreadId(e.start_tid_);

c10::visit(
c10::overloaded(
[&](const torch::profiler::impl::OpEvent& op_event) {
kineto_events_.back()
.endThreadId(op_event.end_thread_id_)
.sequenceNr(op_event.sequence_number_)
.fwdThreadId(op_event.forward_thread_id_)
.scope(op_event.record_function_scope_)
.setAsync(op_event.is_async_)
.debugHandle(op_event.debug_handle_);
},
[&](const torch::profiler::impl::BackendEvent& backend_event) {
kineto_events_.back()
.endThreadId(e.start_tid_)
.scope(backend_event.record_function_scope_)
.debugHandle(backend_event.debug_handle_)
.backend(backend_event.backend_);
}),
e.event_);

if (!e.inputs_.shapes_.empty()) {
kineto_events_.back().shapes(e.inputs_.shapes_);
}

if (!e.dtypes_.empty()) {
kineto_events_.back().dtypes(e.dtypes_);
if (!e.inputs_.dtypes_.empty()) {
kineto_events_.back().dtypes(e.inputs_.dtypes_);
}

if (!e.stack_.empty()) {
kineto_events_.back().stack(e.stack_);
if (!e.jit_stack_.empty()) {
kineto_events_.back().stack(e.jit_stack_);
}

if (e.module_hierarchy_) {
kineto_events_.back().moduleHierarchy(*e.module_hierarchy_);
if (!e.jit_modules_.empty()) {
kineto_events_.back().moduleHierarchy(e.jit_modules_);
}

if (!e.extra_args_.empty()) {
kineto_events_.back().flops(
computeFlops(std::string(e.name_), e.extra_args_));
}
if (e.backend_) {
kineto_events_.back().backend(*e.backend_);
computeFlops(e.name(), e.extra_args_));
}
kineto_events_.back().cuda_event_start_ = e.cuda_event_start_;
kineto_events_.back().cuda_event_end_ = e.cuda_event_end_;
kineto_events_.back().cuda_event_start_ =
e.gpu_fallback_.cuda_event_start_;
kineto_events_.back().cuda_event_end_ =
e.gpu_fallback_.cuda_event_end_;
}
}

Expand Down Expand Up @@ -598,48 +606,7 @@ void pushProfilingCallbacks(const std::unordered_set<at::RecordScope>& scopes) {
const auto& config = state_ptr->config();
auto corr_id = next_correlation_id();
torch::profiler::impl::kineto::pushCorrelationId(corr_id);

auto data_ptr =
state_ptr->record_queue_.getSubqueue()->emplace_back(
corr_id,
fn.threadId(),
fn.seqNr(),
fn.forwardThreadId(),
fn.scope(),
fn.isAsync(),
fn.debugHandle(),
fn.name());
if (config.report_input_shapes) {
data_ptr->shapes_ = torch::profiler::impl::inputSizes(fn);
data_ptr->dtypes_ = torch::profiler::impl::inputTypes(fn);
}
#if !defined BUILD_LITE_INTERPRETER && !defined C10_MOBILE
// backward nodes source range corresponds to the forward node
// TODO: consider using C++ stack trace
if (config.with_stack &&
fn.scope() != at::RecordScope::BACKWARD_FUNCTION) {
auto cs = torch::profiler::impl::prepareCallstack(jit::currentCallstack());
data_ptr->stack_ = callstackStr(cs);
}
if (config.with_modules &&
fn.scope() != at::RecordScope::BACKWARD_FUNCTION) {
data_ptr->module_hierarchy_ = jit::currentModuleHierarchy();
}
#endif
if (config.with_flops) {
data_ptr->extra_args_ = torch::profiler::impl::saveExtraArgs(fn);
}
data_ptr->start_time_.count_ = torch::profiler::impl::getApproximateTime();

if (config.state == ProfilerState::KINETO_GPU_FALLBACK) {
try {
torch::profiler::impl::cudaStubs()->record(
nullptr, &data_ptr->cuda_event_start_, nullptr);
} catch (const std::exception& e) {
LOG(WARNING) << "Failed to record CUDA event. " << e.what();
}
}
return std::make_unique<KinetoObserverContext>(data_ptr);
return state_ptr->record_queue_.getSubqueue()->begin_op(fn, corr_id);
},
[](const at::RecordFunction& fn, at::ObserverContext* ctx_ptr) {
auto state_ptr = KinetoThreadLocalState::getTLS();
Expand All @@ -648,16 +615,16 @@ void pushProfilingCallbacks(const std::unordered_set<at::RecordScope>& scopes) {
}
const auto& config = state_ptr->config();
auto* kineto_ctx_ptr =
static_cast<KinetoObserverContext*>(ctx_ptr);
static_cast<torch::profiler::impl::KinetoObserverContext*>(ctx_ptr);
TORCH_INTERNAL_ASSERT(kineto_ctx_ptr != nullptr);
auto data_ptr = kineto_ctx_ptr->data_;
data_ptr->end_time_.count_=torch::profiler::impl::getApproximateTime();
data_ptr->end_thread_id_ = at::RecordFunction::currentThreadId();

kineto_ctx_ptr->event_->end_time_ = torch::profiler::impl::getApproximateTime();
kineto_ctx_ptr->event_->end_thread_id_ = at::RecordFunction::currentThreadId();
if (config.state == ProfilerState::KINETO_GPU_FALLBACK) {
try {
auto fallback = kineto_ctx_ptr->fallback_;
TORCH_INTERNAL_ASSERT(fallback != nullptr);
torch::profiler::impl::cudaStubs()->record(
nullptr, &data_ptr->cuda_event_end_, nullptr);
nullptr, &fallback->cuda_event_end_, nullptr);
} catch (const std::exception& e) {
LOG(WARNING) << "Failed to record CUDA event. " << e.what();
}
Expand Down Expand Up @@ -685,13 +652,14 @@ void reportBackendEventToActiveKinetoProfiler(
return;
}

state_ptr->record_queue_.getSubqueue()->emplace_back(
state_ptr->record_queue_.getSubqueue()->emplace_backend_event(
torch::profiler::impl::BackendEvent {
start_time_us,
end_time_us,
scope,
(uint8_t)scope,
debug_handle,
event_name,
backend_name);
backend_name});

/* no support for input shapes now?
if (config.report_input_shapes) {
Expand Down
133 changes: 121 additions & 12 deletions torch/csrc/profiler/collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <algorithm>

#include <ATen/record_function.h>
#include <c10/util/overloaded.h>
#include <torch/csrc/jit/runtime/interpreter.h>

namespace torch {
namespace profiler {
Expand All @@ -25,7 +27,74 @@ std::atomic<uint32_t> queue_id_{0};
thread_local SubQueueThreadCache sub_queue_cache_{0, nullptr};
} // namespace

RecordQueue::RecordQueue() : id_(++queue_id_) {}
std::string Result::name() const {
return c10::visit([](auto& e){ return e.name_; }, event_);
}

uint64_t Result::correlation_id() const {
return c10::visit(c10::overloaded(
[](const OpEvent& e){ return e.correlation_id_; },
[](const BackendEvent& e) { return std::numeric_limits<uint64_t>::max(); }
), event_);
}

ThreadLocalSubqueue::ThreadLocalSubqueue(
const uint64_t tid,
const ProfilerConfig& config)
: tid_{tid}, config_{config}, kineto_info_{kineto::kineto_ids()} {}

std::unique_ptr<KinetoObserverContext> ThreadLocalSubqueue::begin_op(
const at::RecordFunction& fn,
uint64_t correlation_id) {
auto event = op_events_.emplace_back(
correlation_id,
fn.threadId(),
fn.seqNr(),
fn.forwardThreadId(),
fn.scope(),
fn.isAsync(),
fn.debugHandle(),
fn.name());
if (config_.report_input_shapes) {
inputs_.emplace_back(
torch::profiler::impl::inputSizes(fn),
torch::profiler::impl::inputTypes(fn));
}

#if !defined BUILD_LITE_INTERPRETER && !defined C10_MOBILE
// backward nodes source range corresponds to the forward node
// TODO: consider using C++ stack trace
if (config_.with_stack && fn.scope() != at::RecordScope::BACKWARD_FUNCTION) {
auto cs = torch::profiler::impl::prepareCallstack(jit::currentCallstack());
jit_stack_.emplace_back(callstackStr(cs));
}
if (config_.with_modules &&
fn.scope() != at::RecordScope::BACKWARD_FUNCTION) {
jit_modules_.emplace_back(jit::currentModuleHierarchy());
}
#endif
if (config_.with_flops) {
extra_args_.emplace_back(torch::profiler::impl::saveExtraArgs(fn));
}

auto out = std::make_unique<KinetoObserverContext>(event);

if (config_.state == ProfilerState::KINETO_GPU_FALLBACK) {
try {
out->fallback_ = gpu_fallback_.emplace_back();
torch::profiler::impl::cudaStubs()->record(
nullptr, &out->fallback_->cuda_event_start_, nullptr);
} catch (const std::exception& e) {
LOG(WARNING) << "Failed to record CUDA event. " << e.what();
}
}

event->start_time_ = torch::profiler::impl::getApproximateTime();
return out;
}

RecordQueue::RecordQueue(const ProfilerConfig& config)
: id_(++queue_id_), config_{config} {}

ThreadLocalSubqueue* RecordQueue::getSubqueue() {
// In the most common case, a thread will want to write to the same sub-queue
Expand All @@ -45,34 +114,74 @@ ThreadLocalSubqueue* RecordQueue::getSubqueue() {
auto it = sub_queues_.find(tid);
if (it == sub_queues_.end()) {
it =
sub_queues_.emplace(tid, std::make_unique<ThreadLocalSubqueue>()).first;
sub_queues_.emplace(tid, std::make_unique<ThreadLocalSubqueue>(tid, config_)).first;
}

sub_queue_cache_ = SubQueueThreadCache{id_, it->second.get()};
return it->second.get();
}

std::deque<OpEventData> RecordQueue::getRecords(
template <typename T>
auto steal_or_default(T& it) {
if (it.exhausted()) {
return typename T::value_type();
} else {
auto result = std::move(*it);
++it;
return result;
}
}

std::deque<Result> RecordQueue::getRecords(
std::function<time_t(approx_time_t)> time_converter) {
auto converter = [&](approx_time_t t) {
return t == std::numeric_limits<approx_time_t>::min()
? std::numeric_limits<int64_t>::min()
: time_converter(t) / 1000; // ns to ms
};
std::deque<OpEventData> out;
std::deque<Result> out;
for (auto& subqueue_it : sub_queues_) {
for (auto& i : subqueue_it.second->data_) {
if (!i.backend_.has_value()) {
i.start_time_.us_ = converter(i.start_time_.count_);
i.end_time_.us_ = converter(i.end_time_.count_);
}
out.emplace_back(std::move(i));
auto& queue = *subqueue_it.second;
for (auto& i : queue.backend_events_) {
Result r;
r.start_time_us_ = i.start_time_us_;
r.end_time_us_ = i.end_time_us_;
r.start_tid_ = queue.tid();
r.kineto_info_ = queue.kineto_info();
r.event_ = std::move(i);
out.push_back(std::move(r));
}

auto input_it = queue.inputs_.begin();
auto jit_stack_it = queue.jit_stack_.begin();
auto jit_module_it = queue.jit_modules_.begin();
auto extra_args_it = queue.extra_args_.begin();
auto gpu_fallback_it = queue.gpu_fallback_.begin();
for (auto& i : queue.op_events_) {
Result r;
r.start_time_us_ = converter(i.start_time_);
r.end_time_us_ = converter(i.end_time_);
r.start_tid_ = queue.tid();
r.kineto_info_ = queue.kineto_info();
r.event_ = std::move(i);
r.inputs_ = steal_or_default(input_it);
r.jit_stack_ = steal_or_default(jit_stack_it);
r.jit_modules_ = steal_or_default(jit_module_it);
r.extra_args_ = steal_or_default(extra_args_it);
r.gpu_fallback_ = steal_or_default(gpu_fallback_it);

out.push_back(std::move(r));
}
subqueue_it.second->data_.clear();
queue.op_events_.clear();
queue.inputs_.clear();
queue.jit_stack_.clear();
queue.jit_modules_.clear();
queue.extra_args_.clear();
queue.gpu_fallback_.clear();
}

std::stable_sort(out.begin(), out.end(), [](const auto& a, const auto& b) {
return a.start_time_.us_ < b.start_time_.us_;
return a.start_time_us_ < b.start_time_us_;
});
return out;
}
Expand Down
Loading

0 comments on commit 2ecf743

Please sign in to comment.