Skip to content

Commit

Permalink
issue 8232: ensure the ending event sinked before shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Sep 23, 2024
1 parent 3f9c2dc commit 4556076
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 32 deletions.
4 changes: 3 additions & 1 deletion pkg/datamover/backup_micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient
}

func (r *BackupMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName)
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName, r.logger)

handler, err := r.duInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -222,6 +222,8 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string,
log.WithError(err).Error("Async fs backup was not completed")
}

r.eventRecorder.EndingEvent(du, false, datapath.EventReasonStopped, "Data path for %s stopped", du.Name)

return result, err
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/datamover/backup_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, mes
bt.eventReason = reason
bt.eventMsg = fmt.Sprintf(message, a...)
}

func (bt *backupMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
bt.eventLock.Lock()
defer bt.eventLock.Unlock()

bt.withEvent = true
bt.eventReason = reason
bt.eventMsg = fmt.Sprintf(message, a...)
}
func (bt *backupMsTestHelper) Shutdown() {}

func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/datamover/restore_micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClien
}

func (r *RestoreMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName)
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName, r.logger)

handler, err := r.ddInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -199,6 +199,8 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string
log.WithError(err).Error("Async fs restore was not completed")
}

r.eventRecorder.EndingEvent(dd, false, datapath.EventReasonStopped, "Data path for %s stopped", dd.Name)

return result, err
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/datapath/micro_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
EventReasonCancelled = "Data-Path-Canceled"
EventReasonProgress = "Data-Path-Progress"
EventReasonCancelling = "Data-Path-Canceling"
EventReasonStopped = "Data-Path-Stopped"
)

type microServiceBRWatcher struct {
Expand Down Expand Up @@ -340,15 +341,15 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) {
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log))
case EventReasonCompleted:
ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
ms.terminatedFromEvent = true
case EventReasonCancelled:
ms.log.Infof("Received data path canceled message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonFailed:
ms.log.Infof("Received data path failed message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonCancelling:
ms.log.Infof("Received data path canceling message: %s", evt.Message)
case EventReasonStopped:
ms.terminatedFromEvent = true
ms.log.Infof("Received data path stop message: %s", evt.Message)
default:
ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
}
Expand Down
130 changes: 123 additions & 7 deletions pkg/util/kube/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ limitations under the License.
package kube

import (
"sync"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
Expand All @@ -27,16 +30,34 @@ import (

type EventRecorder interface {
Event(object runtime.Object, warning bool, reason string, message string, a ...any)
EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any)
Shutdown()
}

type eventRecorder struct {
broadcaster record.EventBroadcaster
recorder record.EventRecorder
broadcaster record.EventBroadcaster
recorder record.EventRecorder
lock sync.Mutex
endingSentinal *eventElement
log logrus.FieldLogger
}

func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string) EventRecorder {
res := eventRecorder{}
type eventElement struct {
t string
r string
m string
sinked chan struct{}
}

type eventSink struct {
recorder *eventRecorder
sink typedcorev1.EventInterface
}

func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string, log logrus.FieldLogger) EventRecorder {
res := eventRecorder{
log: log,
}

res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
MaxEvents: 1,
Expand All @@ -45,7 +66,11 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
},
})

res.broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
res.broadcaster.StartRecordingToSink(&eventSink{
recorder: &res,
sink: kubeClient.CoreV1().Events(""),
})

res.recorder = res.broadcaster.NewRecorder(scheme, v1.EventSource{
Component: eventSource,
Host: eventNode,
Expand All @@ -55,6 +80,10 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
}

func (er *eventRecorder) Event(object runtime.Object, warning bool, reason string, message string, a ...any) {
if er.broadcaster == nil {
return
}

eventType := v1.EventTypeNormal
if warning {
eventType = v1.EventTypeWarning
Expand All @@ -67,8 +96,95 @@ func (er *eventRecorder) Event(object runtime.Object, warning bool, reason strin
}
}

func (er *eventRecorder) EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any) {
if er.broadcaster == nil {
return
}

er.Event(object, warning, reason, message, a...)

var sentinalEvent string

er.lock.Lock()
if er.endingSentinal == nil {
sentinalEvent = uuid.NewString()
er.endingSentinal = &eventElement{
t: v1.EventTypeNormal,
r: sentinalEvent,
m: sentinalEvent,
sinked: make(chan struct{}),
}
}
er.lock.Unlock()

if sentinalEvent != "" {
er.Event(object, false, sentinalEvent, sentinalEvent)
} else {
er.log.Warn("More than one ending events, ignore")
}
}

var shutdownTimeout time.Duration = time.Minute

func (er *eventRecorder) Shutdown() {
// StartEventWatcher doesn't wait for writing all buffered events to API server when Shutdown is called, so have to hardcode a sleep time
time.Sleep(2 * time.Second)
var wait chan struct{}
er.lock.Lock()
if er.endingSentinal != nil {
wait = er.endingSentinal.sinked
}
er.lock.Unlock()

if wait != nil {
er.log.Info("Waiting sentinal before shutdown")

Check failure on line 138 in pkg/util/kube/event.go

View workflow job for this annotation

GitHub Actions / Run Codespell

sentinal ==> sentinel

waitloop:
for {
select {
case <-wait:
break waitloop
case <-time.After(shutdownTimeout):
er.log.Warn("Timeout waiting for assured events processed")
break waitloop
}
}
}

er.broadcaster.Shutdown()
er.broadcaster = nil

er.lock.Lock()
er.endingSentinal = nil
er.lock.Unlock()
}

func (er *eventRecorder) sentinalWatch(event *v1.Event) bool {
er.lock.Lock()
defer er.lock.Unlock()

if er.endingSentinal == nil {
return false
}

if er.endingSentinal.m == event.Message && er.endingSentinal.r == event.Reason && er.endingSentinal.t == event.Type {
close(er.endingSentinal.sinked)
return true
}

return false
}

func (es *eventSink) Create(event *v1.Event) (*v1.Event, error) {
if es.recorder.sentinalWatch(event) {
return event, nil
}

return es.sink.CreateWithEventNamespace(event)
}

func (es *eventSink) Update(event *v1.Event) (*v1.Event, error) {
return es.sink.UpdateWithEventNamespace(event)
}

func (es *eventSink) Patch(event *v1.Event, data []byte) (*v1.Event, error) {
return es.sink.PatchWithEventNamespace(event, data)
}
Loading

0 comments on commit 4556076

Please sign in to comment.