diff --git a/docs/proposal/tcp_long_connection_metrics.md b/docs/proposal/tcp_long_connection_metrics.md new file mode 100644 index 000000000..770e4b3a0 --- /dev/null +++ b/docs/proposal/tcp_long_connection_metrics.md @@ -0,0 +1,514 @@ +--- +title: Proposal for generating metrics for TCP long connections +authors: + - "yp969803" +reviewers: +- "nglwcy" +- "lizhencheng" +approvers: +- "nlgwcy" +- "lizhencheng" + +creation-date: 2025-02-06 +--- + + +## Proposal for generating metrics for TCP long connections + + + +Upstream issue: https://github.com/kmesh-net/kmesh/issues/1211 + +### Summary + + + +Currently kmesh provides access logs during termination and establishment of a TCP connection with more detailed information about the connection. + +Kmesh also provides metrics during connection establishment, completion and denial, capturing a variety of details about the connection. + +In this proposal, we are aiming to implement access logs and metrics for TCP long connections, developing a continuous monitoring and reporting mechanism that captures detailed, real-time data throughout the lifetime of long-lived TCP connections. + +### Motivation + + + +Performance and health of long connections can be known early; currently we get all the information about the connection from the metrics and access logs provided only at the end, after the connection terminates. + +#### Goals + + + +- Collect detailed traffic metrics (e.g. bytes sent/received, round-trip time, packet loss, TCP retransmissions) continuously during the lifetime of long TCP connections using eBPF. + +- Reporting of metrics and access logs at a periodic interval of 5 seconds. We are choosing 5 seconds as the threshold time because it allows enough time to accumulate meaningful changes in metrics. If the reporting interval is too short, it might cause excessive overhead by processing too many updates. 
+ +- Generating access logs containing information about the connection continuously during the lifetime of long TCP connections from the metrics data. + +- Metrics and logs supporting open-telemetry format. + +- Exposing these metrics by kmesh daemon so that prometheus can scrape it. + +- Unit and E2E tests. + +#### Non-Goals + + + +- Collecting information about packet contents. + +- Controlling or modifying TCP connections + +- Collecting L7 metrics + +### Proposal + + + +Metrics will be collected using eBPF tracepoint hooks, and an eBPF map will be used to transfer metrics from kernel space to userspace. + + +### Design Details + + + +#### Collecting Metrics + +We will use eBPF tracepoint hooks to collect different metrics related to the connection. +We are using tracepoints over kprobes because: +- More stable +- Lower overhead +- Reliability in production + +Declaring an eBPF hash map in bpf_common.h to store information about tcp_long_connection. + +``` + +struct ipv4_addr { + __u32 addr; +}; + +struct ipv6_addr { + __u8 addr[16]; +}; + + +struct connection_key { + union { + struct ipv4_addr v4; + struct ipv6_addr v6; + } saddr; + union { + struct ipv4_addr v4; + struct ipv6_addr v6; + } daddr; + __u16 sport; + __u16 dport; + __u8 family; // AF_INET or AF_INET6 +}; + + +struct long_tcp_metrics { + __u64 start_ns; // Timestamp when connection was established + __u64 prev_monitor_ns; // Timestamp when metrics was last reported to ring_buf + __u64 bytes_sent; // Total bytes sent + __u64 bytes_recv; // Total bytes received + __u64 retransmissions; // Total retransmissions + __u64 packet_loss; // Total packet-loss + __u64 srtt_us; // smoothed round-trip time in microseconds +}; + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __uint(max_entries, 10240); + __type(key, struct connection_key); + __type(value, struct long_tcp_metrics); +} long_conn_metrics_map SEC(".maps"); + +``` + +Using various eBPF tracepoint hooks to collect metrics of the TCP long connection, a ring buffer is 
also decleared to send data from kernel space to userspace + +Code update in tracepoint.c file +``` +--- + +#define LONG_CONN_THRESHOLD_TIME (5 * 1000000000ULL) +#include "bpf_common.h" +#include +#include +#include +#include +--- + + +// Event structure to send metrics to user space. +struct event { + struct connection_key key; + struct long_tcp_metrics metrics; +}; + +// BPF ring buffer to output events to user space. +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 1 << 24); // 16 MB ring buffer +} tcp_long_conn_events SEC(".maps"); + + +SEC("tracepoint/tcp/tcp_set_state") +int trace_tcp_set_state(struct trace_event_raw_tcp_set_state *ctx) +{ + struct connection_key key = {}; + u64 now = bpf_ktime_get_ns(); + + if (ctx->family == AF_INET) { + key.family = AF_INET; + key.saddr.v4.addr = ctx->saddr; + key.daddr.v4.addr = ctx->daddr; + } else if (ctx->family == AF_INET6) { + key.family = AF_INET6; + bpf_probe_read(&key.saddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_rcv_saddr); + bpf_probe_read(&key.daddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_daddr); + } else { + return 0; + } + + key.sport = ctx->sport; + key.dport = ctx->dport; + + if (ctx->newstate == TCP_ESTABLISHED) { + struct long_tcp_metrics m = {}; + m.start_ns = now; + m.prev_monitor_ns=now; + bpf_map_update_elem(&conn_metrics_map, &key, &m, BPF_ANY); + } else if (ctx->newstate == TCP_CLOSE) { + bpf_map_delete_elem(&conn_metrics_map, &key); + } + return 0; +} + +// Captures bytes send +SEC("tracepoint/tcp/tcp_sendmsg") +int trace_tcp_sendmsg(struct trace_event_raw_tcp_sendmsg *ctx) +{ + struct connection_key key = {}; + u64 bytes = ctx->size; + + if (ctx->family == AF_INET) { + key.family = AF_INET; + key.saddr.v4.addr = ctx->saddr; + key.daddr.v4.addr = ctx->daddr; + } else if (ctx->family == AF_INET6) { + key.family = AF_INET6; + bpf_probe_read(&key.saddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_rcv_saddr); + bpf_probe_read(&key.daddr.v6.addr, sizeof(struct 
ipv6_addr), &ctx->skc_v6_daddr); + } else { + return 0; + } + + key.sport = ctx->sport; + key.dport = ctx->dport; + + struct long_tcp_metrics *m = bpf_map_lookup_elem(&conn_metrics_map, &key); + if (m) { + __sync_fetch_and_add(&m->bytes_sent, bytes); + } + return 0; +} + +// Captures bytes recieved +SEC("tracepoint/tcp/tcp_cleanup_rbuf") +int trace_tcp_cleanup_rbuf(struct trace_event_raw_tcp_cleanup_rbuf *ctx) +{ + struct connection_key key = {}; + u64 bytes = ctx->copied; + + if (ctx->family == AF_INET) { + key.family = AF_INET; + key.saddr.v4.addr = ctx->saddr; + key.daddr.v4.addr = ctx->daddr; + } else if (ctx->family == AF_INET6) { + key.family = AF_INET6; + bpf_probe_read(&key.saddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_rcv_saddr); + bpf_probe_read(&key.daddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_daddr); + } else { + return 0; + } + + key.sport = ctx->sport; + key.dport = ctx->dport; + struct long_tcp_metrics *m = bpf_map_lookup_elem(&conn_metrics_map, &key); + if (m) { + __sync_fetch_and_add(&m->bytes_recv, bytes); + } + return 0; +} + + +// Track retransmissions and update RTT. +// (Assumes args->srtt_us provides the current smoothed RTT in microseconds) +TRACEPOINT_PROBE(tcp, tcp_retransmit_skb) { + struct connection_key key = {}; + + if (ctx->family == AF_INET) { + key.family = AF_INET; + key.saddr.v4.addr = ctx->saddr; + key.daddr.v4.addr = ctx->daddr; + } else if (ctx->family == AF_INET6) { + key.family = AF_INET6; + bpf_probe_read(&key.saddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_rcv_saddr); + bpf_probe_read(&key.daddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_daddr); + } else { + return 0; + } + + key.sport = ctx->sport; + key.dport = ctx->dport; + struct long_tcp_metrics *m = conn_metrics.lookup(&key); + if (m) { + __sync_fetch_and_add(&m->retransmissions, 1); + m->srtt_us = args->srtt_us; // update latest RTT, if available + } + return 0; +} + +// Track packet loss events. 
+TRACEPOINT_PROBE(tcp, tcp_drop) { + struct connection_key key = {}; + + if (ctx->family == AF_INET) { + key.family = AF_INET; + key.saddr.v4.addr = ctx->saddr; + key.daddr.v4.addr = ctx->daddr; + } else if (ctx->family == AF_INET6) { + key.family = AF_INET6; + bpf_probe_read(&key.saddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_rcv_saddr); + bpf_probe_read(&key.daddr.v6.addr, sizeof(struct ipv6_addr), &ctx->skc_v6_daddr); + } else { + return 0; + } + + key.sport = ctx->sport; + key.dport = ctx->dport; + struct long_tcp_metrics *m = conn_metrics.lookup(&key); + if (m) { + __sync_fetch_and_add(&m->packet_loss, 1); + } + return 0; +} + + + +// Flush Function: Periodically invoked via a perf event. +// Iterates over the conn_metrics_map and submits events for connections +// that have been open longer than LONG_CONN_THRESHOLD_NS. +SEC("perf_event/flush") +int flush_connections(struct bpf_perf_event_data *ctx) +{ + struct connection_key key = {}; + struct connection_key next_key = {}; + struct long_tcp_metrics *m; + u64 now = bpf_ktime_get_ns(); + int ret; + + // Iterate over the map. The loop is bounded by a fixed maximum (e.g., 1024 iterations) + #pragma unroll + for (int i = 0; i < 1024; i++) { + ret = bpf_map_get_next_key(&conn_metrics_map, &key, &next_key); + if (ret < 0) + break; + + m = bpf_map_lookup_elem(&conn_metrics_map, &key); + if (m && (now - m->prev_monitor_ns >= LONG_CONN_THRESHOLD_NS)) { + struct tcp_long_conn_events *e; + e = bpf_ringbuf_reserve(&events, sizeof(*e), 0); + if (e) { + e->key = key; + e->metrics = *m; + bpf_ringbuf_submit(e, 0); + } + m->prev_monitor_ns=now; + } + key = next_key; + } + return 0; +} + +``` + + +Storing the bool enableTcpLongMetric in metricController, the value of the variable is set by user, and the variable is used to decide whether TCP long connection metrics should be collected and reported. 
+ +``` +## Note: "---" means, previous code remains same +type MetricController struct { + --- + EnableTCPLongMetric atomic.Bool + --- +} + +func NewMetric(workloadCache cache.WorkloadCache, serviceCache cache.ServiceCache, enableMonitoring bool, enableTcpLongMetric bool) *MetricController { + m := &MetricController{ + --- + EnableTCPLongMetric: enableTcpLongMetric + } + + --- + + return m +} + +``` + +The labels of tcp_long_connection metric will be same as the labels we currently we have for another metrics. + +Accessing ringbuffer map from the userspace, +``` +func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map, mapOfLongTcpConnInfo *ebpf.Map) { + --- + long_conn_reader, err := ringbuf.NewReader(mapOfTcpInfo) + if err != nil { + log.Errorf("open metric notify ringbuf map FAILED, err: %v", err) + return + } + for { + select { + case <-ctx.Done(): + return + default: + if !m.EnableTCPLongMetric.Load() { + continue + } + data := requestMetric{} + rec := ringbuf.Record{} + if err := long_conn_reader.ReadInto(&rec); err != nil { + log.Errorf("ringbuf reader FAILED to read, err: %v", err) + continue + } + if len(rec.RawSample) != int(unsafe.Sizeof(connectionDataV4{})) { + log.Errorf("wrong length %v of a msg, should be %v", len(rec.RawSample), int(unsafe.Sizeof(connectionDataV4{}))) + continue + } + connectType := binary.LittleEndian.Uint32(rec.RawSample) + originInfo := rec.RawSample[unsafe.Sizeof(connectType):] + buf := bytes.NewBuffer(originInfo) + switch connectType { + case constants.MSG_TYPE_IPV4: + data, err = buildV4Metric(buf) + case constants.MSG_TYPE_IPV6: + data, err = buildV6Metric(buf) + default: + log.Errorf("get connection info failed: %v", err) + continue + } + + workloadLabels := workloadMetricLabels{} + serviceLabels, accesslog := m.buildServiceMetric(&data) + if m.EnableWorkloadMetric.Load() { + workloadLabels = m.buildWorkloadMetric(&data) + } + + m.mutex.Lock() + if m.EnableWorkloadMetric.Load() { + 
m.updateWorkloadMetricCache(data, workloadLabels) + } + m.updateServiceMetricCache(data, serviceLabels) + m.mutex.Unlock() + } + } +} + +``` + +#### User Stories (Optional) + + + +##### Story 1 + +##### Story 2 + +#### Notes/Constraints/Caveats (Optional) + + + +#### Risks and Mitigations + + + + +#### Test Plan + + + +### Alternatives + + + + \ No newline at end of file