Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network performance measurement: Implementing worker using CRD #1645

Merged
merged 1 commit into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions util-images/network/netperfbenchmark/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@ module k8s.io/perf-tests/util-images/network/netperfbenchmark
go 1.15

require (
k8s.io/api v0.18.0
k8s.io/apimachinery v0.18.0
k8s.io/client-go v0.18.0
k8s.io/klog v1.0.0
)
122 changes: 88 additions & 34 deletions util-images/network/netperfbenchmark/pkg/worker/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@ limitations under the License.
package worker

import (
"bufio"
"errors"
"fmt"
"io"
"math"
"net/http"
"os/exec"
"regexp"
"strconv"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

Expand All @@ -42,18 +50,11 @@ const (
httpMetricsCount = 12
)

// Iperf results vary from protocol,required for proper parsing.
const (
zeroFormatTCP = "0.0"
fractionZeroFormatTCP = ".0"
zeroFormatUDP = "0.00"
fractionZeroFormatUDP = ".00"
)

var (
unitRegex = regexp.MustCompile(`%|\[\s+|\]\s+|KBytes\s+|KBytes/sec\s*|sec\s+|pps\s*|ms\s+|/|\(|\)\s+`)
multiSpaceRegex = regexp.MustCompile(`\s+`)
hyphenSpaceRegex = regexp.MustCompile(`\-\s+`)
intervalRegex = regexp.MustCompile(`\d+\.\d+\-\s*\d+\.\d+`)
)

// Function to be applied for each metric for aggregation.
Expand All @@ -66,31 +67,31 @@ var (
func parseResult(protocol string, result []string, testDuration string) ([]float64, error) {
switch protocol {
case ProtocolTCP:
return parseIperfResponse(result, testDuration, tcpMetricsCount, ProtocolTCP, zeroFormatTCP, fractionZeroFormatTCP, iperfTCPFunction)
return parseIperfResponse(result, testDuration, tcpMetricsCount, ProtocolTCP, iperfTCPFunction)
case ProtocolUDP:
return parseIperfResponse(result, testDuration, udpMetricsCount, ProtocolUDP, zeroFormatUDP, fractionZeroFormatUDP, iperfUDPFunction)
return parseIperfResponse(result, testDuration, udpMetricsCount, ProtocolUDP, iperfUDPFunction)
case ProtocolHTTP:
return parseSiegeResponse(result)
default:
return nil, errors.New("invalid protocol: " + protocol)
}
}

func parseIperfResponse(result []string, testDuration string, metricCount int, protocol, zeroFormat, fractionZeroFormat string, operators []string) ([]float64, error) {
func parseIperfResponse(result []string, testDuration string, metricCount int, protocol string, operators []string) ([]float64, error) {
aggregatedResult := make([]float64, 0, metricCount)
count := 0
sessionID := make(map[string]bool)
interval := zeroFormat + "-" + testDuration + fractionZeroFormat
for _, line := range result {
klog.V(4).Info(line)
split := parseIperfLine(line)
// iperf gives aggregated result in a row with "SUM", but many fields are not
// getting aggregated hence not depending on iperf's aggregation.
if len(split) < metricCount+1 || "SUM" == split[1] || split[2] != interval {
if len(split) < metricCount+1 || "SUM" == split[1] || !intervalRegex.MatchString(split[2]) {
continue
}
// iperf sometimes prints duplicate rows for same session id(for same duration), making sure
// the row is taken for calculation only once.
if _, ok := sessionID[split[1]]; ok {
if _, isDuplicate := sessionID[split[1]]; isDuplicate {
continue
}
sessionID[split[1]] = true
Expand Down Expand Up @@ -118,7 +119,6 @@ func parseIperfResponse(result []string, testDuration string, metricCount int, p
count++
}
}
klog.Infof("Final output: %v", aggregatedResult)
return aggregatedResult, nil
}

Expand Down Expand Up @@ -152,7 +152,6 @@ func parseSiegeResponse(result []string) ([]float64, error) {
}
aggregatedResult = append(aggregatedResult, tmp)
}
klog.Infof("Final output: %v", aggregatedResult)
return aggregatedResult, nil
}

Expand All @@ -177,33 +176,88 @@ func trimSiegeResponse(result []string) ([]string, error) {
return result[beginIndex:endIndex], nil
}

// GetValuesFromURL returns a map with values parsed from http request,for attributes specified in paramsToSearch.
func getValuesFromURL(request *http.Request, parametersToSearch []string) (map[string]string, error) {
values := request.URL.Query()
paramMap := make(map[string]string)
for _, param := range parametersToSearch {
val := values.Get(param)
if val == "" {
return nil, fmt.Errorf("missing URL parameter: %s", param)
}
paramMap[param] = val
}
return paramMap, nil
func extractParameters(input resourceProperties) map[string]string {
result := map[string]string{}
result["duration"] = strconv.FormatInt(input.Duration, 10)
result["numberOfClients"] = strconv.FormatInt(input.NumberOfClients, 10)
result["serverPodIP"] = input.ServerPodIP
return result
}

// populateTemplates populates template parameters with actual values from the http request object.
func populateTemplates(arguments []string, request *http.Request) ([]string, error) {
func populateTemplates(arguments []string, requestProperties map[string]string) ([]string, error) {
for i, argument := range arguments {
if idx := strings.Index(argument, "{"); idx == -1 {
continue
}
parameter := argument[strings.Index(argument, "{")+1 : strings.Index(argument, "}")]
valMap, err := getValuesFromURL(request, []string{parameter})
if err != nil {
return nil, err
value, isPresent := requestProperties[parameter]
if !isPresent {
return nil, fmt.Errorf("property %v not present in request", parameter)
}
arguments[i] = strings.Replace(argument, "{"+parameter+"}", valMap[parameter], 1)
arguments[i] = strings.Replace(argument, "{"+parameter+"}", value, 1)
klog.Infof("Value after resolving template %v: %v", parameter, arguments[i])
}
return arguments, nil
}

func getInformer(labelSelector, namespace string, k8sClient dynamic.Interface, gvr schema.GroupVersionResource) (cache.SharedInformer, error) {
optionsModifier := func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector
}
tweakListOptions := dynamicinformer.TweakListOptionsFunc(optionsModifier)
dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k8sClient, 0, namespace, tweakListOptions)
informer := dynamicInformerFactory.ForResource(gvr).Informer()
return informer, nil
}

func getDynamicClient() (dynamic.Interface, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed fetching cluster config: %s", err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed getting dynamic client: %s", err)
}
return dynamicClient, nil
}

func executeCommand(commandString string, arguments []string) ([]string, error) {
command := exec.Command(commandString, arguments...)
out, err := command.StdoutPipe()
if err != nil {
return nil, err
}
errorOut, err := command.StderrPipe()
if err != nil {
return nil, err
}
multiOut := io.MultiReader(out, errorOut)
if err := command.Start(); err != nil {
return nil, err
}
return scanOutput(multiOut)
}

func scanOutput(out io.Reader) ([]string, error) {
var result []string
scanner := bufio.NewScanner(out)
for scanner.Scan() {
if line := scanner.Text(); len(line) > 0 {
result = append(result, line)
}
}
if err := scanner.Err(); err != nil {
return nil, err
}
return result, nil
}

func makeErrorStatus(err error) map[string]interface{} {
return map[string]interface{}{"error": err}
}

func makeSuccessStatus(metrics []float64, delay float64) map[string]interface{} {
return map[string]interface{}{"metrics": metrics, "workerDelay": delay}
}
Loading