Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROF-10123] Add placeholder for skipped samples in allocation profiler, second try #3792

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ static const rb_data_type_t cpu_and_wall_time_worker_typed_data = {
};

static VALUE _native_new(VALUE klass) {
long now = monotonic_wall_time_now_ns(RAISE_ON_FAILURE);

struct cpu_and_wall_time_worker_state *state = ruby_xcalloc(1, sizeof(struct cpu_and_wall_time_worker_state));

// Note: Any exceptions raised from this note until the TypedData_Wrap_Struct call will lead to the state memory
Expand All @@ -340,13 +342,6 @@ static VALUE _native_new(VALUE klass) {
state->during_sample = false;

reset_stats_not_thread_safe(state);

long now = monotonic_wall_time_now_ns(DO_NOT_RAISE_ON_FAILURE);
if (now == 0) {
ruby_xfree(state);
rb_raise(rb_eRuntimeError, ERR_CLOCK_FAIL);
}

discrete_dynamic_sampler_init(&state->allocation_sampler, "allocation", now);

// Note: As of this writing, no new Ruby objects get created and stored in the state. If that ever changes, remember
Expand Down Expand Up @@ -1144,9 +1139,15 @@ static VALUE rescued_sample_allocation(VALUE tracepoint_data) {
discrete_dynamic_sampler_events_since_last_sample(&state->allocation_sampler) :
// if we aren't, then we're sampling every event
1;
// TODO: Signal in the profile that clamping happened?

// To control bias from sampling, we clamp the maximum weight attributed to a single allocation sample. This avoids
// assigning a very large number to a sample, if for instance the dynamic sampling mechanism chose a really big interval.
unsigned int weight = allocations_since_last_sample > MAX_ALLOC_WEIGHT ? MAX_ALLOC_WEIGHT : (unsigned int) allocations_since_last_sample;
thread_context_collector_sample_allocation(state->thread_context_collector_instance, weight, new_object);
// ...but we still represent the skipped samples in the profile, thus the data will account for all allocations.
if (weight < allocations_since_last_sample) {
thread_context_collector_sample_skipped_allocation_samples(state->thread_context_collector_instance, allocations_since_last_sample - weight);
}

// Return a dummy VALUE because we're called from rb_rescue2 which requires it
return Qnil;
Expand Down
3 changes: 3 additions & 0 deletions ext/datadog_profiling_native_extension/collectors_stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ static VALUE _native_sample(
ENFORCE_TYPE(numeric_labels_array, T_ARRAY);

VALUE zero = INT2NUM(0);
VALUE heap_sample = rb_hash_lookup2(metric_values_hash, rb_str_new_cstr("heap_sample"), Qfalse);
ENFORCE_BOOLEAN(heap_sample);
sample_values values = {
.cpu_time_ns = NUM2UINT(rb_hash_lookup2(metric_values_hash, rb_str_new_cstr("cpu-time"), zero)),
.cpu_or_wall_samples = NUM2UINT(rb_hash_lookup2(metric_values_hash, rb_str_new_cstr("cpu-samples"), zero)),
.wall_time_ns = NUM2UINT(rb_hash_lookup2(metric_values_hash, rb_str_new_cstr("wall-time"), zero)),
.alloc_samples = NUM2UINT(rb_hash_lookup2(metric_values_hash, rb_str_new_cstr("alloc-samples"), zero)),
.alloc_samples_unscaled = NUM2UINT(rb_hash_lookup2(metric_values_hash, rb_str_new_cstr("alloc-samples-unscaled"), zero)),
.timeline_wall_time_ns = NUM2UINT(rb_hash_lookup2(metric_values_hash, rb_str_new_cstr("timeline"), zero)),
.heap_sample = heap_sample == Qtrue,
};

long labels_count = RARRAY_LEN(labels_array) + RARRAY_LEN(numeric_labels_array);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ static void ddtrace_otel_trace_identifiers_for(
VALUE active_span,
VALUE otel_values
);
static VALUE _native_sample_skipped_allocation_samples(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE skipped_samples);

void collectors_thread_context_init(VALUE profiling_module) {
VALUE collectors_module = rb_define_module_under(profiling_module, "Collectors");
Expand Down Expand Up @@ -261,6 +262,7 @@ void collectors_thread_context_init(VALUE profiling_module) {
rb_define_singleton_method(testing_module, "_native_stats", _native_stats, 1);
rb_define_singleton_method(testing_module, "_native_gc_tracking", _native_gc_tracking, 1);
rb_define_singleton_method(testing_module, "_native_new_empty_thread", _native_new_empty_thread, 0);
rb_define_singleton_method(testing_module, "_native_sample_skipped_allocation_samples", _native_sample_skipped_allocation_samples, 2);

at_active_span_id = rb_intern_const("@active_span");
at_active_trace_id = rb_intern_const("@active_trace");
Expand Down Expand Up @@ -1301,7 +1303,7 @@ void thread_context_collector_sample_allocation(VALUE self_instance, unsigned in
/* thread: */ current_thread,
/* stack_from_thread: */ current_thread,
get_or_create_context_for(current_thread, state),
(sample_values) {.alloc_samples = sample_weight, .alloc_samples_unscaled = 1},
(sample_values) {.alloc_samples = sample_weight, .alloc_samples_unscaled = 1, .heap_sample = true},
INVALID_TIME, // For now we're not collecting timestamps for allocation events, as per profiling team internal discussions
&ruby_vm_type,
optional_class_name
Expand Down Expand Up @@ -1411,3 +1413,33 @@ static void ddtrace_otel_trace_identifiers_for(
*active_trace = current_trace;
*numeric_span_id = resolved_numeric_span_id;
}

void thread_context_collector_sample_skipped_allocation_samples(VALUE self_instance, unsigned int skipped_samples) {
struct thread_context_collector_state *state;
TypedData_Get_Struct(self_instance, struct thread_context_collector_state, &thread_context_collector_typed_data, state);

ddog_prof_Label labels[] = {
// Providing .num = 0 should not be needed but the tracer-2.7 docker image ships a buggy gcc that complains about this
{.key = DDOG_CHARSLICE_C("thread id"), .str = DDOG_CHARSLICE_C("SS"), .num = 0},
{.key = DDOG_CHARSLICE_C("thread name"), .str = DDOG_CHARSLICE_C("Skipped Samples"), .num = 0},
{.key = DDOG_CHARSLICE_C("allocation class"), .str = DDOG_CHARSLICE_C("(Skipped Samples)"), .num = 0},
};
ddog_prof_Slice_Label slice_labels = {.ptr = labels, .len = sizeof(labels) / sizeof(labels[0])};

record_placeholder_stack(
state->sampling_buffer,
state->recorder_instance,
(sample_values) {.alloc_samples = skipped_samples},
(sample_labels) {
.labels = slice_labels,
.state_label = NULL,
.end_timestamp_ns = 0, // For now we're not collecting timestamps for allocation events
},
DDOG_CHARSLICE_C("Skipped Samples")
);
}

static VALUE _native_sample_skipped_allocation_samples(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE skipped_samples) {
thread_context_collector_sample_skipped_allocation_samples(collector_instance, NUM2UINT(skipped_samples));
return Qtrue;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ void thread_context_collector_sample(
VALUE profiler_overhead_stack_thread
);
void thread_context_collector_sample_allocation(VALUE self_instance, unsigned int sample_weight, VALUE new_object);
void thread_context_collector_sample_skipped_allocation_samples(VALUE self_instance, unsigned int skipped_samples);
VALUE thread_context_collector_sample_after_gc(VALUE self_instance);
void thread_context_collector_on_gc_start(VALUE self_instance);
bool thread_context_collector_on_gc_finish(VALUE self_instance);
Expand Down
37 changes: 32 additions & 5 deletions ext/datadog_profiling_native_extension/heap_recorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ struct heap_recorder {
size_t objects_frozen;
} stats_last_update;
};

struct end_heap_allocation_args {
struct heap_recorder *heap_recorder;
ddog_prof_Slice_Location locations;
};

static heap_record* get_or_create_heap_record(heap_recorder*, ddog_prof_Slice_Location);
static void cleanup_heap_record_if_unused(heap_recorder*, heap_record*);
static void on_committed_object_record_cleanup(heap_recorder *heap_recorder, object_record *record);
Expand All @@ -176,6 +182,7 @@ static int st_object_records_iterate(st_data_t, st_data_t, st_data_t);
static int st_object_records_debug(st_data_t key, st_data_t value, st_data_t extra);
static int update_object_record_entry(st_data_t*, st_data_t*, st_data_t, int);
static void commit_recording(heap_recorder*, heap_record*, recording);
static VALUE end_heap_allocation_recording(VALUE end_heap_allocation_args);

// ==========================
// Heap Recorder External API
Expand Down Expand Up @@ -340,9 +347,28 @@ void start_heap_allocation_recording(heap_recorder *heap_recorder, VALUE new_obj
};
}

void end_heap_allocation_recording(struct heap_recorder *heap_recorder, ddog_prof_Slice_Location locations) {
// end_heap_allocation_recording_with_rb_protect gets called while the stack_recorder is holding one of the profile
// locks. To enable us to correctly unlock the profile on exception, we wrap the call to end_heap_allocation_recording
// with an rb_protect.
__attribute__((warn_unused_result))
int end_heap_allocation_recording_with_rb_protect(struct heap_recorder *heap_recorder, ddog_prof_Slice_Location locations) {
int exception_state;
struct end_heap_allocation_args end_heap_allocation_args = {
.heap_recorder = heap_recorder,
.locations = locations,
};
rb_protect(end_heap_allocation_recording, (VALUE) &end_heap_allocation_args, &exception_state);
return exception_state;
}

static VALUE end_heap_allocation_recording(VALUE end_heap_allocation_args) {
struct end_heap_allocation_args *args = (struct end_heap_allocation_args *) end_heap_allocation_args;

struct heap_recorder *heap_recorder = args->heap_recorder;
ddog_prof_Slice_Location locations = args->locations;

if (heap_recorder == NULL) {
return;
return Qnil;
}

recording active_recording = heap_recorder->active_recording;
Expand All @@ -356,15 +382,16 @@ void end_heap_allocation_recording(struct heap_recorder *heap_recorder, ddog_pro
// data required for committing though.
heap_recorder->active_recording = (recording) {0};

if (active_recording.object_record == &SKIPPED_RECORD) {
// special marker when we decided to skip due to sampling
return;
if (active_recording.object_record == &SKIPPED_RECORD) { // special marker when we decided to skip due to sampling
return Qnil;
}

heap_record *heap_record = get_or_create_heap_record(heap_recorder, locations);

// And then commit the new allocation.
commit_recording(heap_recorder, heap_record, active_recording);

return Qnil;
}

void heap_recorder_prepare_iteration(heap_recorder *heap_recorder) {
Expand Down
4 changes: 3 additions & 1 deletion ext/datadog_profiling_native_extension/heap_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ void start_heap_allocation_recording(heap_recorder *heap_recorder, VALUE new_obj
// @param locations The stacktrace representing the location of the allocation.
//
// WARN: It is illegal to call this without previously having called ::start_heap_allocation_recording.
void end_heap_allocation_recording(heap_recorder *heap_recorder, ddog_prof_Slice_Location locations);
// WARN: This method rescues exceptions with `rb_protect`, returning the exception state integer for the caller to handle.
__attribute__((warn_unused_result))
int end_heap_allocation_recording_with_rb_protect(heap_recorder *heap_recorder, ddog_prof_Slice_Location locations);

// Update the heap recorder to reflect the latest state of the VM and prepare internal structures
// for efficient iteration.
Expand Down
12 changes: 10 additions & 2 deletions ext/datadog_profiling_native_extension/stack_recorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,20 @@ void record_sample(VALUE recorder_instance, ddog_prof_Slice_Location locations,
metric_values[position_for[ALLOC_SAMPLES_UNSCALED_VALUE_ID]] = values.alloc_samples_unscaled;
metric_values[position_for[TIMELINE_VALUE_ID]] = values.timeline_wall_time_ns;

if (values.alloc_samples != 0) {
if (values.heap_sample) {
// If we got an allocation sample end the heap allocation recording to commit the heap sample.
// FIXME: Heap sampling currently has to be done in 2 parts because the construction of locations is happening
// very late in the allocation-sampling path (which is shared with the cpu sampling path). This can
// be fixed with some refactoring but for now this leads to a less impactful change.
end_heap_allocation_recording(state->heap_recorder, locations);
//
// NOTE: The heap recorder is allowed to raise exceptions if something's wrong. But we also need to handle it
// on this side to make sure we properly unlock the active slot mutex on our way out. Otherwise, this would
// later lead to deadlocks (since the active slot mutex is not expected to be locked forever).
int exception_state = end_heap_allocation_recording_with_rb_protect(state->heap_recorder, locations);
if (exception_state) {
sampler_unlock_active_profile(active_slot);
rb_jump_tag(exception_state);
}
}

ddog_prof_Profile_Result result = ddog_prof_Profile_add(
Expand Down
1 change: 1 addition & 0 deletions ext/datadog_profiling_native_extension/stack_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ typedef struct {
uint32_t cpu_or_wall_samples;
uint32_t alloc_samples;
uint32_t alloc_samples_unscaled;
bool heap_sample;
int64_t timeline_wall_time_ns;
} sample_values;

Expand Down
1 change: 1 addition & 0 deletions lib/datadog/profiling/profiler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def component_failed(failed_component)
# we're operating in a degraded state and crash tracking may still be helpful.

if failed_component == :worker
scheduler.mark_profiler_failed
stop_scheduler
elsif failed_component == :scheduler
stop_worker
Expand Down
12 changes: 10 additions & 2 deletions lib/datadog/profiling/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class Scheduler < Core::Worker

attr_reader \
:exporter,
:transport
:transport,
:profiler_failed

public

Expand All @@ -34,6 +35,7 @@ def initialize(
)
@exporter = exporter
@transport = transport
@profiler_failed = false

# Workers::Async::Thread settings
self.fork_policy = fork_policy
Expand Down Expand Up @@ -80,8 +82,14 @@ def loop_wait_before_first_iteration?
true
end

# This is called by the Profiler class whenever an issue happened in the profiler. This makes sure that even
# if there is data to be flushed, we don't try to flush it.
def mark_profiler_failed
@profiler_failed = true
end

def work_pending?
exporter.can_flush?
!profiler_failed && exporter.can_flush?
end

def reset_after_fork
Expand Down
1 change: 1 addition & 0 deletions sig/datadog/profiling/scheduler.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Datadog
def start: (?on_failure_proc: ::Proc?) -> void

def reset_after_fork: () -> void
def mark_profiler_failed: () -> true
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@

context 'with dynamic_sampling_rate_enabled' do
let(:options) { { dynamic_sampling_rate_enabled: true } }

it 'keeps statistics on how allocation sampling is doing' do
stub_const('CpuAndWallTimeWorkerSpec::TestStruct', Struct.new(:foo))

Expand Down Expand Up @@ -536,6 +537,30 @@
one_second_in_ns = 1_000_000_000
expect(sampling_time_ns_max).to be < one_second_in_ns, "A single sample should not take longer than 1s, #{stats}"
end

# When large numbers of objects are allocated, the dynamic sampling rate kicks in, and we don't sample every
# object.
# We then assign a weight to every sample to compensate for this; to avoid bias, we have a limit on this weight,
# and we clamp it if it goes over the limit.
# But the total amount of allocations recorded should match the number we observed, and thus we record the
# remainder above the clamped value as a separate "Skipped Samples" step.
it 'records skipped allocation samples when weights are clamped' do
start

thread_that_allocates_as_fast_as_possible = Thread.new { loop { BasicObject.new } }

allocation_samples = try_wait_until do
samples = samples_from_pprof(recorder.serialize!).select { |it| it.values[:'alloc-samples'] > 0 }
samples if samples.any? { |it| it.labels[:'thread name'] == 'Skipped Samples' }
end

thread_that_allocates_as_fast_as_possible.kill
thread_that_allocates_as_fast_as_possible.join

cpu_and_wall_time_worker.stop

expect(allocation_samples).to_not be_empty
end
end

context 'when sampling optimized Ruby strings' do
Expand Down Expand Up @@ -629,6 +654,7 @@
let(:options) { { dynamic_sampling_rate_enabled: false } }

before do
skip 'Heap profiling is only supported on Ruby >= 2.7' if RUBY_VERSION < '2.7'
allow(Datadog.logger).to receive(:warn)
expect(Datadog.logger).to receive(:warn).with(/dynamic sampling rate disabled/)
end
Expand Down
29 changes: 29 additions & 0 deletions spec/datadog/profiling/collectors/thread_context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ def sample_allocation(weight:, new_object: Object.new)
described_class::Testing._native_sample_allocation(cpu_and_wall_time_collector, weight, new_object)
end

def sample_skipped_allocation_samples(skipped_samples)
described_class::Testing._native_sample_skipped_allocation_samples(cpu_and_wall_time_collector, skipped_samples)
end

def thread_list
described_class::Testing._native_thread_list
end
Expand Down Expand Up @@ -1216,6 +1220,31 @@ def self.otel_sdk_available?
end
end

describe '#sample_skipped_allocation_samples' do
let(:single_sample) do
expect(samples.size).to be 1
samples.first
end
before { sample_skipped_allocation_samples(123) }

it 'records the number of skipped allocations' do
expect(single_sample.values).to include('alloc-samples': 123)
end

it 'attributes the skipped samples to a "Skipped Samples" thread' do
expect(single_sample.labels).to include('thread id': 'SS', 'thread name': 'Skipped Samples')
end

it 'attributes the skipped samples to a "(Skipped Samples)" allocation class' do
expect(single_sample.labels).to include('allocation class': '(Skipped Samples)')
end

it 'includes a placeholder stack attributed to "Skipped Samples"' do
expect(single_sample.locations.size).to be 1
expect(single_sample.locations.first.path).to eq 'Skipped Samples'
end
end

describe '#thread_list' do
it "returns the same as Ruby's Thread.list" do
expect(thread_list).to eq Thread.list
Expand Down
7 changes: 7 additions & 0 deletions spec/datadog/profiling/profiler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
before do
allow(scheduler).to receive(:enabled=)
allow(scheduler).to receive(:stop)
allow(scheduler).to receive(:mark_profiler_failed)
end

it 'logs the issue' do
Expand All @@ -131,6 +132,12 @@
worker_on_failure
end

it 'marks the profiler as having failed in the scheduler' do
expect(scheduler).to receive(:mark_profiler_failed)

worker_on_failure
end

it 'stops the scheduler' do
expect(scheduler).to receive(:enabled=).with(false)
expect(scheduler).to receive(:stop).with(true)
Expand Down
Loading
Loading