diff --git a/tester/queueconsumer.go b/tester/queueconsumer.go index 76428189..dc7c080f 100644 --- a/tester/queueconsumer.go +++ b/tester/queueconsumer.go @@ -27,6 +27,10 @@ const ( killed ) +const ( + eventBufferQueueSize = 100000 +) + type queueConsumer struct { queue *queue nextOffset int64 @@ -40,7 +44,7 @@ type queueConsumer struct { func newQueueConsumer(topic string, queue *queue) *queueConsumer { qc := &queueConsumer{ queue: queue, - eventBuffer: make(chan kafka.Event, 100000), + eventBuffer: make(chan kafka.Event, eventBufferQueueSize), state: NewSignal(unbound, bound, stopped, stopping, running, killed).SetState(unbound), } return qc @@ -165,6 +169,11 @@ func (qc *queueConsumer) addToBuffer(event kafka.Event) { qc.waitEventBuffer.Add(1) qc.eventBuffer <- event + + if len(qc.eventBuffer) > eventBufferQueueSize*0.9 { + logger.Printf("buffer nearly full: %d, %s. Will drop events.", len(qc.eventBuffer), qc.queue.topic) + <-qc.eventBuffer + } } func (qc *queueConsumer) startSimpleConsumer(offset int64, firstStart bool) {