Skip to content

Commit

Permalink
Add the self-observability metric of probe events (#553)
Browse files Browse the repository at this point in the history
Signed-off-by: anthonyhui <[email protected]>
  • Loading branch information
hwz779866221 authored Aug 16, 2023
1 parent 3f8adcc commit 6c8fa9f
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
## Unreleased
### Bug fixes
- Fix the bug where sending repetitive k8s_info_workload. Now each node only sends its own info.([#554](https://github.com/KindlingProject/kindling/pull/554)
- Provide a new self metric for probe events. (skipped events/dropped events)([#553](https://github.com/KindlingProject/kindling/pull/553))
## v0.8.0 - 2023-06-30
### New features
- Provide a new metric called kindling_k8s_workload_info, which supports workload filtering for k8s, thus preventing frequent crashes of Grafana topology. Please refer to the [doc](http://kindling.harmonycloud.cn/docs/usage/grafana-topology-plugin/) for any limitations.([#530](https://github.com/KindlingProject/kindling/pull/530)
Expand Down
13 changes: 11 additions & 2 deletions collector/pkg/component/receiver/cgoreceiver/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,24 @@ char* startAttachAgent(int pid);
char* stopAttachAgent(int pid);
void startProfileDebug(int pid, int tid);
void stopProfileDebug();
void getCaptureStatistics();
void getCaptureStatistics(struct capture_statistics_for_go* stats);
void catchSignalUp();
#ifdef __cplusplus
}

#endif

#endif //SYSDIG_CGO_FUNC_H

struct capture_statistics_for_go{
int evts;
int drops;
int drops_buffer;
int drops_pf;
int drops_bug;
int preemptions;
int suppressed;
int tids_suppressed;
};
struct event_params_for_subscribe {
char *name;
char *value;
Expand Down
44 changes: 35 additions & 9 deletions collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ type CKindlingEventForGo C.struct_kindling_event_t_for_go
type CEventParamsForSubscribe C.struct_event_params_for_subscribe

type CgoReceiver struct {
cfg *Config
analyzerManager *analyzerpackage.Manager
shutdownWG sync.WaitGroup
telemetry *component.TelemetryTools
eventChannel chan *model.KindlingEvent
stopCh chan interface{}
stats eventCounter
cfg *Config
analyzerManager *analyzerpackage.Manager
shutdownWG sync.WaitGroup
telemetry *component.TelemetryTools
eventChannel chan *model.KindlingEvent
stopCh chan interface{}
stats eventCounter
probeCounter *probeCounter
probeCounterMutex sync.RWMutex
}

func NewCgoReceiver(config interface{}, telemetry *component.TelemetryTools, analyzerManager *analyzerpackage.Manager) receiver.Receiver {
Expand All @@ -53,6 +55,7 @@ func NewCgoReceiver(config interface{}, telemetry *component.TelemetryTools, ana
telemetry: telemetry,
eventChannel: make(chan *model.KindlingEvent, 3e5),
stopCh: make(chan interface{}, 1),
probeCounter: &probeCounter{},
}
cgoReceiver.stats = newDynamicStats(cfg.SubscribeInfo)
newSelfMetrics(telemetry.MeterProvider, cgoReceiver)
Expand All @@ -65,7 +68,6 @@ func (r *CgoReceiver) Start() error {
if res == 1 {
return fmt.Errorf("fail to init probe")
}
go r.getCaptureStatistics()
go r.catchSignalUp()
time.Sleep(2 * time.Second)
r.suppressEventsComm()
Expand All @@ -74,6 +76,7 @@ func (r *CgoReceiver) Start() error {
time.Sleep(2 * time.Second)
go r.consumeEvents()
go r.startGetEvent()
go r.getCaptureStatisticsByInterval(15 * time.Second)
return nil
}

Expand Down Expand Up @@ -239,8 +242,31 @@ func (r *CgoReceiver) ProfileModule() (submodule string, start func() error, sto
return "cgoreceiver", r.StartProfile, r.StopProfile
}

func (r *CgoReceiver) getCaptureStatisticsByInterval(interval time.Duration) {
timer := time.NewTicker(interval)
for {
select {
case <-timer.C:
r.getCaptureStatistics()
case <-r.stopCh:
return
}
}
}

func (r *CgoReceiver) getCaptureStatistics() {
C.getCaptureStatistics()
var captureStatistics C.struct_capture_statistics_for_go
C.getCaptureStatistics((*C.struct_capture_statistics_for_go)(&captureStatistics))
r.probeCounterMutex.Lock()
defer r.probeCounterMutex.Unlock()
r.probeCounter.evts = int64(captureStatistics.evts)
r.probeCounter.drops = int64(captureStatistics.drops)
r.probeCounter.dropsBuffer = int64(captureStatistics.drops_buffer)
r.probeCounter.dropsPf = int64(captureStatistics.drops_pf)
r.probeCounter.dropsBug = int64(captureStatistics.drops_bug)
r.probeCounter.preemptions = int64(captureStatistics.preemptions)
r.probeCounter.suppressed = int64(captureStatistics.suppressed)
r.probeCounter.tidsSuppressed = int64(captureStatistics.tids_suppressed)
}

func (r *CgoReceiver) catchSignalUp() {
Expand Down
54 changes: 52 additions & 2 deletions collector/pkg/component/receiver/cgoreceiver/self_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ import (
var once sync.Once

const (
eventReceivedMetric = "kindling_telemetry_cgoreceiver_events_total"
channelSizeMetric = "kindling_telemetry_cgoreceiver_channel_size"
eventReceivedMetric = "kindling_telemetry_cgoreceiver_events_total"
channelSizeMetric = "kindling_telemetry_cgoreceiver_channel_size"
probeEventMetric = "kindling_telemetry_cgoreceiver_probe_event_total"
dropProbeEventMetric = "kindling_telemetry_cgoreceiver_dropped_probe_event_total"
preemptionsMetric = "kindling_telemetry_cgoreceiver_preemptions_total"
skippedEventMetric = "kindling_telemetry_cgoreceiver_skipped_events_total"
suppressedThreadMetric = "kindling_telemetry_cgoreceiver_suppressed_thread_total"
)

func newSelfMetrics(meterProvider metric.MeterProvider, receiver *CgoReceiver) {
Expand All @@ -32,6 +37,40 @@ func newSelfMetrics(meterProvider metric.MeterProvider, receiver *CgoReceiver) {
func(ctx context.Context, result metric.Int64ObserverResult) {
result.Observe(int64(len(receiver.eventChannel)))
}, metric.WithDescription("The current number of events contained in the channel. The maximum size is 300,000."))
meter.NewInt64CounterObserver(probeEventMetric,
func(ctx context.Context, result metric.Int64ObserverResult) {
receiver.probeCounterMutex.RLock()
result.Observe(receiver.probeCounter.evts)
receiver.probeCounterMutex.RUnlock()
}, metric.WithDescription("The events seen by driver"))
meter.NewInt64CounterObserver(dropProbeEventMetric,
func(ctx context.Context, result metric.Int64ObserverResult) {
receiver.probeCounterMutex.RLock()
result.Observe(receiver.probeCounter.dropsBuffer, attribute.String("reason", "full buffer"))
result.Observe(receiver.probeCounter.dropsPf, attribute.String("reason", "invalid memory access"))
result.Observe(receiver.probeCounter.dropsBug, attribute.String("reason", "invalid condition"))
result.Observe(receiver.probeCounter.drops-receiver.probeCounter.dropsBuffer-receiver.probeCounter.dropsPf-receiver.probeCounter.dropsBug, attribute.String("reason", "others"))
receiver.probeCounterMutex.RUnlock()
}, metric.WithDescription("The dropped events"))
meter.NewInt64CounterObserver(preemptionsMetric,
func(ctx context.Context, result metric.Int64ObserverResult) {
receiver.probeCounterMutex.RLock()
result.Observe(receiver.probeCounter.preemptions)
receiver.probeCounterMutex.RUnlock()
}, metric.WithDescription("The preemptions"))
meter.NewInt64CounterObserver(skippedEventMetric,
func(ctx context.Context, result metric.Int64ObserverResult) {
receiver.probeCounterMutex.RLock()
result.Observe(receiver.probeCounter.suppressed)
receiver.probeCounterMutex.RUnlock()
}, metric.WithDescription("Number of events skipped due to the tid being in a set of suppressed tids"))
meter.NewInt64CounterObserver(suppressedThreadMetric,
func(ctx context.Context, result metric.Int64ObserverResult) {
receiver.probeCounterMutex.RLock()
result.Observe(receiver.probeCounter.tidsSuppressed)
receiver.probeCounterMutex.RUnlock()
}, metric.WithDescription("Number of threads currently being suppressed"))

})
}

Expand All @@ -44,6 +83,17 @@ type atomicInt64Counter struct {
v int64
}

type probeCounter struct {
evts int64
drops int64
dropsBuffer int64
dropsPf int64
dropsBug int64
preemptions int64
suppressed int64
tidsSuppressed int64
}

func (c *atomicInt64Counter) add(value int64) {
atomic.AddInt64(&c.v, value)
}
Expand Down
5 changes: 3 additions & 2 deletions probe/src/cgo/cgo_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ void subEventForGo(char* eventName, char* category, void *params) { sub_event(ev
void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); }
void stopProfileDebug() { stop_profile_debug(); }

void getCaptureStatistics() { get_capture_statistics(); }
void catchSignalUp() { sig_set_up(); }
void getCaptureStatistics(struct capture_statistics_for_go* stats) { get_capture_statistics(stats); }
void catchSignalUp() { sig_set_up(); }

4 changes: 2 additions & 2 deletions probe/src/cgo/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ char* startAttachAgent(int pid);
char* stopAttachAgent(int pid);
void startProfileDebug(int pid, int tid);
void stopProfileDebug();
void getCaptureStatistics();
void getCaptureStatistics(struct capture_statistics_for_go* stats);
void catchSignalUp();
#ifdef __cplusplus
}
#endif

#endif // SYSDIG_CGO_FUNC_H
#endif // SYSDIG_CGO_FUNC_H
84 changes: 38 additions & 46 deletions probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#include <fstream>
#include <iostream>
#include <thread>
#include "converter/cpu_converter.h"
#include "scap_open_exception.h"
#include "sinsp_capture_interrupt_exception.h"
#include "utils.h"
#include "converter/cpu_converter.h"

cpu_converter* cpuConverter;
fstream debug_file_log;
Expand Down Expand Up @@ -307,15 +307,20 @@ int getEvent(void** pp_kindling_event) {
case SCAP_FD_IPV6_SOCK:
p_kindling_event->context.fdInfo.protocol = get_protocol(fdInfo->get_l4proto());
p_kindling_event->context.fdInfo.role = fdInfo->is_role_server();
memcpy(p_kindling_event->context.fdInfo.sip, fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b, sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b));
memcpy(p_kindling_event->context.fdInfo.dip, fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b, sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b));
memcpy(p_kindling_event->context.fdInfo.sip,
fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b,
sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b));
memcpy(p_kindling_event->context.fdInfo.dip,
fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b,
sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b));
p_kindling_event->context.fdInfo.sport = fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sport;
p_kindling_event->context.fdInfo.dport = fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dport;
break;
case SCAP_FD_IPV6_SERVSOCK:
p_kindling_event->context.fdInfo.protocol = get_protocol(fdInfo->get_l4proto());
p_kindling_event->context.fdInfo.role = fdInfo->is_role_server();
memcpy(p_kindling_event->context.fdInfo.dip, fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b, sizeof(fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b));
memcpy(p_kindling_event->context.fdInfo.dip, fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b,
sizeof(fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b));
p_kindling_event->context.fdInfo.dport = fdInfo->m_sockinfo.m_ipv6serverinfo.m_port;
break;
case SCAP_FD_UNIX_SOCK:
Expand Down Expand Up @@ -388,15 +393,14 @@ int getEvent(void** pp_kindling_event) {
userAttNumber = setTuple(p_kindling_event, pTuple, userAttNumber);
break;
}
case PPME_TCP_RETRANCESMIT_SKB_E:{
case PPME_TCP_RETRANCESMIT_SKB_E: {
auto pTuple = ev->get_param_value_raw("tuple");
userAttNumber = setTuple(p_kindling_event, pTuple, userAttNumber);

auto segs = ev->get_param_value_raw("segs");
if (segs != NULL){
if (segs != NULL) {
strcpy(p_kindling_event->userAttributes[userAttNumber].key, "segs");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, segs->m_val,
segs->m_len);
memcpy(p_kindling_event->userAttributes[userAttNumber].value, segs->m_val, segs->m_len);
p_kindling_event->userAttributes[userAttNumber].len = segs->m_len;
p_kindling_event->userAttributes[userAttNumber].valueType = INT32;
userAttNumber++;
Expand Down Expand Up @@ -1044,7 +1048,7 @@ void attach_agent(int64_t pid, char* error_message, bool is_attach) {
}

char* start_attach_agent(int64_t pid) {
char* error_message = (char*) malloc(1024 * sizeof(char));
char* error_message = (char*)malloc(1024 * sizeof(char));
error_message[0] = '\0';
if (!inspector) {
strcpy(error_message, "Please start profile first");
Expand All @@ -1055,7 +1059,7 @@ char* start_attach_agent(int64_t pid) {
}

char* stop_attach_agent(int64_t pid) {
char* error_message = (char*) malloc(1024 * sizeof(char));
char* error_message = (char*)malloc(1024 * sizeof(char));
error_message[0] = '\0';
if (!inspector) {
strcpy(error_message, "Please start profile first");
Expand Down Expand Up @@ -1113,40 +1117,28 @@ void print_profile_debug_info(sinsp_evt* sevt) {
}
}

void get_capture_statistics() {
void get_capture_statistics(struct capture_statistics_for_go* stats) {
scap_stats s;
while (1) {
printCurrentTime();
inspector->get_capture_stats(&s);
printf("seen by driver: %" PRIu64 "\n", s.n_evts);
if (s.n_drops != 0) {
printf("Number of dropped events: %" PRIu64 "\n", s.n_drops);
}
if (s.n_drops_buffer != 0) {
printf("Number of dropped events caused by full buffer: %" PRIu64 "\n", s.n_drops_buffer);
}
if (s.n_drops_pf != 0) {
printf("Number of dropped events caused by invalid memory access: %" PRIu64 "\n",
s.n_drops_pf);
}
if (s.n_drops_bug != 0) {
printf(
"Number of dropped events caused by an invalid condition in the kernel instrumentation: "
"%" PRIu64 "\n",
s.n_drops_bug);
}
if (s.n_preemptions != 0) {
printf("Number of preemptions: %" PRIu64 "\n", s.n_preemptions);
}
if (s.n_suppressed != 0) {
printf("Number of events skipped due to the tid being in a set of suppressed tids: %" PRIu64
"\n",
s.n_suppressed);
}
if (s.n_tids_suppressed != 0) {
printf("Number of threads currently being suppressed: %" PRIu64 "\n", s.n_tids_suppressed);
}
fflush(stdout);
sleep(10);
}
}
printCurrentTime();
inspector->get_capture_stats(&s);
stats->evts = s.n_evts;
printf("seen by driver: %d \n", stats->evts);
stats->drops = s.n_drops;
printf("Number of dropped events: %d \n", stats->drops);
stats->drops_buffer = s.n_drops_buffer;
printf("Number of dropped events caused by full buffer: %d \n", stats->drops_buffer);
stats->drops_pf = s.n_drops_pf;
printf("Number of dropped events caused by invalid memory access: %d \n",
stats->drops_pf);
stats->drops_bug = s.n_drops_bug;
printf(
"Number of dropped events caused by an invalid condition in the kernel instrumentation: %d \n", stats->drops_bug);
stats->preemptions = s.n_preemptions;
printf("Number of preemptions: %d \n", stats->preemptions);
stats->suppressed = s.n_suppressed;
printf("Number of events skipped due to the tid being in a set of suppressed tids: %d \n",
stats->suppressed);
stats->tids_suppressed = s.n_tids_suppressed;
printf("Number of threads currently being suppressed: %d \n", stats->tids_suppressed);
fflush(stdout);
}
13 changes: 12 additions & 1 deletion probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ char* stop_attach_agent(int64_t pid);

void attach_agent(int64_t pid, char* error_message, bool is_attach);

void get_capture_statistics();
void get_capture_statistics(struct capture_statistics_for_go* stats);

uint16_t get_protocol(scap_l4_proto proto);
uint16_t get_type(ppm_param_type type);
uint16_t get_kindling_source(uint16_t etype);


void suppress_events_comm(string comm);

struct event {
Expand All @@ -64,6 +65,16 @@ struct event_params_for_subscribe {
char *name;
char *value;
};
struct capture_statistics_for_go{
int evts;
int drops;
int drops_buffer;
int drops_pf;
int drops_bug;
int preemptions;
int suppressed;
int tids_suppressed;
};
void sub_event(char* eventName, char* category, event_params_for_subscribe params[]);
struct kindling_event_t_for_go {
uint64_t timestamp;
Expand Down

0 comments on commit 6c8fa9f

Please sign in to comment.