Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 8579 - set event burst #8590

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/8590-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issue #8579, set event burst to block event broadcaster from filtering events
4 changes: 4 additions & 0 deletions pkg/util/kube/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
package kube

import (
"math"
"sync"
"time"

Expand Down Expand Up @@ -60,6 +61,9 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
}

res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
// Bypass the built-in EventCorrelator's rate filtering, otherwise, the event will be abandoned if the rate exceeds.
// The callers (i.e., data mover pods) have controlled the rate and total number outside. E.g., the progress is designed to be updated every 10 seconds and is changeable.
BurstSize: math.MaxInt32,
MaxEvents: 1,
MessageFunc: func(event *v1.Event) string {
return event.Message
Expand Down
43 changes: 40 additions & 3 deletions pkg/util/kube/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func TestEvent(t *testing.T) {
}

cases := []struct {
name string
events []testEvent
expected int
name string
events []testEvent
generateDiff int
generateSame int
generateEnding bool
expected int
}{
{
name: "update events, different message",
Expand Down Expand Up @@ -116,6 +119,18 @@ func TestEvent(t *testing.T) {
},
expected: -1,
},
{
name: "auto generate 200",
generateDiff: 200,
generateEnding: true,
expected: 201,
},
{
name: "auto generate 200, update",
generateSame: 200,
generateEnding: true,
expected: 2,
},
}

shutdownTimeout = time.Second * 5
Expand Down Expand Up @@ -143,6 +158,28 @@ func TestEvent(t *testing.T) {
_, err = client.CoreV1().Pods("fake-ns").Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

for i := 0; i < tc.generateDiff; i++ {
tc.events = append(tc.events, testEvent{
reason: fmt.Sprintf("fake-reason-%v", i),
message: fmt.Sprintf("fake-message-%v", i),
})
}

for i := 0; i < tc.generateSame; i++ {
tc.events = append(tc.events, testEvent{
reason: "fake-reason",
message: fmt.Sprintf("fake-message-%v", i),
})
}

if tc.generateEnding {
tc.events = append(tc.events, testEvent{
reason: "fake-ending-reason",
message: "fake-ending-message",
ending: true,
})
}

for _, e := range tc.events {
if e.ending {
recorder.EndingEvent(pod, e.warning, e.reason, e.message)
Expand Down
Loading