Skip to content

Commit

Permalink
move close channle behind producer state changed
Browse files Browse the repository at this point in the history
  • Loading branch information
graysonzeng committed Jul 19, 2023
1 parent dbf81be commit f46c663
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,12 +1266,12 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)

func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)
defer close(p.dataChan)
defer close(p.cmdChan)
if !p.casProducerState(producerReady, producerClosing) {
return
}

defer close(p.dataChan)
defer close(p.cmdChan)
p.log.Info("Closing producer")
for {
if len(p.dataChan) == 0 {
Expand Down

0 comments on commit f46c663

Please sign in to comment.