From c12b6f3f3fb57f9a4102cf8fb38f1483c1e71fcf Mon Sep 17 00:00:00 2001 From: frairon Date: Fri, 15 Mar 2019 13:55:14 +0100 Subject: [PATCH] tester: queue consumer drops events once the events-channel gets full --- tester/queueconsumer.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tester/queueconsumer.go b/tester/queueconsumer.go index 76428189..dc7c080f 100644 --- a/tester/queueconsumer.go +++ b/tester/queueconsumer.go @@ -27,6 +27,10 @@ const ( killed ) +const ( + eventBufferQueueSize = 100000 +) + type queueConsumer struct { queue *queue nextOffset int64 @@ -40,7 +44,7 @@ type queueConsumer struct { func newQueueConsumer(topic string, queue *queue) *queueConsumer { qc := &queueConsumer{ queue: queue, - eventBuffer: make(chan kafka.Event, 100000), + eventBuffer: make(chan kafka.Event, eventBufferQueueSize), state: NewSignal(unbound, bound, stopped, stopping, running, killed).SetState(unbound), } return qc @@ -165,6 +169,11 @@ func (qc *queueConsumer) addToBuffer(event kafka.Event) { qc.waitEventBuffer.Add(1) qc.eventBuffer <- event + + if len(qc.eventBuffer) > eventBufferQueueSize*0.9 { + logger.Printf("buffer nearly full: %d, %s. Will drop events.", len(qc.eventBuffer), qc.queue.topic) + <-qc.eventBuffer + } } func (qc *queueConsumer) startSimpleConsumer(offset int64, firstStart bool) {