diff --git a/util-images/network/netperfbenchmark/go.mod b/util-images/network/netperfbenchmark/go.mod index 13863f2ae9..fe68a777bd 100644 --- a/util-images/network/netperfbenchmark/go.mod +++ b/util-images/network/netperfbenchmark/go.mod @@ -3,5 +3,8 @@ module k8s.io/perf-tests/util-images/network/netperfbenchmark go 1.15 require ( + k8s.io/api v0.18.0 + k8s.io/apimachinery v0.18.0 + k8s.io/client-go v0.18.0 k8s.io/klog v1.0.0 ) diff --git a/util-images/network/netperfbenchmark/pkg/worker/util.go b/util-images/network/netperfbenchmark/pkg/worker/util.go index 6b6fd09c95..f971d17dd0 100644 --- a/util-images/network/netperfbenchmark/pkg/worker/util.go +++ b/util-images/network/netperfbenchmark/pkg/worker/util.go @@ -17,14 +17,22 @@ limitations under the License. package worker import ( + "bufio" "errors" "fmt" + "io" "math" - "net/http" + "os/exec" "regexp" "strconv" "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/klog" ) @@ -42,18 +50,11 @@ const ( httpMetricsCount = 12 ) -// Iperf results vary from protocol,required for proper parsing. -const ( - zeroFormatTCP = "0.0" - fractionZeroFormatTCP = ".0" - zeroFormatUDP = "0.00" - fractionZeroFormatUDP = ".00" -) - var ( unitRegex = regexp.MustCompile(`%|\[\s+|\]\s+|KBytes\s+|KBytes/sec\s*|sec\s+|pps\s*|ms\s+|/|\(|\)\s+`) multiSpaceRegex = regexp.MustCompile(`\s+`) hyphenSpaceRegex = regexp.MustCompile(`\-\s+`) + intervalRegex = regexp.MustCompile(`\d+\.\d+\-\s*\d+\.\d+`) ) // Function to be applied for each metric for aggregation. @@ -66,9 +67,9 @@ var ( func parseResult(protocol string, result []string, testDuration string) ([]float64, error) { switch protocol { case ProtocolTCP: - return parseIperfResponse(result, testDuration, tcpMetricsCount, ProtocolTCP, zeroFormatTCP, fractionZeroFormatTCP, iperfTCPFunction) + return parseIperfResponse(result, testDuration, tcpMetricsCount, ProtocolTCP, iperfTCPFunction) case ProtocolUDP: - return parseIperfResponse(result, testDuration, udpMetricsCount, ProtocolUDP, zeroFormatUDP, fractionZeroFormatUDP, iperfUDPFunction) + return parseIperfResponse(result, testDuration, udpMetricsCount, ProtocolUDP, iperfUDPFunction) case ProtocolHTTP: return parseSiegeResponse(result) default: @@ -76,21 +77,21 @@ func parseResult(protocol string, result []string, testDuration string) ([]float } } -func parseIperfResponse(result []string, testDuration string, metricCount int, protocol, zeroFormat, fractionZeroFormat string, operators []string) ([]float64, error) { +func parseIperfResponse(result []string, testDuration string, metricCount int, protocol string, operators []string) ([]float64, error) { aggregatedResult := make([]float64, 0, metricCount) count := 0 sessionID := make(map[string]bool) - interval := zeroFormat + "-" + testDuration + fractionZeroFormat for _, line := range result { + klog.V(4).Info(line) split := parseIperfLine(line) // iperf gives aggregated result in a row with "SUM", but many fields are not // getting aggregated hence not depending on iperf's aggregation. - if len(split) < metricCount+1 || "SUM" == split[1] || split[2] != interval { + if len(split) < metricCount+1 || "SUM" == split[1] || !intervalRegex.MatchString(split[2]) { continue } // iperf sometimes prints duplicate rows for same session id(for same duration), making sure // the row is taken for calculation only once. - if _, ok := sessionID[split[1]]; ok { + if _, isDuplicate := sessionID[split[1]]; isDuplicate { continue } sessionID[split[1]] = true @@ -118,7 +119,6 @@ func parseIperfResponse(result []string, testDuration string, metricCount int, p count++ } } - klog.Infof("Final output: %v", aggregatedResult) return aggregatedResult, nil } @@ -152,7 +152,6 @@ func parseSiegeResponse(result []string) ([]float64, error) { } aggregatedResult = append(aggregatedResult, tmp) } - klog.Infof("Final output: %v", aggregatedResult) return aggregatedResult, nil } @@ -177,33 +176,88 @@ func trimSiegeResponse(result []string) ([]string, error) { return result[beginIndex:endIndex], nil } -// GetValuesFromURL returns a map with values parsed from http request,for attributes specified in paramsToSearch. -func getValuesFromURL(request *http.Request, parametersToSearch []string) (map[string]string, error) { - values := request.URL.Query() - paramMap := make(map[string]string) - for _, param := range parametersToSearch { - val := values.Get(param) - if val == "" { - return nil, fmt.Errorf("missing URL parameter: %s", param) - } - paramMap[param] = val - } - return paramMap, nil +func extractParameters(input resourceProperties) map[string]string { + result := map[string]string{} + result["duration"] = strconv.FormatInt(input.Duration, 10) + result["numberOfClients"] = strconv.FormatInt(input.NumberOfClients, 10) + result["serverPodIP"] = input.ServerPodIP + return result } // populateTemplates populates template parameters with actual values from the http request object. -func populateTemplates(arguments []string, request *http.Request) ([]string, error) { +func populateTemplates(arguments []string, requestProperties map[string]string) ([]string, error) { for i, argument := range arguments { if idx := strings.Index(argument, "{"); idx == -1 { continue } parameter := argument[strings.Index(argument, "{")+1 : strings.Index(argument, "}")] - valMap, err := getValuesFromURL(request, []string{parameter}) - if err != nil { - return nil, err + value, isPresent := requestProperties[parameter] + if !isPresent { + return nil, fmt.Errorf("property %v not present in request", parameter) } - arguments[i] = strings.Replace(argument, "{"+parameter+"}", valMap[parameter], 1) + arguments[i] = strings.Replace(argument, "{"+parameter+"}", value, 1) klog.Infof("Value after resolving template %v: %v", parameter, arguments[i]) } return arguments, nil } + +func getInformer(labelSelector, namespace string, k8sClient dynamic.Interface, gvr schema.GroupVersionResource) (cache.SharedInformer, error) { + optionsModifier := func(options *metav1.ListOptions) { + options.LabelSelector = labelSelector + } + tweakListOptions := dynamicinformer.TweakListOptionsFunc(optionsModifier) + dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k8sClient, 0, namespace, tweakListOptions) + informer := dynamicInformerFactory.ForResource(gvr).Informer() + return informer, nil +} + +func getDynamicClient() (dynamic.Interface, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed fetching cluster config: %s", err) + } + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed getting dynamic client: %s", err) + } + return dynamicClient, nil +} + +func executeCommand(commandString string, arguments []string) ([]string, error) { + command := exec.Command(commandString, arguments...) + out, err := command.StdoutPipe() + if err != nil { + return nil, err + } + errorOut, err := command.StderrPipe() + if err != nil { + return nil, err + } + multiOut := io.MultiReader(out, errorOut) + if err := command.Start(); err != nil { + return nil, err + } + return scanOutput(multiOut) +} + +func scanOutput(out io.Reader) ([]string, error) { + var result []string + scanner := bufio.NewScanner(out) + for scanner.Scan() { + if line := scanner.Text(); len(line) > 0 { + result = append(result, line) + } + } + if err := scanner.Err(); err != nil { + return nil, err + } + return result, nil +} + +func makeErrorStatus(err error) map[string]interface{} { + return map[string]interface{}{"error": err} +} + +func makeSuccessStatus(metrics []float64, delay float64) map[string]interface{} { + return map[string]interface{}{"metrics": metrics, "workerDelay": delay} +} diff --git a/util-images/network/netperfbenchmark/pkg/worker/worker.go b/util-images/network/netperfbenchmark/pkg/worker/worker.go index 733bb8c1c7..eaa8474e09 100644 --- a/util-images/network/netperfbenchmark/pkg/worker/worker.go +++ b/util-images/network/netperfbenchmark/pkg/worker/worker.go @@ -19,48 +19,66 @@ limitations under the License. package worker import ( - "bufio" + "context" "encoding/json" + "errors" "fmt" - "io" "net/http" - "os/exec" + "os" "strconv" "time" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" "k8s.io/klog" ) -// Worker hold data required for facilitating measurement. +// Worker holds data required for facilitating measurement. type Worker struct { - stopCh chan (struct{}) - parsedResult []float64 - err error - protocol string - workerDelay time.Duration - startTime time.Time + resourceInterface dynamic.ResourceInterface + podName string + work work + stopCh chan struct{} } -// MetricResponse is the response sent back to controller after collecting measurement. -type MetricResponse struct { - Result []float64 - WorkerDelay time.Duration +type work struct { + resourceName string + workType string + resourceSpec resourceProperties + arguments []string } -// Status is the response sent back to controller when the request is unsuccessful. -type Status struct { - Message string +type resourceProperties struct { + ServerPodIP string + ServerPodName string + Duration int64 + NumberOfClients int64 + Protocol string + ClientPodIP string + ClientPodName string + ClientStartTimestamp int64 } type handlersMap map[string]func(http.ResponseWriter, *http.Request) -// http listen ports. +// http server listen port. const ( - workerListenPort = "5003" - httpPort = "5301" + httpPort = "5301" + namespace = "netperf" ) var ( + protocolCommandMap = map[string]string{ + ProtocolHTTP: "siege", + ProtocolTCP: "iperf", + ProtocolUDP: "iperf", + } + // Command arguments for each protocol.This supports templates("{}"), // the value in template will be replaced by value in http request. // Iperf command args: @@ -74,195 +92,224 @@ var ( // -e : enhanced reports, gives more metrics for udp. // -s : run in server mode. // -P numOfClients: handle number of clients before disconnecting. - udpClientArguments = []string{"-c", "{destinationIP}", "-u", "-f", "K", "-l", "20", "-b", "1M", "-e", "-i", "1", "-t", "{duration}"} - udpServerArguments = []string{"-s", "-f", "K", "-u", "-e", "-i", "{duration}", "-P", "{numOfClients}"} - tcpServerArguments = []string{"-s", "-f", "K", "-i", "{duration}", "-P", "{numOfClients}"} - tcpClientArguments = []string{"-c", "{destinationIP}", "-f", "K", "-l", "20", "-b", "1M", "-i", "1", "-t", "{duration}"} + udpClientArguments = []string{"-c", "{serverPodIP}", "-u", "-f", "K", "-l", "20", "-b", "1M", "-e", "-i", "1", "-t", "{duration}"} + udpServerArguments = []string{"-s", "-f", "K", "-u", "-e", "-i", "{duration}", "-P", "{numberOfClients}"} + tcpServerArguments = []string{"-s", "-f", "K", "-i", "{duration}", "-P", "{numberOfClients}"} + tcpClientArguments = []string{"-c", "{serverPodIP}", "-f", "K", "-l", "20", "-b", "1M", "-i", "1", "-t", "{duration}"} // Siege command args: // -d1 : random delay between 0 to 1 sec. // -tS : run test for seconds. // -c1 : one concurrent user. - httpClientArguments = []string{"http://" + "{destinationIP}" + ":" + httpPort + "/test", "-d1", "-t" + "{duration}" + "S", "-c1"} + httpClientArguments = []string{"http://" + "{serverPodIP}" + ":" + httpPort + "/test", "-d1", "-t" + "{duration}" + "S", "-c1"} + + protocolArgumentMap = map[string]map[string][]string{ + "client": { + ProtocolHTTP: httpClientArguments, + ProtocolTCP: tcpClientArguments, + ProtocolUDP: udpClientArguments, + }, + "server": { + ProtocolTCP: tcpServerArguments, + ProtocolUDP: udpServerArguments, + }, + } ) +var gvk = schema.GroupVersionKind{Group: "clusterloader.io", Kind: "NetworkTestRequest", Version: "v1alpha1"} + func NewWorker() *Worker { return &Worker{} } // Start starts the worker. func (w *Worker) Start() { - w.stopCh = make(chan struct{}) - // TODO(#1631): make controller and worker communicate using k8s API (with CustomResources) - // instead of http. - w.listenToController() + w.populatePodName() + w.initialize() } -func (w *Worker) listenToController() { - handlers := handlersMap{ - "/startTCPServer": w.StartTCPServer, - "/startTCPClient": w.StartTCPClient, - "/startUDPServer": w.StartUDPServer, - "/startUDPClient": w.StartUDPClient, - "/startHTTPServer": w.StartHTTPServer, - "/startHTTPClient": w.StartHTTPClient, - "/metrics": w.Metrics, +func (w *Worker) initialize() { + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + k8sClient, err := getDynamicClient() + if err != nil { + klog.Fatalf("Error getting dynamic client:%s", err) + } + w.resourceInterface = k8sClient.Resource(gvr).Namespace(namespace) + informer, err := getInformer(w.getCustomResourceLabelSelector(), namespace, k8sClient, gvr) + if err != nil { + klog.Fatalf("Error getting informer:%s", err) } - w.startListening(workerListenPort, handlers) + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + w.handleCustomResource(obj) + }, + }) + w.stopCh = make(chan struct{}) + informer.Run(w.stopCh) } -func (w *Worker) startListening(port string, handlers handlersMap) { - for urlPath, handler := range handlers { - http.HandleFunc(urlPath, handler) - } - if err := http.ListenAndServe(":"+port, nil); err != nil { - klog.Fatalf("Failed to start http server on port %v: %v", port, err) +func (w *Worker) populatePodName() { + var isPodNameAvailable bool + w.podName, isPodNameAvailable = os.LookupEnv("POD_NAME") + if !isPodNameAvailable { + klog.Fatal(errors.New("pod name not set as environment variable")) } + klog.Info("Pod Name set:", w.podName) } -// Metrics returns the metrics collected. -func (w *Worker) Metrics(rw http.ResponseWriter, request *http.Request) { - select { - case <-w.stopCh: - if w.err != nil { - message := fmt.Sprintf("metrics collection failed: %v", w.err) - w.sendResponse(rw, http.StatusInternalServerError, Status{Message: message}) - return - } - reply := MetricResponse{ - Result: w.parsedResult, - WorkerDelay: w.workerDelay, - } - w.sendResponse(rw, http.StatusOK, reply) - default: - w.sendResponse(rw, http.StatusInternalServerError, Status{Message: "metric collection in progress"}) - } +func (w *Worker) getCustomResourceLabelSelector() string { + return fmt.Sprintf("%s in (clientPodName,serverPodName)", w.podName) } -func (w *Worker) sendResponse(rw http.ResponseWriter, statusCode int, response interface{}) { - rw.Header().Set("Content-Type", "application/json") - marshalledResponse, err := json.Marshal(response) +func (w *Worker) handleCustomResource(obj interface{}) { + newRuntimeObj, ok := obj.(runtime.Object) + if obj != nil && !ok { + klog.Errorf("Error casting object: %s", obj) + return + } + err := w.populateResourceSpec(newRuntimeObj) if err != nil { - klog.Errorf("Error marshalling to json: %v", err) - rw.WriteHeader(http.StatusInternalServerError) + w.handleError(fmt.Errorf("populating resource spec failed: %v", err)) return } - klog.V(3).Infof("Marshalled Response: %v", response) - rw.WriteHeader(statusCode) - if _, err := rw.Write(marshalledResponse); err != nil { - klog.Errorf("Error writing response to ResponseWriter: %v", err) + klog.Info("Recevied add event for resource with spec:", w.work.resourceSpec) + switch w.podName { + case w.work.resourceSpec.ClientPodName: + w.work.workType = "client" + case w.work.resourceSpec.ServerPodName: + w.work.workType = "server" + default: + w.handleError(errors.New("pod name not set as client or server")) + return } + w.startWork() } -// StartHTTPServer starts an http server for http measurements. -func (w *Worker) StartHTTPServer(rw http.ResponseWriter, request *http.Request) { - klog.Info("Starting HTTP Server") - go w.startListening(httpPort, handlersMap{"/test": w.Handler}) - w.sendResponse(rw, http.StatusOK, nil) -} - -// StartTCPServer starts iperf server for tcp measurements. -func (w *Worker) StartTCPServer(rw http.ResponseWriter, request *http.Request) { - klog.Info("Starting TCP Server") - w.startWork(rw, request, ProtocolTCP, "iperf", tcpServerArguments) -} - -// StartUDPServer starts iperf server for udp measurements. -func (w *Worker) StartUDPServer(rw http.ResponseWriter, request *http.Request) { - klog.Info("Starting UDP Server") - w.startWork(rw, request, ProtocolUDP, "iperf", udpServerArguments) +func (w *Worker) populateResourceSpec(object runtime.Object) error { + resourceContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object) + if err != nil { + return fmt.Errorf("error converting event Object to unstructured.Event object: %s", object) + } + metadata := resourceContent["metadata"].(map[string]interface{}) + w.work.resourceName = metadata["name"].(string) + resourceSpecMap := resourceContent["spec"].(map[string]interface{}) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resourceSpecMap, &w.work.resourceSpec); err != nil { + return fmt.Errorf("error converting custom resource properties: %s", err) + } + return nil } -// StartHTTPClient starts an http client for http measurements. -func (w *Worker) StartHTTPClient(rw http.ResponseWriter, request *http.Request) { - klog.Info("Starting HTTP Client") - w.startWork(rw, request, ProtocolHTTP, "siege", httpClientArguments) +func (w *Worker) startWork() { + if w.work.resourceSpec.Protocol == ProtocolHTTP && w.work.workType == "server" { + w.StartHTTPServer() + return + } + var err error + properties := extractParameters(w.work.resourceSpec) + w.work.arguments, err = populateTemplates(protocolArgumentMap[w.work.workType][w.work.resourceSpec.Protocol], properties) + if err != nil { + w.handleError(fmt.Errorf("populating template failed: %v", err)) + return + } + klog.Infof("Populated templates: %s", w.work.arguments) + startTimestamp := w.getStartTimestamp() + w.schedule(startTimestamp) } -// StartTCPClient starts iperf client for tcp measurements. -func (w *Worker) StartTCPClient(rw http.ResponseWriter, request *http.Request) { - klog.Info("Starting TCP Client") - w.startWork(rw, request, ProtocolTCP, "iperf", tcpClientArguments) +func (w *Worker) getStartTimestamp() int64 { + if w.work.workType == "client" { + return w.work.resourceSpec.ClientStartTimestamp + } + return time.Now().Unix() } -// StartUDPClient starts iperf client for udp measurements. -func (w *Worker) StartUDPClient(rw http.ResponseWriter, request *http.Request) { - klog.Info("Starting UDP Client") - w.startWork(rw, request, ProtocolUDP, "iperf", udpClientArguments) +func (w *Worker) updateStatus(status map[string]interface{}) error { + resource, err := w.resourceInterface.Get(context.TODO(), w.work.resourceName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("updating status failed: %v", err) + } + resourceContent := resource.UnstructuredContent() + resourceContent["status"] = status + var unstructuredRes unstructured.Unstructured + unstructuredRes.SetUnstructuredContent(resourceContent) + // TODO(@VivekThrivikraman-est): add retries. + _, err = w.resourceInterface.Update(context.TODO(), &unstructuredRes, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("error updating status: %v", err) + } + klog.Info("Status Updated") + return nil } -// Handler handles http requests for http measurements. -func (w *Worker) Handler(rw http.ResponseWriter, request *http.Request) { - w.sendResponse(rw, http.StatusOK, "ok") +func (w *Worker) handleError(err error) { + klog.Error(err) + if err := w.updateStatus(makeErrorStatus(err)); err != nil { + klog.Error(err) + } } -func (w *Worker) startWork(rw http.ResponseWriter, request *http.Request, protocol, command string, arguments []string) { - arguments, err := populateTemplates(arguments, request) +func (w *Worker) schedule(startTimestamp int64) { + startTime := time.Unix(startTimestamp, 0) + klog.Infof("About to wait until %v, current time: %v", startTime, time.Now()) + time.Sleep(startTime.Sub(time.Now())) + workerDelay := time.Now().Sub(startTime) + command := protocolCommandMap[w.work.resourceSpec.Protocol] + result, err := executeCommand(command, w.work.arguments) if err != nil { - w.sendResponse(rw, http.StatusBadRequest, Status{Message: err.Error()}) + w.handleError(fmt.Errorf("error executing command %v %v: %v", command, w.work.arguments, err)) return } - valMap, err := getValuesFromURL(request, []string{"timestamp", "duration"}) - if err != nil { - w.sendResponse(rw, http.StatusBadRequest, Status{Message: err.Error()}) + if !w.shouldParseResponse() { return } - timestamp, err := strconv.ParseInt(valMap["timestamp"], 10, 64) + durationInt := strconv.FormatInt(w.work.resourceSpec.Duration, 10) + parsedResult, err := parseResult(w.work.resourceSpec.Protocol, result, durationInt) if err != nil { - klog.Errorf("Invalid timestamp %v: %v", valMap["timestamp"], err) + w.handleError(fmt.Errorf("error parsing command response: %v", err)) + return + } + klog.Info("Parsed Response:", parsedResult) + if err := w.updateStatus(makeSuccessStatus(parsedResult, workerDelay.Seconds())); err != nil { + klog.Error(err) } - go w.schedule(timestamp, valMap["duration"], protocol, command, arguments) - w.sendResponse(rw, http.StatusOK, nil) + } -func (w *Worker) schedule(startTimestamp int64, duration string, protocol, command string, arguments []string) { - defer close(w.stopCh) - w.startTime = time.Unix(startTimestamp, 0) - klog.Infof("About to wait until %v, current time: %v", w.startTime, time.Now()) - time.Sleep(w.startTime.Sub(time.Now())) - w.workerDelay = time.Now().Sub(w.startTime) - w.protocol = protocol - result, err := w.executeCommand(command, arguments) - if err != nil { - klog.Errorf("Error executing command: %v", w.err) - w.err = err - return +func (w *Worker) shouldParseResponse() bool { + return (w.work.resourceSpec.Protocol != ProtocolHTTP && w.work.workType == "server") || + (w.work.resourceSpec.Protocol == ProtocolHTTP && w.work.workType == "client") +} + +func (w *Worker) startListening(port string, handlers handlersMap) { + for urlPath, handler := range handlers { + http.HandleFunc(urlPath, handler) } - parsedResult, err := parseResult(w.protocol, result, duration) - if err != nil { - klog.Errorf("Error parsing command response: %v", w.err) - w.err = err - return + if err := http.ListenAndServe(":"+port, nil); err != nil { + klog.Fatalf("Failed to start http server on port %v: %v", port, err) } - w.parsedResult = parsedResult } -func (w *Worker) executeCommand(commandString string, arguments []string) ([]string, error) { - command := exec.Command(commandString, arguments...) - out, err := command.StdoutPipe() - if err != nil { - return nil, err - } - errorOut, err := command.StderrPipe() +func (w *Worker) sendResponse(rw http.ResponseWriter, statusCode int, response interface{}) { + rw.Header().Set("Content-Type", "application/json") + marshalledResponse, err := json.Marshal(response) if err != nil { - return nil, err + klog.Errorf("Error marshalling to json: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return } - multiOut := io.MultiReader(out, errorOut) - if err := command.Start(); err != nil { - return nil, err + klog.V(3).Infof("Marshalled Response: %v", response) + rw.WriteHeader(statusCode) + if _, err := rw.Write(marshalledResponse); err != nil { + klog.Errorf("Error writing response to ResponseWriter: %v", err) } - return w.scanOutput(multiOut) } -func (w *Worker) scanOutput(out io.Reader) ([]string, error) { - var result []string - scanner := bufio.NewScanner(out) - for scanner.Scan() { - if line := scanner.Text(); len(line) > 0 { - result = append(result, line) - } - } - if err := scanner.Err(); err != nil { - return nil, err - } - return result, nil +// StartHTTPServer starts an http server for http measurements. +func (w *Worker) StartHTTPServer() { + klog.Info("Starting HTTP Server") + go w.startListening(httpPort, handlersMap{"/test": w.Handler}) +} + +// Handler handles http requests for http measurements. +func (w *Worker) Handler(rw http.ResponseWriter, request *http.Request) { + w.sendResponse(rw, http.StatusOK, "ok") }