Skip to content

Commit

Permalink
viz: Extract policy metrics into a distinct file (#8167)
Browse files Browse the repository at this point in the history
The policy metrics logic is intermingled with other metrics code in
`stat_summary.go`.

This change extracts this code into a distinct file so that the code can
be changed more easily.

No functional changes.
  • Loading branch information
olix0r authored Mar 29, 2022
1 parent 00954d7 commit a274393
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 167 deletions.
195 changes: 195 additions & 0 deletions viz/metrics-api/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package api

import (
"context"
"fmt"
"strings"

"github.com/linkerd/linkerd2/pkg/k8s"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"github.com/prometheus/common/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)

const (
httpAuthzDenyQuery = "sum(increase(inbound_http_authz_deny_total%s[%s])) by (%s)"
httpAuthzAllowQuery = "sum(increase(inbound_http_authz_allow_total%s[%s])) by (%s)"
)

func isPolicyResource(resource *pb.Resource) bool {
if resource != nil {
if resource.GetType() == k8s.Server || resource.GetType() == k8s.ServerAuthorization {
return true
}
}
return false
}

func (s *grpcServer) policyResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {

policyResources, err := s.getPolicyResourceKeys(req)
if err != nil {
return resourceResult{res: nil, err: err}
}

var requestMetrics map[rKey]*pb.BasicStats
var tcpMetrics map[rKey]*pb.TcpStats
var authzMetrics map[rKey]*pb.ServerStats
if !req.SkipStats {
requestMetrics, tcpMetrics, authzMetrics, err = s.getPolicyMetrics(ctx, req, req.TimeWindow)
if err != nil {
return resourceResult{res: nil, err: err}
}
}

rows := make([]*pb.StatTable_PodGroup_Row, 0)
for _, key := range policyResources {
row := pb.StatTable_PodGroup_Row{
Resource: &pb.Resource{
Name: key.Name,
Namespace: key.Namespace,
Type: req.GetSelector().GetResource().GetType(),
},
TimeWindow: req.TimeWindow,
Stats: requestMetrics[key],
TcpStats: tcpMetrics[key],
SrvStats: authzMetrics[key],
}

rows = append(rows, &row)
}

rsp := pb.StatTable{
Table: &pb.StatTable_PodGroup_{
PodGroup: &pb.StatTable_PodGroup{
Rows: rows,
},
},
}
return resourceResult{res: &rsp, err: nil}
}

func (s *grpcServer) getPolicyResourceKeys(req *pb.StatSummaryRequest) ([]rKey, error) {
var err error
var unstructuredResources *unstructured.UnstructuredList

// TODO(ver): We should use a typed client
var gvr schema.GroupVersionResource
if req.GetSelector().Resource.GetType() == k8s.Server {
gvr = k8s.ServerGVR
} else if req.GetSelector().Resource.GetType() == k8s.ServerAuthorization {
gvr = k8s.SazGVR
}

res := req.GetSelector().GetResource()
labelSelector, err := getLabelSelector(req)
if err != nil {
return nil, err
}

if res.GetNamespace() == "" {
unstructuredResources, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace("").
List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()})
} else if res.GetName() == "" {
unstructuredResources, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace(res.GetNamespace()).
List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()})
} else {
var ts *unstructured.Unstructured
ts, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace(res.GetNamespace()).
Get(context.TODO(), res.GetName(), metav1.GetOptions{})
if err != nil {
return nil, err
}
unstructuredResources = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{*ts}}
}
if err != nil {
return nil, err
}

var resourceKeys []rKey
for _, resource := range unstructuredResources.Items {
// Resource Key's type should be singular and lowercased while the kind isn't
resourceKeys = append(resourceKeys, rKey{
Namespace: resource.GetNamespace(),
// TODO(ver) This isn't a reliable way to make a plural name singular.
Type: strings.ToLower(resource.GetKind()[0:len(resource.GetKind())]),
Name: resource.GetName(),
})
}
return resourceKeys, nil
}

func (s *grpcServer) getPolicyMetrics(
ctx context.Context,
req *pb.StatSummaryRequest,
timeWindow string,
) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, map[rKey]*pb.ServerStats, error) {
labels, groupBy := buildServerRequestLabels(req)
// These metrics are always inbound.
reqLabels := labels.Merge(model.LabelSet{
"direction": model.LabelValue("inbound"),
})

promQueries := make(map[promType]string)
if req.GetSelector().GetResource().GetType() == k8s.Server {
// TCP metrics are only supported with servers
if req.TcpStats {
// peer is always `src` as these are inbound metrics
tcpLabels := reqLabels.Merge(promPeerLabel("src"))
promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, tcpLabels.String(), groupBy.String())
promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabels.String(), timeWindow, groupBy.String())
promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabels.String(), timeWindow, groupBy.String())
}
}

promQueries[promRequests] = fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String())
// Use `labels` as direction isn't present with authorization metrics
promQueries[promAllowedRequests] = fmt.Sprintf(httpAuthzAllowQuery, labels, timeWindow, groupBy.String())
promQueries[promDeniedRequests] = fmt.Sprintf(httpAuthzDenyQuery, labels, timeWindow, groupBy.String())
quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
if err != nil {
return nil, nil, nil, err
}

basicStats, tcpStats, authzStats := processPrometheusMetrics(req, results, groupBy)
return basicStats, tcpStats, authzStats, nil
}

func buildServerRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
if req.GetSelector().GetResource().GetNamespace() != "" {
labels = labels.Merge(model.LabelSet{
namespaceLabel: model.LabelValue(req.GetSelector().GetResource().GetNamespace()),
})
}
var resourceLabel model.LabelName
if req.GetSelector().GetResource().GetType() == k8s.Server {
resourceLabel = serverLabel
} else if req.GetSelector().GetResource().GetType() == k8s.ServerAuthorization {
resourceLabel = serverAuthorizationLabel
}

if req.GetSelector().GetResource().GetName() != "" {
labels = labels.Merge(model.LabelSet{
resourceLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
})
}

switch out := req.Outbound.(type) {
case *pb.StatSummaryRequest_ToResource:
// if --to flag is passed, Calculate traffic sent to the policy resource
// with additional filtering narrowing down to the workload
// it is sent to.
labels = labels.Merge(promQueryLabels(out.ToResource))

// No FromResource case as policy metrics are all inbound
default:
// no extra labels needed
}

groupBy := model.LabelNames{namespaceLabel, resourceLabel}

return labels, groupBy
}
Loading

0 comments on commit a274393

Please sign in to comment.