Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][producer] Fail all messages that are pending requests when closing like java #1059

Merged
merged 9 commits into from
Aug 30, 2023
60 changes: 60 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
return
}

defer close(p.dataChan)
defer close(p.cmdChan)
p.log.Info("Closing producer")

id := p.client.rpcClient.NewRequestID()
Expand All @@ -1304,6 +1306,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
} else {
p.log.Info("Closed producer")
}
p.failPendingMessages()

if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
Expand All @@ -1316,6 +1319,63 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.batchFlushTicker.Stop()
}

func (p *partitionProducer) failPendingMessages() {
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
return
}
p.log.Infof("Failing %d messages on closing producer", viewSize)
lastViewItem := curViewItems[viewSize-1].(*pendingItem)

// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
item := p.pendingQueue.CompareAndPoll(
func(m interface{}) bool {
return m != nil
})

if item == nil {
return
}

pi := item.(*pendingItem)
pi.Lock()

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.log.WithError(errProducerClosed).
WithField("size", size).
WithField("properties", sr.msg.Properties)
}

if sr.callback != nil {
sr.callbackOnce.Do(func() {
runCallback(sr.callback, nil, sr.msg, errProducerClosed)
})
}
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(nil)
}
}

// flag the sending has completed with error, flush make no effect
pi.Complete(errProducerClosed)
pi.Unlock()

// finally reached the last view item, current iteration ends
if pi == lastViewItem {
p.log.Infof("%d messages complete failed", viewSize)
return
}
}
}

func (p *partitionProducer) LastSequenceID() int64 {
return atomic.LoadInt64(&p.lastSequenceID)
}
Expand Down
30 changes: 30 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,3 +2268,33 @@ func TestProducerSendWithContext(t *testing.T) {
// producer.Send should fail and return err context.Canceled
assert.True(t, errors.Is(err, context.Canceled))
}

func TestFailPendingMessageWithClose(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()
testProducer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
DisableBlockIfQueueFull: false,
BatchingMaxPublishDelay: 100000,
BatchingMaxMessages: 1000,
})

assert.NoError(t, err)
assert.NotNil(t, testProducer)
for i := 0; i < 3; i++ {
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
assert.Equal(t, errProducerClosed, e)
}
})
}
partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
partitionProducerImp.pendingQueue.Put(&pendingItem{})
testProducer.Close()
assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
}