Skip to content

Commit

Permalink
Extract ringbuf
Browse files Browse the repository at this point in the history
  • Loading branch information
xorkevin committed Oct 1, 2023
1 parent fe6ff5f commit 40a5db4
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 73 deletions.
80 changes: 7 additions & 73 deletions service/events/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,14 @@ package events

import (
"context"
"errors"
"fmt"
"sync"
"time"

"xorkevin.dev/governor/util/ringbuf"
"xorkevin.dev/kerrors"
)

type (
ringBuffer struct {
buf []Msg
r int
w int
}
)

func (b *ringBuffer) resize() {
next := make([]Msg, len(b.buf)*2)
if b.r == b.w {
b.w = 0
} else if b.r < b.w {
b.w = copy(next, b.buf[b.r:b.w])
} else {
p := copy(next, b.buf[b.r:])
q := 0
if b.w > 0 {
q = copy(next[p:], b.buf[:b.w])
}
b.w = p + q
}
b.buf = next
b.r = 0
}

func (b *ringBuffer) Write(m Msg) {
next := (b.w + 1) % len(b.buf)
if next == b.r {
b.resize()
b.Write(m)
return
}
b.buf[b.w] = m
b.w = next
}

func (b *ringBuffer) Read() (*Msg, error) {
if b.r == b.w {
return nil, kerrors.WithKind(nil, ErrReadEmpty, "No messages")
}
next := (b.r + 1) % len(b.buf)
m := b.buf[b.r]
b.r = next
return &m, nil
}

func (b *ringBuffer) Peek() (*Msg, error) {
if b.r == b.w {
return nil, kerrors.WithKind(nil, ErrReadEmpty, "No messages")
}
m := b.buf[b.r]
return &m, nil
}

type (
MuxChan struct {
topics map[string]*chanTopic
Expand All @@ -77,7 +22,7 @@ type (
}

chanGroup struct {
ring *ringBuffer
ring *ringbuf.Ring[Msg]
sub *chanSubscription
subs map[*chanSubscription]struct{}
}
Expand Down Expand Up @@ -111,11 +56,7 @@ func (s *MuxChan) Subscribe(ctx context.Context, topic, group string, opts Consu
g, ok := t.groups[group]
if !ok {
g = &chanGroup{
ring: &ringBuffer{
buf: make([]Msg, 2),
r: 0,
w: 0,
},
ring: ringbuf.New[Msg](),
sub: sub,
subs: map[*chanSubscription]struct{}{},
}
Expand Down Expand Up @@ -269,12 +210,8 @@ func (s *chanSubscription) ReadMsg(ctx context.Context) (*Msg, error) {
continue
}

m, err := g.ring.Peek()
if err != nil {
if !errors.Is(err, ErrReadEmpty) {
rerr = err
return
}
m, ok := g.ring.Peek()
if !ok {
s.rCond.Wait()
continue
}
Expand Down Expand Up @@ -351,11 +288,8 @@ func (s *chanSubscription) Commit(ctx context.Context, msg Msg) error {
return kerrors.WithKind(nil, ErrPartitionUnassigned, "Unassigned partition")
}

m, err := g.ring.Peek()
if err != nil {
if !errors.Is(err, ErrReadEmpty) {
return err
}
m, ok := g.ring.Peek()
if !ok {
return nil
}
if m.Offset != msg.Offset {
Expand Down
64 changes: 64 additions & 0 deletions util/ringbuf/ringbuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ringbuf

type (
Ring[T any] struct {
buf []T
r int
w int
}
)

func New[T any]() *Ring[T] {
return &Ring[T]{
buf: make([]T, 2),
r: 0,
w: 0,
}
}

func (b *Ring[T]) resize() {
next := make([]T, len(b.buf)*2)
if b.r == b.w {
b.w = 0
} else if b.r < b.w {
b.w = copy(next, b.buf[b.r:b.w])
} else {
p := copy(next, b.buf[b.r:])
q := 0
if b.w > 0 {
q = copy(next[p:], b.buf[:b.w])
}
b.w = p + q
}
b.buf = next
b.r = 0
}

func (b *Ring[T]) Write(m T) {
next := (b.w + 1) % len(b.buf)
if next == b.r {
b.resize()
b.Write(m)
return
}
b.buf[b.w] = m
b.w = next
}

func (b *Ring[T]) Read() (*T, bool) {
if b.r == b.w {
return nil, false
}
next := (b.r + 1) % len(b.buf)
m := b.buf[b.r]
b.r = next
return &m, true
}

func (b *Ring[T]) Peek() (*T, bool) {
if b.r == b.w {
return nil, false
}
m := b.buf[b.r]
return &m, true
}

0 comments on commit 40a5db4

Please sign in to comment.