Skip to content

Commit

Permalink
Merge pull request #8237 from Lyndon-Li/issue-fix-8232
Browse files Browse the repository at this point in the history
Issue 8232: ensure the ending event sinked before shutdown
  • Loading branch information
blackpiglet authored Sep 24, 2024
2 parents 60e9277 + 9deaa81 commit 025d66d
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 39 deletions.
4 changes: 3 additions & 1 deletion pkg/datamover/backup_micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient
}

func (r *BackupMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName)
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName, r.logger)

handler, err := r.duInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -222,6 +222,8 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
log.WithError(err).Error("Async fs backup was not completed")
}

r.eventRecorder.EndingEvent(du, false, datapath.EventReasonStopped, "Data path for %s stopped", du.Name)

return result, err
}

Expand Down
15 changes: 12 additions & 3 deletions pkg/datamover/backup_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, mes
bt.eventReason = reason
bt.eventMsg = fmt.Sprintf(message, a...)
}

func (bt *backupMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
bt.eventLock.Lock()
defer bt.eventLock.Unlock()

bt.withEvent = true
bt.eventReason = reason
bt.eventMsg = fmt.Sprintf(message, a...)
}
func (bt *backupMsTestHelper) Shutdown() {}

func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) {
Expand Down Expand Up @@ -336,7 +345,7 @@ func TestRunCancelableDataPath(t *testing.T) {
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{duInProgress},
dataPathStarted: true,
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
expectedErr: "timed out waiting for fs backup to complete",
},
{
Expand All @@ -347,7 +356,7 @@ func TestRunCancelableDataPath(t *testing.T) {
result: &dataPathResult{
err: errors.New("fake-data-path-error"),
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
expectedErr: "fake-data-path-error",
},
{
Expand All @@ -358,7 +367,7 @@ func TestRunCancelableDataPath(t *testing.T) {
result: &dataPathResult{
result: "fake-succeed-result",
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
},
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/datamover/restore_micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClien
}

func (r *RestoreMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName)
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName, r.logger)

handler, err := r.ddInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -199,6 +199,8 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
log.WithError(err).Error("Async fs restore was not completed")
}

r.eventRecorder.EndingEvent(dd, false, datapath.EventReasonStopped, "Data path for %s stopped", dd.Name)

return result, err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/datamover/restore_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestRunCancelableRestore(t *testing.T) {
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{ddInProgress},
dataPathStarted: true,
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
expectedErr: "timed out waiting for fs restore to complete",
},
{
Expand All @@ -300,7 +300,7 @@ func TestRunCancelableRestore(t *testing.T) {
result: &dataPathResult{
err: errors.New("fake-data-path-error"),
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
expectedErr: "fake-data-path-error",
},
{
Expand All @@ -311,7 +311,7 @@ func TestRunCancelableRestore(t *testing.T) {
result: &dataPathResult{
result: "fake-succeed-result",
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
},
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/datapath/micro_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
EventReasonCancelled = "Data-Path-Canceled"
EventReasonProgress = "Data-Path-Progress"
EventReasonCancelling = "Data-Path-Canceling"
EventReasonStopped = "Data-Path-Stopped"
)

type microServiceBRWatcher struct {
Expand Down Expand Up @@ -340,15 +341,15 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) {
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log))
case EventReasonCompleted:
ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
ms.terminatedFromEvent = true
case EventReasonCancelled:
ms.log.Infof("Received data path canceled message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonFailed:
ms.log.Infof("Received data path failed message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonCancelling:
ms.log.Infof("Received data path canceling message: %s", evt.Message)
case EventReasonStopped:
ms.terminatedFromEvent = true
ms.log.Infof("Received data path stop message: %s", evt.Message)
default:
ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/datapath/micro_service_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func TestStartWatch(t *testing.T) {
},
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
delay: time.Second,
},
},
Expand All @@ -214,6 +217,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
expectStartEvent: true,
expectTerminateEvent: true,
Expand All @@ -231,6 +237,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
redirectLogErr: errors.New("fake-error"),
expectStartEvent: true,
Expand Down Expand Up @@ -269,7 +278,10 @@ func TestStartWatch(t *testing.T) {
insertEventsAfter: []insertEvent{
{
event: &v1.Event{Reason: EventReasonCompleted},
after: time.Second,
},
{
event: &v1.Event{Reason: EventReasonStopped},
delay: time.Second,
},
},
expectStartEvent: true,
Expand All @@ -293,6 +305,9 @@ func TestStartWatch(t *testing.T) {
},
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
delay: time.Second,
},
},
Expand All @@ -313,6 +328,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCancelled},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
terminationMessage: "fake-termination-message-1",
expectStartEvent: true,
Expand All @@ -339,6 +357,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCancelled},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
terminationMessage: ErrCancelled,
expectStartEvent: true,
Expand Down
130 changes: 123 additions & 7 deletions pkg/util/kube/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ limitations under the License.
package kube

import (
"sync"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
Expand All @@ -27,16 +30,34 @@ import (

type EventRecorder interface {
Event(object runtime.Object, warning bool, reason string, message string, a ...any)
EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any)
Shutdown()
}

type eventRecorder struct {
broadcaster record.EventBroadcaster
recorder record.EventRecorder
broadcaster record.EventBroadcaster
recorder record.EventRecorder
lock sync.Mutex
endingSentinel *eventElement
log logrus.FieldLogger
}

func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string) EventRecorder {
res := eventRecorder{}
type eventElement struct {
t string
r string
m string
sinked chan struct{}
}

type eventSink struct {
recorder *eventRecorder
sink typedcorev1.EventInterface
}

func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string, log logrus.FieldLogger) EventRecorder {
res := eventRecorder{
log: log,
}

res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
MaxEvents: 1,
Expand All @@ -45,7 +66,11 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
},
})

res.broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
res.broadcaster.StartRecordingToSink(&eventSink{
recorder: &res,
sink: kubeClient.CoreV1().Events(""),
})

res.recorder = res.broadcaster.NewRecorder(scheme, v1.EventSource{
Component: eventSource,
Host: eventNode,
Expand All @@ -55,6 +80,10 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
}

func (er *eventRecorder) Event(object runtime.Object, warning bool, reason string, message string, a ...any) {
if er.broadcaster == nil {
return
}

eventType := v1.EventTypeNormal
if warning {
eventType = v1.EventTypeWarning
Expand All @@ -67,8 +96,95 @@ func (er *eventRecorder) Event(object runtime.Object, warning bool, reason strin
}
}

func (er *eventRecorder) EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any) {
if er.broadcaster == nil {
return
}

er.Event(object, warning, reason, message, a...)

var sentinelEvent string

er.lock.Lock()
if er.endingSentinel == nil {
sentinelEvent = uuid.NewString()
er.endingSentinel = &eventElement{
t: v1.EventTypeNormal,
r: sentinelEvent,
m: sentinelEvent,
sinked: make(chan struct{}),
}
}
er.lock.Unlock()

if sentinelEvent != "" {
er.Event(object, false, sentinelEvent, sentinelEvent)
} else {
er.log.Warn("More than one ending events, ignore")
}
}

var shutdownTimeout time.Duration = time.Minute

func (er *eventRecorder) Shutdown() {
// StartEventWatcher doesn't wait for writing all buffered events to API server when Shutdown is called, so have to hardcode a sleep time
time.Sleep(2 * time.Second)
var wait chan struct{}
er.lock.Lock()
if er.endingSentinel != nil {
wait = er.endingSentinel.sinked
}
er.lock.Unlock()

if wait != nil {
er.log.Info("Waiting sentinel before shutdown")

waitloop:
for {
select {
case <-wait:
break waitloop
case <-time.After(shutdownTimeout):
er.log.Warn("Timeout waiting for assured events processed")
break waitloop
}
}
}

er.broadcaster.Shutdown()
er.broadcaster = nil

er.lock.Lock()
er.endingSentinel = nil
er.lock.Unlock()
}

func (er *eventRecorder) sentinelWatch(event *v1.Event) bool {
er.lock.Lock()
defer er.lock.Unlock()

if er.endingSentinel == nil {
return false
}

if er.endingSentinel.m == event.Message && er.endingSentinel.r == event.Reason && er.endingSentinel.t == event.Type {
close(er.endingSentinel.sinked)
return true
}

return false
}

func (es *eventSink) Create(event *v1.Event) (*v1.Event, error) {
if es.recorder.sentinelWatch(event) {
return event, nil
}

return es.sink.CreateWithEventNamespace(event)
}

func (es *eventSink) Update(event *v1.Event) (*v1.Event, error) {
return es.sink.UpdateWithEventNamespace(event)
}

func (es *eventSink) Patch(event *v1.Event, data []byte) (*v1.Event, error) {
return es.sink.PatchWithEventNamespace(event, data)
}
Loading

0 comments on commit 025d66d

Please sign in to comment.