Skip to content

Commit

Permalink
tester: queue consumer drops events once the events-channel gets full
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Aug 12, 2019
1 parent bc52dc0 commit c12b6f3
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion tester/queueconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (
killed
)

const (
eventBufferQueueSize = 100000
)

type queueConsumer struct {
queue *queue
nextOffset int64
Expand All @@ -40,7 +44,7 @@ type queueConsumer struct {
func newQueueConsumer(topic string, queue *queue) *queueConsumer {
qc := &queueConsumer{
queue: queue,
eventBuffer: make(chan kafka.Event, 100000),
eventBuffer: make(chan kafka.Event, eventBufferQueueSize),
state: NewSignal(unbound, bound, stopped, stopping, running, killed).SetState(unbound),
}
return qc
Expand Down Expand Up @@ -165,6 +169,11 @@ func (qc *queueConsumer) addToBuffer(event kafka.Event) {
qc.waitEventBuffer.Add(1)

qc.eventBuffer <- event

if len(qc.eventBuffer) > eventBufferQueueSize*0.9 {
logger.Printf("buffer nearly full: %d, %s. Will drop events.", len(qc.eventBuffer), qc.queue.topic)
<-qc.eventBuffer
}
}

func (qc *queueConsumer) startSimpleConsumer(offset int64, firstStart bool) {
Expand Down

0 comments on commit c12b6f3

Please sign in to comment.