Skip to content

Commit

Permalink
add sendRequest.done() to release resource together
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Oct 24, 2023
1 parent f5667b2 commit 57209ec
Showing 1 changed file with 77 additions and 14 deletions.
91 changes: 77 additions & 14 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,20 +1420,35 @@ func (p *partitionProducer) Close() {
}

type sendRequest struct {
ctx context.Context
msg *ProducerMessage
callback func(MessageID, *ProducerMessage, error)
callbackOnce *sync.Once
publishTime time.Time
flushImmediately bool
blockCh chan struct{}
closeBlockChOnce *sync.Once
totalChunks int
chunkID int
uuid string
chunkRecorder *chunkRecorder
transaction *transaction
reservedMem int64
ctx context.Context
msg *ProducerMessage
callback func(MessageID, *ProducerMessage, error)
callbackOnce *sync.Once
publishTime time.Time
flushImmediately bool
blockCh chan struct{}
closeBlockChOnce *sync.Once
totalChunks int
chunkID int
uuid string
chunkRecorder *chunkRecorder
transaction *transaction
reservedMem int64
producer *partitionProducer
memLimit internal.MemoryLimitController
semaphore internal.Semaphore
reservedSemaphore int
sendAsBatch bool
schema Schema
schemaVersion []byte
uncompressedPayload []byte
uncompressedSize int64
compressedPayload []byte
compressedSize int
payloadChunkSize int
mm *pb.MessageMetadata
deliverAt time.Time
maxMessageSize int32
}

// stopBlock can be invoked multiple times safety
Expand All @@ -1443,6 +1458,54 @@ func (sr *sendRequest) stopBlock() {
})
}

func (sr *sendRequest) done(msgID MessageID, err error) {
if err == nil {
sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
sr.producer.metrics.MessagesPublished.Inc()
sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
}

if err != nil {
sr.producer.log.WithError(err).
WithField("size", sr.reservedMem).
WithField("properties", sr.msg.Properties)
}

if err == errSendTimeout {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}

if err == errMessageTooLarge {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}

if sr.semaphore != nil {
for i := 0; i < sr.reservedSemaphore; i++ {
sr.semaphore.Release()
}
sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore))
}

if sr.memLimit != nil {
sr.memLimit.ReleaseMemory(sr.reservedMem)
sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
}

if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
sr.callbackOnce.Do(func() {
runCallback(sr.callback, msgID, sr.msg, err)
})

if sr.transaction != nil {
sr.transaction.endSendOrAckOp(err)
}

if sr.producer.options.Interceptors != nil && err == nil {
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
}
}
}

type closeProducer struct {
doneCh chan struct{}
}
Expand Down

0 comments on commit 57209ec

Please sign in to comment.