-
Notifications
You must be signed in to change notification settings - Fork 561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Network performance measurement: Implementing worker using CRD #1645
Conversation
Welcome @VivekThrivikraman-est! |
Hi @VivekThrivikraman-est. Thanks for your PR. I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
for _, line := range result { | ||
klog.V(3).Info(line) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
V(4) maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
if !isPresent { | ||
return nil, fmt.Errorf("property %v not present in request", parameter) | ||
} | ||
if reflect.TypeOf(value).Kind() == reflect.Int64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The properties contains mix of string and int, but all of them are required to be converted to string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm - I see now. I'm not huge fan of this pattern.
Can we change the requestProperties to really be of type map[string]string and does this parsing elsewhere (in a dedicated function that will be extracting properties)?
In other words, there are exactly 3 parameters that are used across all command. I would really prefer to have a function like:
func extractParameters(map[string]interface{} input) map[string]string {
result := map[string]string{}
for _, param := range []string{"duration", "numberOfClients", "serverPodIP"} {
// if appropriate param exists, extract to result.
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified
) | ||
|
||
var ( | ||
protocolCommandMap = map[string]string{ProtocolHTTP: "siege", ProtocolTCP: "iperf", ProtocolUDP: "iperf"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please split the lines, i.e.
protocolCommandMap = map[string]string{
ProtocolHTTP: "siege",
ProtocolTCP: "iperf",
ProtocolUDP: "iperf"
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
klog.Infof("Value after resolving template %v: %v", parameter, arguments[i]) | ||
} | ||
return arguments, nil | ||
} | ||
|
||
func getDynamicResource(gvk schema.GroupVersionKind, namespace string) (dynamic.ResourceInterface, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe let's call it getResourceInterface
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is no more there, but modified the variable as mentioned.
protocol string | ||
workerDelay time.Duration | ||
startTime time.Time | ||
dynamicResource dynamic.ResourceInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resourceInterface ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
var isPodNameAvailable bool | ||
w.podName, isPodNameAvailable = os.LookupEnv("POD_NAME") | ||
if !isPodNameAvailable { | ||
w.updateStatus(map[string]interface{}{"error": "Pod name not set as environment variable"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you create a helper function:
makeErrorStatus(err error)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
w.sendResponse(rw, http.StatusInternalServerError, Status{Message: message}) | ||
return | ||
func (w *Worker) watchForWork() { | ||
watcher, err := w.getWatcher() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not going to work universally:
- watch starts at now, so if the object was created earlier you will never watch it
- you don't handle all corner cases, resuming watches on timeouts/errors etc.
Instead of that - you should simply use the informer that will do all of that for you (and just register callbacks).
Here you have an example how to do that for dynamic clients:
func NewDynamicInformer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified to use informer.
if !isPresent { | ||
return nil, fmt.Errorf("property %v not present in request", parameter) | ||
} | ||
if reflect.TypeOf(value).Kind() == reflect.Int64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm - I see now. I'm not huge fan of this pattern.
Can we change the requestProperties to really be of type map[string]string and does this parsing elsewhere (in a dedicated function that will be extracting properties)?
In other words, there are exactly 3 parameters that are used across all command. I would really prefer to have a function like:
func extractParameters(map[string]interface{} input) map[string]string {
result := map[string]string{}
for _, param := range []string{"duration", "numberOfClients", "serverPodIP"} {
// if appropriate param exists, extract to result.
}
}
w.podName, isPodNameAvailable = os.LookupEnv("POD_NAME") | ||
if !isPodNameAvailable { | ||
err := errors.New("pod name not set as environment variable") | ||
klog.Fatal(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: klog.Fatalf(errors.New(...))
(don't create err var)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
if _, err := rw.Write(marshalledResponse); err != nil { | ||
klog.Errorf("Error writing response to ResponseWriter: %v", err) | ||
klog.Info("Recevied add event for resource with spec:", w.work.resourceSpec) | ||
if w.podName == w.work.resourceSpec.ClientPodName { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
switch w.podName {
case w.work.resourceSpec.ClientPodName:
..
case w.work.resourceSpec.ServerPodName:
..
default:
..
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified
/ok-to-test This looks great - it's pretty close to mergable state. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple more small comments, but this is pretty close - thanks!
) | ||
|
||
var gvk = schema.GroupVersionKind{Group: "clusterloader.io", Kind: "NetworkTestRequest", Version: "v1"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't go directly to v1 - let's start with v1alpha1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
httpPort = "5301" | ||
) | ||
// http server listen port. | ||
const httpPort = "5301" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's do:
const (
httpPort...
namespace
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
startTime time.Time | ||
k8sClient dynamic.Interface | ||
resourceInterface dynamic.ResourceInterface | ||
gvr schema.GroupVersionResource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this one (see my comment below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
w.listenToController() | ||
w.initialize() | ||
w.populatePodName() | ||
w.registerForCustomResourceEvents(w.handleCustomResource) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's change to :
w.populatePodName()
w.initialize()
and merge initialize and registerForCustomResourceEvents to reduce the amount of params
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
protocol string | ||
workerDelay time.Duration | ||
startTime time.Time | ||
k8sClient dynamic.Interface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
once you merge initialize() and registerForCustomResourceEvents as suggested below, this one also will no longer be needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry - I added few more comment, but I think it will make it more reliable and easier to follow.
But we are pretty close with this one.
type work struct { | ||
resourceContent map[string]interface{} | ||
workType string | ||
resourceSpecMap map[string]interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually - let's remove this one - and just keep "resourceSpec" field below (it contains the same information).
It means that you need to change extractParameters
method to take resourceSpec
as input, but it's actually fine (and probably even easier to reason about).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified
rw.WriteHeader(http.StatusInternalServerError) | ||
err = fmt.Errorf("populating resource spec failed: %v", err) | ||
w.updateStatus(makeErrorStatus(err)) | ||
klog.Error(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually - it seems to be used in couple places, so let's add a helper method:
func (w *Worker) handleError(err error) {
klog.Errorf(err)
if err := w.updateStatus(...) {
klog.Errorf(err)
}
}
and use it everywehre
and call it everywhere (
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
w.work.workType = "server" | ||
default: | ||
klog.Error(errors.New("pod name not set as client or server")) | ||
w.updateStatus(makeErrorStatus(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're reporting the wrong error here - see also comment above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
w.startWork(rw, request, ProtocolHTTP, "siege", httpClientArguments) | ||
func (w *Worker) populateResourceSpec(object runtime.Object) error { | ||
var err error | ||
w.work.resourceContent, err = runtime.DefaultUnstructuredConverter.ToUnstructured(object) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not store resourceContent
in the "work" item - see my comment below.
func (w *Worker) updateStatus(status map[string]interface{}) { | ||
w.work.resourceContent["status"] = status | ||
var unstructuredRes unstructured.Unstructured | ||
unstructuredRes.SetUnstructuredContent(w.work.resourceContent) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of modifying resourceContent etc., let's simply use the
UpdateStatus method, instead of Update:
https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/dynamic/interface.go#L36
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As informed ,have issues with UpdateStatus,hence for now not changing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I wasn't clear offline. What I said is that, instead of storing "resourceContent" explicit in the Worker just do
(syntax to fix):
obj, err := w.resourceInterface.Get(context.TODO(), name)
// handle err
// transform to map[string]interface{}
// do what you do here
This let's you remove resourceContent from Worker struct too (without doing UpdateStatus).
// TODO(@VivekThrivikraman-est): add retries. | ||
_, err := w.resourceInterface.Update(context.TODO(), &unstructuredRes, metav1.UpdateOptions{}) | ||
if err != nil { | ||
klog.Errorf("Error updating status: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually - let's change this method to return this error and don't log anything here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified
LGTM @VivekThrivikraman-est - please squash and I will approve |
f17bb8f
to
cda4b87
Compare
/lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: VivekThrivikraman-est, wojtek-t The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
What type of PR is this?
/kind feature
What this PR does / why we need it:
In the current implementation of network performance measurement, controller and worker communicates with each other using http, this change is for making the controller and worker communicate using custom resources. This PR contains the changes for worker portion.
Which issue(s) this PR fixes:
Fixes #1631
Special notes for your reviewer:
Does this PR introduce a user-facing change?:
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: