Skip to content

Commit

Permalink
fix(producer): fail all messages that are pending requests when closi…
Browse files Browse the repository at this point in the history
…ng (#1059)

This aligns the manner with Java client.
  • Loading branch information
graysonzeng authored Aug 30, 2023
1 parent fbee610 commit 4109351
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
60 changes: 60 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
return
}

defer close(p.dataChan)
defer close(p.cmdChan)
p.log.Info("Closing producer")

id := p.client.rpcClient.NewRequestID()
Expand All @@ -1304,6 +1306,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
} else {
p.log.Info("Closed producer")
}
p.failPendingMessages()

if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
Expand All @@ -1316,6 +1319,63 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.batchFlushTicker.Stop()
}

func (p *partitionProducer) failPendingMessages() {
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
return
}
p.log.Infof("Failing %d messages on closing producer", viewSize)
lastViewItem := curViewItems[viewSize-1].(*pendingItem)

// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
item := p.pendingQueue.CompareAndPoll(
func(m interface{}) bool {
return m != nil
})

if item == nil {
return
}

pi := item.(*pendingItem)
pi.Lock()

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.log.WithError(errProducerClosed).
WithField("size", size).
WithField("properties", sr.msg.Properties)
}

if sr.callback != nil {
sr.callbackOnce.Do(func() {
runCallback(sr.callback, nil, sr.msg, errProducerClosed)
})
}
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(nil)
}
}

// flag the sending has completed with error, flush make no effect
pi.Complete(errProducerClosed)
pi.Unlock()

// finally reached the last view item, current iteration ends
if pi == lastViewItem {
p.log.Infof("%d messages complete failed", viewSize)
return
}
}
}

func (p *partitionProducer) LastSequenceID() int64 {
return atomic.LoadInt64(&p.lastSequenceID)
}
Expand Down
30 changes: 30 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,3 +2268,33 @@ func TestProducerSendWithContext(t *testing.T) {
// producer.Send should fail and return err context.Canceled
assert.True(t, errors.Is(err, context.Canceled))
}

func TestFailPendingMessageWithClose(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()
testProducer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
DisableBlockIfQueueFull: false,
BatchingMaxPublishDelay: 100000,
BatchingMaxMessages: 1000,
})

assert.NoError(t, err)
assert.NotNil(t, testProducer)
for i := 0; i < 3; i++ {
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
assert.Equal(t, errProducerClosed, e)
}
})
}
partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
partitionProducerImp.pendingQueue.Put(&pendingItem{})
testProducer.Close()
assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
}

0 comments on commit 4109351

Please sign in to comment.