Skip to content

Commit

Permalink
[Improve][Producer] simplify the flush logic (#1049)
Browse files Browse the repository at this point in the history
### Motivation

Simplify the producer flush logic

### Modifications
1. add a callback field to the pendingItem, default is nil;
2. in partitionProducer.internalFlush() get the last pendingItem from pendingQueue;
3. update the last pendingItem by setup a new callback;
4. in partitionProducer.ReceivedSendReceipt, no need to identify the sendRequest by checking if the msg is nil;
5. in pendingItem.Complete(), invoke its callback to notify the flush is done.


---------

Co-authored-by: gunli <[email protected]>
  • Loading branch information
gunli and gunli authored Jul 18, 2023
1 parent e45122c commit 3812c07
Showing 1 changed file with 24 additions and 35 deletions.
59 changes: 24 additions & 35 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,11 +848,12 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,

type pendingItem struct {
sync.Mutex
buffer internal.Buffer
sequenceID uint64
sentAt time.Time
sendRequests []interface{}
completed bool
buffer internal.Buffer
sequenceID uint64
sentAt time.Time
sendRequests []interface{}
completed bool
flushCallback func(err error)
}

func (p *partitionProducer) internalFlushCurrentBatch() {
Expand Down Expand Up @@ -990,7 +991,7 @@ func (p *partitionProducer) failTimeoutMessages() {
}

// flag the sending has completed with error, flush make no effect
pi.Complete()
pi.Complete(errSendTimeout)
pi.Unlock()

// finally reached the last view item, current iteration ends
Expand Down Expand Up @@ -1062,15 +1063,10 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
return
}

sendReq := &sendRequest{
msg: nil,
callback: func(id MessageID, message *ProducerMessage, e error) {
fr.err = e
close(fr.doneCh)
},
pi.flushCallback = func(err error) {
fr.err = err
close(fr.doneCh)
}

pi.sendRequests = append(pi.sendRequests, sendReq)
}

func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
Expand Down Expand Up @@ -1208,27 +1204,17 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
pi.Lock()
defer pi.Unlock()
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
batchSize := int32(0)
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
batchSize = batchSize + 1
} else { // Flush request
break
}
}
batchSize := int32(len(pi.sendRequests))
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
payloadSize := float64(len(sr.msg.Payload))
p.metrics.BytesPublished.Add(payloadSize)
p.metrics.BytesPending.Sub(payloadSize)
}
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
payloadSize := float64(len(sr.msg.Payload))
p.metrics.BytesPublished.Add(payloadSize)
p.metrics.BytesPending.Sub(payloadSize)

if sr.callback != nil || len(p.options.Interceptors) > 0 {
msgID := newMessageID(
Expand Down Expand Up @@ -1274,7 +1260,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
}

// Mark this pending item as done
pi.Complete()
pi.Complete(nil)
}
}

Expand Down Expand Up @@ -1384,12 +1370,15 @@ type flushRequest struct {
err error
}

func (i *pendingItem) Complete() {
func (i *pendingItem) Complete(err error) {
if i.completed {
return
}
i.completed = true
buffersPool.Put(i.buffer)
if i.flushCallback != nil {
i.flushCallback(err)
}
}

// _setConn sets the internal connection field of this partition producer atomically.
Expand Down

0 comments on commit 3812c07

Please sign in to comment.