Impl events msg cancellation on unassign
xorkevin committed Jul 2, 2023
1 parent aa462e9 commit 6f369e4
Showing 2 changed files with 173 additions and 129 deletions.
299 changes: 172 additions & 127 deletions service/events/events.go
@@ -65,9 +65,10 @@ type (
Subscription interface {
ReadMsg(ctx context.Context) (*Msg, error)
IsAssigned(msg Msg) bool
MsgUnassigned(msg Msg) <-chan struct{}
Commit(ctx context.Context, msg Msg) error
Close(ctx context.Context) error
IsPermanentlyClosed() bool
IsClosed() bool
}

// Events is a service wrapper around an event stream client
@@ -103,7 +104,7 @@ type (
log *klog.LevelLogger
reader *kgo.Client
mu *sync.RWMutex
assigned map[int32]struct{}
assigned map[int32]chan struct{}
closed bool
}

@@ -525,7 +526,7 @@ func (s *Service) Subscribe(ctx context.Context, topic, group string, opts Consu
klog.AString("events.group", group),
)),
mu: &sync.RWMutex{},
assigned: map[int32]struct{}{},
assigned: map[int32]chan struct{}{},
closed: false,
}

@@ -599,6 +600,29 @@ func (s *subscription) IsAssigned(msg Msg) bool {
return ok
}

// MsgUnassigned returns a channel that closes if a message is unassigned from the consumer
func (s *subscription) MsgUnassigned(msg Msg) <-chan struct{} {
if msg.record == nil || msg.record.Topic != s.topic {
ch := make(chan struct{})
close(ch)
return ch
}
s.mu.RLock()
defer s.mu.RUnlock()
if s.closed {
ch := make(chan struct{})
close(ch)
return ch
}
ch, ok := s.assigned[msg.record.Partition]
if !ok {
ch := make(chan struct{})
close(ch)
return ch
}
return ch
}

func (s *subscription) onAssigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -607,7 +631,9 @@ func (s *subscription) onAssigned(ctx context.Context, client *kgo.Client, assig
continue
}
for _, i := range partitions {
s.assigned[i] = struct{}{}
if _, ok := s.assigned[i]; !ok {
s.assigned[i] = make(chan struct{})
}
}
}
}
@@ -632,7 +658,10 @@ func (s *subscription) rmPartitions(partitions map[string][]int32) {
continue
}
for _, i := range partitions {
delete(s.assigned, i)
if ch, ok := s.assigned[i]; ok {
close(ch)
delete(s.assigned, i)
}
}
}
}
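
Taken together, the assignment hunks above replace the bare partition set with a per-partition channel: onAssigned creates a channel for each newly assigned partition and rmPartitions closes it on revocation, so a closed channel acts as a broadcast signal to any waiter. The following standalone sketch (hypothetical names such as partitionSet; not part of this commit) shows that lifecycle in isolation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// partitionSet mirrors the subscription's assigned map: every assigned
// partition owns a channel that is closed exactly once when it is revoked.
type partitionSet struct {
	mu       sync.RWMutex
	assigned map[int32]chan struct{}
}

func (p *partitionSet) assign(part int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.assigned[part]; !ok {
		p.assigned[part] = make(chan struct{})
	}
}

func (p *partitionSet) revoke(part int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if ch, ok := p.assigned[part]; ok {
		close(ch) // broadcast: every goroutine waiting on this channel unblocks
		delete(p.assigned, part)
	}
}

// unassigned returns a channel that closes when the partition is revoked;
// unknown partitions get an already-closed channel so callers never block.
func (p *partitionSet) unassigned(part int32) <-chan struct{} {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if ch, ok := p.assigned[part]; ok {
		return ch
	}
	ch := make(chan struct{})
	close(ch)
	return ch
}

func main() {
	ps := &partitionSet{assigned: map[int32]chan struct{}{}}
	ps.assign(3)
	go func() {
		time.Sleep(50 * time.Millisecond)
		ps.revoke(3)
	}()
	<-ps.unassigned(3) // returns once revoke closes the channel
	fmt.Println("partition 3 revoked")
}
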
@@ -688,22 +717,26 @@ func (s *subscription) Close(ctx context.Context) error {
}

s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
s.mu.Unlock()
return nil
}
s.closed = true
// unlock early to prevent any deadlock due to kafka lib implementation
// changes with Close
s.mu.Unlock()

// must commit any marked but uncommitted messages
if err := s.reader.CommitMarkedOffsets(ctx); err != nil {
s.log.Err(ctx, kerrors.WithKind(err, ErrClient, "Failed to commit offsets on revoke"))
}
s.reader.Close()
s.closed = true
s.log.Info(ctx, "Closed subscriber")
return nil
}

// IsPermanentlyClosed returns if the client is closed
func (s *subscription) IsPermanentlyClosed() bool {
// IsClosed returns if the client is closed
func (s *subscription) IsClosed() bool {
return s.isClosed()
}

@@ -845,6 +878,7 @@ type (

// WatchOpts are options for watching a subscription
WatchOpts struct {
MinBackoff time.Duration
MaxBackoff time.Duration
}

@@ -891,140 +925,151 @@ func (w *Watcher) lreqID() string {
return w.reqidprefix + "-" + uid.ReqID(w.reqcount.Add(1))
}

const (
watchStartDelay = 1 * time.Second
)

// Watch watches over a subscription
func (w *Watcher) Watch(ctx context.Context, wg ksync.Waiter, opts WatchOpts) {
defer wg.Done()

if opts.MinBackoff == 0 {
opts.MinBackoff = 1 * time.Second
}
if opts.MaxBackoff == 0 {
opts.MaxBackoff = 15 * time.Second
}

delay := watchStartDelay
delay := opts.MinBackoff
for {
select {
case <-ctx.Done():
return
default:
}
func() {
sub, err := w.ev.Subscribe(ctx, w.topic, w.group, w.opts)
if err != nil {
w.log.Err(ctx, kerrors.WithMsg(err, "Error subscribing"))
if err := ktime.After(ctx, delay); err != nil {
return
}
delay = min(delay*2, opts.MaxBackoff)
sub, err := w.ev.Subscribe(ctx, w.topic, w.group, w.opts)
if err != nil {
w.log.Err(ctx, kerrors.WithMsg(err, "Error subscribing"))
if err := ktime.After(ctx, delay); err != nil {
continue
}
delay = min(delay*2, opts.MaxBackoff)
continue
}
w.consume(ctx, sub, opts)
delay = opts.MinBackoff
}
}

func (w *Watcher) consume(ctx context.Context, sub Subscription, opts WatchOpts) {
defer func() {
if err := sub.Close(ctx); err != nil {
w.log.Err(ctx, kerrors.WithMsg(err, "Error closing watched subscription"))
}
}()

delay := opts.MinBackoff
for {
select {
case <-ctx.Done():
return
default:
}
m, err := sub.ReadMsg(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
continue
}
if errors.Is(err, ErrClientClosed) {
return
}
w.log.Err(ctx, kerrors.WithMsg(err, "Failed reading message"))
if err := ktime.After(ctx, delay); err != nil {
return
}
defer func() {
if err := sub.Close(ctx); err != nil {
w.log.Err(ctx, kerrors.WithMsg(err, "Error closing watched subscription"))
}
}()
delay = watchStartDelay

for {
m, err := sub.ReadMsg(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
continue
}
if errors.Is(err, ErrClientClosed) {
return
}
w.log.Err(ctx, kerrors.WithMsg(err, "Failed reading message"))
if err := ktime.After(ctx, delay); err != nil {
return
}
delay = min(delay*2, opts.MaxBackoff)
continue
}

count := 1
for {
msgctx := klog.CtxWithAttrs(ctx,
klog.AString("events.topic", m.Topic),
klog.AInt("events.partition", m.Partition),
klog.AInt("events.offset", m.Offset),
klog.AString("events.time", m.Time.UTC().Format(time.RFC3339Nano)),
klog.AInt("events.delivered", count),
klog.AString("events.lreqid", w.lreqID()),
)
if w.dlqhandler != nil && count > w.maxdeliver {
count++
start := time.Now()
if err := w.dlqhandler.Handle(msgctx, *m); err != nil {
duration := time.Since(start)
w.log.Err(msgctx, kerrors.WithMsg(err, "Failed executing dlq handler"),
klog.AInt64("duration_ms", duration.Milliseconds()),
)
if err := ktime.After(msgctx, delay); err != nil {
return
}
delay = min(delay*2, opts.MaxBackoff)
continue
}
duration := time.Since(start)
w.log.Info(msgctx, "DLQ handled message",
klog.AInt64("duration_ms", duration.Milliseconds()),
)
if err := sub.Commit(msgctx, *m); err != nil {
w.log.Err(msgctx, kerrors.WithMsg(err, "Failed to commit message"))
if errors.Is(err, ErrClientClosed) {
return
}
if errors.Is(err, ErrPartitionUnassigned) || errors.Is(err, ErrInvalidMsg) {
break
}
if err := ktime.After(msgctx, delay); err != nil {
return
}
delay = min(delay*2, opts.MaxBackoff)
continue
}
w.log.Info(msgctx, "Committed message")
} else {
count++
start := time.Now()
if err := w.handler.Handle(msgctx, *m); err != nil {
duration := time.Since(start)
w.log.Err(msgctx, kerrors.WithMsg(err, "Failed executing subscription handler"),
klog.AInt64("duration_ms", duration.Milliseconds()),
)
if err := ktime.After(msgctx, delay); err != nil {
return
}
delay = min(delay*2, opts.MaxBackoff)
continue
}
duration := time.Since(start)
w.log.Info(msgctx, "Handled subscription message",
klog.AInt64("duration_ms", duration.Milliseconds()),
)
if err := sub.Commit(msgctx, *m); err != nil {
w.log.Err(msgctx, kerrors.WithMsg(err, "Failed to commit message"))
if errors.Is(err, ErrClientClosed) {
return
}
if errors.Is(err, ErrPartitionUnassigned) || errors.Is(err, ErrInvalidMsg) {
break
}
if err := ktime.After(msgctx, delay); err != nil {
return
}
delay = min(delay*2, opts.MaxBackoff)
continue
}
w.log.Info(msgctx, "Committed message")
break
}
}
delay = watchStartDelay
delay = min(delay*2, opts.MaxBackoff)
continue
}
if !sub.IsAssigned(*m) {
continue
}
w.consumeMsg(ctx, sub, *m, opts)
delay = opts.MinBackoff
}
}

func (w *Watcher) consumeMsg(ctx context.Context, sub Subscription, m Msg, opts WatchOpts) {
ctx = klog.CtxWithAttrs(ctx,
klog.AString("events.topic", m.Topic),
klog.AInt("events.partition", m.Partition),
klog.AInt("events.offset", m.Offset),
klog.AInt64("events.time_us", m.Time.UnixMicro()),
klog.AString("events.time", m.Time.UTC().Format(time.RFC3339Nano)),
klog.AString("events.lreqid", w.lreqID()),
)

var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
case <-sub.MsgUnassigned(m):
cancel(ErrPartitionUnassigned)
}
}()

delay := opts.MinBackoff
count := 0
for {
select {
case <-ctx.Done():
return
default:
}
count++
isDlq := w.dlqhandler != nil && count > w.maxdeliver
var handler Handler
if isDlq {
handler = w.dlqhandler
} else {
handler = w.handler
}

start := time.Now()
if err := w.handleMsg(ctx, sub, handler, m, start); err != nil {
duration := time.Since(start)
w.log.Err(ctx, err,
klog.AInt64("duration_ms", duration.Milliseconds()),
)
if errors.Is(err, ErrClientClosed) {
return
}
if errors.Is(context.Cause(ctx), ErrPartitionUnassigned) || errors.Is(err, ErrPartitionUnassigned) || errors.Is(err, ErrInvalidMsg) {
return
}
if err := ktime.After(ctx, delay); err != nil {
return
}
}()
delay = min(delay*2, opts.MaxBackoff)
continue
}
return
}
}

func (w *Watcher) handleMsg(ctx context.Context, sub Subscription, handler Handler, m Msg, start time.Time) error {
if err := handler.Handle(ctx, m); err != nil {
return kerrors.WithMsg(err, "Failed executing handler")
}
duration := time.Since(start)
w.log.Info(ctx, "Handled message",
klog.AInt64("duration_ms", duration.Milliseconds()),
)
if err := sub.Commit(ctx, m); err != nil {
return kerrors.WithMsg(err, "Failed to commit message")
}
w.log.Info(ctx, "Committed message")
return nil
}
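
The new consumeMsg bridges the MsgUnassigned channel into context cancellation with a cause, so a long-running handler can stop as soon as its partition is revoked, and the uncommitted message is simply left for whichever consumer is assigned the partition next. A minimal, self-contained sketch of that bridge (hypothetical names such as errUnassigned and handle; assumes only the standard library, Go 1.20+ for context.WithCancelCause):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errUnassigned stands in for the package's ErrPartitionUnassigned.
var errUnassigned = errors.New("partition unassigned")

// handle simulates a long-running handler that honors ctx cancellation.
func handle(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second):
		return nil
	case <-ctx.Done():
		return context.Cause(ctx)
	}
}

func main() {
	// unassigned plays the role of the channel returned by MsgUnassigned.
	unassigned := make(chan struct{})

	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	// Bridge the closed channel into context cancellation so the handler
	// observes the revocation as the context's cause.
	go func() {
		select {
		case <-ctx.Done():
		case <-unassigned:
			cancel(errUnassigned)
		}
	}()

	// Simulate a rebalance revoking the partition shortly after start.
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(unassigned)
	}()

	if err := handle(ctx); err != nil {
		fmt.Println("handler stopped early:", err) // prints "partition unassigned"
	}
}
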