Skip to content

Commit

Permalink
Merge pull request #3 from Werkspot/watcher_and_listener_uses_unique_…
Browse files Browse the repository at this point in the history
…requests

List/Watch uses unique requests
  • Loading branch information
ferrastas authored Jan 29, 2020
2 parents 7e10f9e + 064a7fc commit 7b41bc4
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
12 changes: 7 additions & 5 deletions pkg/eventlistener/event-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Resource struct {
type CallbackFn func(Event, interface{}) error

// RestClientFn will be used to get a REST request
type RestClientFn func(clientset *kubernetes.Clientset) *rest.Request
type RestClientFn func(clientset *kubernetes.Clientset) rest.Interface

// NewEventListener returns a pointer to EventListener
func NewEventListener(ctx context.Context, kubeConfig, kubeContext string, errHandler func(error), logLevel string) *EventListener {
Expand Down Expand Up @@ -109,7 +109,7 @@ func (e *EventListener) Init() (err error) {

// Listen for incoming events from a kubernetes instance
func (e *EventListener) Listen(resource *Resource) (err error) {
listWatcher := e.newFilteredListWatchFromClient(resource.RestClient(e.clientSet), fields.Everything())
listWatcher := e.newFilteredListWatchFromClient(resource.RestClient(e.clientSet), resource.ResourceName, fields.Everything())

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

Expand Down Expand Up @@ -161,22 +161,24 @@ func (e *EventListener) getKubeConfig() (config *rest.Config, err error) {
).ClientConfig()
}

func (e *EventListener) newFilteredListWatchFromClient(r *rest.Request, fieldSelector fields.Selector) *cache.ListWatch {
func (e *EventListener) newFilteredListWatchFromClient(c cache.Getter, resource string, fieldSelector fields.Selector) *cache.ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}

listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return r.
return c.Get().
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return r.
return c.Get().
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/resource/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func getIngress() resourceType {
fn: func(callback string) (r *eventlistener.Resource, e error) {
r = &eventlistener.Resource{}
r.ResourceName = "ingresses"
r.RestClient = func(clientset *kubernetes.Clientset) *rest.Request {
return clientset.NetworkingV1beta1().RESTClient().Get().Resource(r.ResourceName)
r.RestClient = func(clientset *kubernetes.Clientset) rest.Interface {
return clientset.NetworkingV1beta1().RESTClient()
}
r.ResourceType = &v1beta1.Ingress{}
r.Callback = createCallbackFn(
Expand Down
4 changes: 2 additions & 2 deletions pkg/resource/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func getPod() resourceType {
fn: func(callback string) (r *eventlistener.Resource, e error) {
r = &eventlistener.Resource{}
r.ResourceName = "pods"
r.RestClient = func(clientset *kubernetes.Clientset) *rest.Request {
return clientset.CoreV1().RESTClient().Get().Resource(r.ResourceName)
r.RestClient = func(clientset *kubernetes.Clientset) rest.Interface {
return clientset.CoreV1().RESTClient()
}
r.ResourceType = &v1.Pod{}
r.Callback = createCallbackFn(
Expand Down
4 changes: 2 additions & 2 deletions pkg/resource/serviceaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func getServiceAccount() resourceType {
fn: func(callback string) (r *eventlistener.Resource, e error) {
r = &eventlistener.Resource{}
r.ResourceName = "serviceaccounts"
r.RestClient = func(clientset *kubernetes.Clientset) *rest.Request {
return clientset.CoreV1().RESTClient().Get().Resource(r.ResourceName)
r.RestClient = func(clientset *kubernetes.Clientset) rest.Interface {
return clientset.CoreV1().RESTClient()
}
r.ResourceType = &v1.ServiceAccount{}
r.Callback = createCallbackFn(
Expand Down

0 comments on commit 7b41bc4

Please sign in to comment.