Skip to content

Commit

Permalink
add new self metric
Browse files Browse the repository at this point in the history
Signed-off-by: anthonyhui <[email protected]>
  • Loading branch information
anthonyhui committed Aug 8, 2023
1 parent dca173a commit d2674b4
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 56 deletions.
13 changes: 11 additions & 2 deletions collector/pkg/component/receiver/cgoreceiver/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,24 @@ char* startAttachAgent(int pid);
char* stopAttachAgent(int pid);
void startProfileDebug(int pid, int tid);
void stopProfileDebug();
void getCaptureStatistics();
void getCaptureStatistics(struct capture_statistics_for_go* stats);
void catchSignalUp();
#ifdef __cplusplus
}

#endif

#endif //SYSDIG_CGO_FUNC_H

struct capture_statistics_for_go{
int evts;
int drops;
int drops_buffer;
int drops_pf;
int drops_bug;
int preemptions;
int suppressed;
int tids_suppressed;
};
struct event_params_for_subscribe {
char *name;
char *value;
Expand Down
9 changes: 6 additions & 3 deletions collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type CKindlingEventForGo C.struct_kindling_event_t_for_go

type CEventParamsForSubscribe C.struct_event_params_for_subscribe

type CaptureStatistic C.struct_capture_statistics_for_go

type CgoReceiver struct {
cfg *Config
analyzerManager *analyzerpackage.Manager
Expand Down Expand Up @@ -65,7 +67,6 @@ func (r *CgoReceiver) Start() error {
if res == 1 {
return fmt.Errorf("fail to init probe")
}
go r.getCaptureStatistics()
go r.catchSignalUp()
time.Sleep(2 * time.Second)
r.suppressEventsComm()
Expand Down Expand Up @@ -239,8 +240,10 @@ func (r *CgoReceiver) ProfileModule() (submodule string, start func() error, sto
return "cgoreceiver", r.StartProfile, r.StopProfile
}

func (r *CgoReceiver) getCaptureStatistics() {
C.getCaptureStatistics()
func (r *CgoReceiver) getCaptureStatistics() CaptureStatistic {
var captureStatistics C.struct_capture_statistics_for_go
C.getCaptureStatistics((*C.struct_capture_statistics_for_go)(&captureStatistics))
return CaptureStatistic(captureStatistics)
}

func (r *CgoReceiver) catchSignalUp() {
Expand Down
14 changes: 14 additions & 0 deletions collector/pkg/component/receiver/cgoreceiver/self_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var once sync.Once
const (
eventReceivedMetric = "kindling_telemetry_cgoreceiver_events_total"
channelSizeMetric = "kindling_telemetry_cgoreceiver_channel_size"
eventStatMetric = "kindling_telemetry_cgoreceiver_events"
)

func newSelfMetrics(meterProvider metric.MeterProvider, receiver *CgoReceiver) {
Expand All @@ -32,6 +33,19 @@ func newSelfMetrics(meterProvider metric.MeterProvider, receiver *CgoReceiver) {
func(ctx context.Context, result metric.Int64ObserverResult) {
result.Observe(int64(len(receiver.eventChannel)))
}, metric.WithDescription("The current number of events contained in the channel. The maximum size is 300,000."))
meter.NewInt64GaugeObserver(eventStatMetric,
func(ctx context.Context, result metric.Int64ObserverResult) {
stat := receiver.getCaptureStatistics()
result.Observe(int64(stat.evts), attribute.String("label", "evts"))
result.Observe(int64(stat.drops), attribute.String("label", "drops"))
result.Observe(int64(stat.drops_buffer), attribute.String("label", "drops_buffer"))
result.Observe(int64(stat.drops_pf), attribute.String("label", "drops_pf"))
result.Observe(int64(stat.drops_bug), attribute.String("label", "drops_bug"))
result.Observe(int64(stat.preemptions), attribute.String("label", "preemptions"))
result.Observe(int64(stat.suppressed), attribute.String("label", "suppressed"))
result.Observe(int64(stat.tids_suppressed), attribute.String("label", "tids_suppressed"))
}, metric.WithDescription("The events stat"))

})
}

Expand Down
5 changes: 3 additions & 2 deletions probe/src/cgo/cgo_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ void subEventForGo(char* eventName, char* category, void *params) { sub_event(ev
void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); }
void stopProfileDebug() { stop_profile_debug(); }

void getCaptureStatistics() { get_capture_statistics(); }
void catchSignalUp() { sig_set_up(); }
void getCaptureStatistics(struct capture_statistics_for_go* stats) { get_capture_statistics(stats); }
void catchSignalUp() { sig_set_up(); }

4 changes: 2 additions & 2 deletions probe/src/cgo/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ char* startAttachAgent(int pid);
char* stopAttachAgent(int pid);
void startProfileDebug(int pid, int tid);
void stopProfileDebug();
void getCaptureStatistics();
void getCaptureStatistics(struct capture_statistics_for_go* stats);
void catchSignalUp();
#ifdef __cplusplus
}
#endif

#endif // SYSDIG_CGO_FUNC_H
#endif // SYSDIG_CGO_FUNC_H
84 changes: 38 additions & 46 deletions probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#include <fstream>
#include <iostream>
#include <thread>
#include "converter/cpu_converter.h"
#include "scap_open_exception.h"
#include "sinsp_capture_interrupt_exception.h"
#include "utils.h"
#include "converter/cpu_converter.h"

cpu_converter* cpuConverter;
fstream debug_file_log;
Expand Down Expand Up @@ -307,15 +307,20 @@ int getEvent(void** pp_kindling_event) {
case SCAP_FD_IPV6_SOCK:
p_kindling_event->context.fdInfo.protocol = get_protocol(fdInfo->get_l4proto());
p_kindling_event->context.fdInfo.role = fdInfo->is_role_server();
memcpy(p_kindling_event->context.fdInfo.sip, fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b, sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b));
memcpy(p_kindling_event->context.fdInfo.dip, fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b, sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b));
memcpy(p_kindling_event->context.fdInfo.sip,
fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b,
sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sip.m_b));
memcpy(p_kindling_event->context.fdInfo.dip,
fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b,
sizeof(fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dip.m_b));
p_kindling_event->context.fdInfo.sport = fdInfo->m_sockinfo.m_ipv6info.m_fields.m_sport;
p_kindling_event->context.fdInfo.dport = fdInfo->m_sockinfo.m_ipv6info.m_fields.m_dport;
break;
case SCAP_FD_IPV6_SERVSOCK:
p_kindling_event->context.fdInfo.protocol = get_protocol(fdInfo->get_l4proto());
p_kindling_event->context.fdInfo.role = fdInfo->is_role_server();
memcpy(p_kindling_event->context.fdInfo.dip, fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b, sizeof(fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b));
memcpy(p_kindling_event->context.fdInfo.dip, fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b,
sizeof(fdInfo->m_sockinfo.m_ipv6serverinfo.m_ip.m_b));
p_kindling_event->context.fdInfo.dport = fdInfo->m_sockinfo.m_ipv6serverinfo.m_port;
break;
case SCAP_FD_UNIX_SOCK:
Expand Down Expand Up @@ -388,15 +393,14 @@ int getEvent(void** pp_kindling_event) {
userAttNumber = setTuple(p_kindling_event, pTuple, userAttNumber);
break;
}
case PPME_TCP_RETRANCESMIT_SKB_E:{
case PPME_TCP_RETRANCESMIT_SKB_E: {
auto pTuple = ev->get_param_value_raw("tuple");
userAttNumber = setTuple(p_kindling_event, pTuple, userAttNumber);

auto segs = ev->get_param_value_raw("segs");
if (segs != NULL){
if (segs != NULL) {
strcpy(p_kindling_event->userAttributes[userAttNumber].key, "segs");
memcpy(p_kindling_event->userAttributes[userAttNumber].value, segs->m_val,
segs->m_len);
memcpy(p_kindling_event->userAttributes[userAttNumber].value, segs->m_val, segs->m_len);
p_kindling_event->userAttributes[userAttNumber].len = segs->m_len;
p_kindling_event->userAttributes[userAttNumber].valueType = INT32;
userAttNumber++;
Expand Down Expand Up @@ -1044,7 +1048,7 @@ void attach_agent(int64_t pid, char* error_message, bool is_attach) {
}

char* start_attach_agent(int64_t pid) {
char* error_message = (char*) malloc(1024 * sizeof(char));
char* error_message = (char*)malloc(1024 * sizeof(char));
error_message[0] = '\0';
if (!inspector) {
strcpy(error_message, "Please start profile first");
Expand All @@ -1055,7 +1059,7 @@ char* start_attach_agent(int64_t pid) {
}

char* stop_attach_agent(int64_t pid) {
char* error_message = (char*) malloc(1024 * sizeof(char));
char* error_message = (char*)malloc(1024 * sizeof(char));
error_message[0] = '\0';
if (!inspector) {
strcpy(error_message, "Please start profile first");
Expand Down Expand Up @@ -1113,40 +1117,28 @@ void print_profile_debug_info(sinsp_evt* sevt) {
}
}

void get_capture_statistics() {
void get_capture_statistics(struct capture_statistics_for_go* stats) {
scap_stats s;
while (1) {
printCurrentTime();
inspector->get_capture_stats(&s);
printf("seen by driver: %" PRIu64 "\n", s.n_evts);
if (s.n_drops != 0) {
printf("Number of dropped events: %" PRIu64 "\n", s.n_drops);
}
if (s.n_drops_buffer != 0) {
printf("Number of dropped events caused by full buffer: %" PRIu64 "\n", s.n_drops_buffer);
}
if (s.n_drops_pf != 0) {
printf("Number of dropped events caused by invalid memory access: %" PRIu64 "\n",
s.n_drops_pf);
}
if (s.n_drops_bug != 0) {
printf(
"Number of dropped events caused by an invalid condition in the kernel instrumentation: "
"%" PRIu64 "\n",
s.n_drops_bug);
}
if (s.n_preemptions != 0) {
printf("Number of preemptions: %" PRIu64 "\n", s.n_preemptions);
}
if (s.n_suppressed != 0) {
printf("Number of events skipped due to the tid being in a set of suppressed tids: %" PRIu64
"\n",
s.n_suppressed);
}
if (s.n_tids_suppressed != 0) {
printf("Number of threads currently being suppressed: %" PRIu64 "\n", s.n_tids_suppressed);
}
fflush(stdout);
sleep(10);
}
}
printCurrentTime();
inspector->get_capture_stats(&s);
stats->evts = s.n_evts;
printf("seen by driver: %d \n", stats->evts);
stats->drops = s.n_drops;
printf("Number of dropped events: %d \n", stats->drops);
stats->drops_buffer = s.n_drops_buffer;
printf("Number of dropped events caused by full buffer: %d \n", stats->drops_buffer);
stats->drops_pf = s.n_drops_pf;
printf("Number of dropped events caused by invalid memory access: %d \n",
stats->drops_pf);
stats->drops_bug = s.n_drops_bug;
printf(
"Number of dropped events caused by an invalid condition in the kernel instrumentation: %d \n", stats->drops_bug);
stats->preemptions = s.n_preemptions;
printf("Number of preemptions: %d \n", stats->preemptions);
stats->suppressed = s.n_suppressed;
printf("Number of events skipped due to the tid being in a set of suppressed tids: %d \n",
stats->suppressed);
stats->tids_suppressed = s.n_tids_suppressed;
printf("Number of threads currently being suppressed: %d \n", stats->tids_suppressed);
fflush(stdout);
}
13 changes: 12 additions & 1 deletion probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ char* stop_attach_agent(int64_t pid);

void attach_agent(int64_t pid, char* error_message, bool is_attach);

void get_capture_statistics();
void get_capture_statistics(struct capture_statistics_for_go* stats);

uint16_t get_protocol(scap_l4_proto proto);
uint16_t get_type(ppm_param_type type);
uint16_t get_kindling_source(uint16_t etype);


void suppress_events_comm(string comm);

struct event {
Expand All @@ -64,6 +65,16 @@ struct event_params_for_subscribe {
char *name;
char *value;
};
struct capture_statistics_for_go{
int evts;
int drops;
int drops_buffer;
int drops_pf;
int drops_bug;
int preemptions;
int suppressed;
int tids_suppressed;
};
void sub_event(char* eventName, char* category, event_params_for_subscribe params[]);
struct kindling_event_t_for_go {
uint64_t timestamp;
Expand Down

0 comments on commit d2674b4

Please sign in to comment.