Skip to content

Commit

Permalink
locate bug
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Oct 24, 2023
1 parent 5810654 commit 5a8f383
Showing 1 changed file with 56 additions and 55 deletions.
111 changes: 56 additions & 55 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,33 +483,48 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {

msg := sr.msg

// read payload from message
uncompressedPayload := msg.Payload

if !p.canAddToQueue(sr) {
return
}

uncompressedSize := sr.uncompressedSize
if err := p.updateSchema(sr); err != nil {
p.log.Error(err)
runCallback(sr.callback, nil, msg, err)
return
}

if err := p.updateUncompressedPayload(sr); err != nil {
p.log.Error(err)
runCallback(sr.callback, nil, msg, err)
return
}

uncompressedSize := len(sr.uncompressedPayload)

// try to reserve memory for uncompressedPayload
if !p.canReserveMem(sr, uncompressedSize) {
if !p.canReserveMem(sr, int64(uncompressedSize)) {
return
}

p.updateMetaData(sr)
if err := p.updateChunkInfo(sr); err != nil {
return
}

if sr.sendAsBatch {
smm := p.genSingleMessageMetadataInBatch(msg, int(uncompressedSize))
smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
multiSchemaEnabled := !p.options.DisableMultiSchema
added := addRequestToBatch(
smm, p, uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry

p.internalFlushCurrentBatch()

// after flushing try again to add the current payload
if ok := addRequestToBatch(smm, p, uncompressedPayload, sr, msg,
sr.deliverAt, sr.schemaVersion, multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(uncompressedSize)
if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(sr.callback, nil, sr.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
Expand All @@ -536,8 +551,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
cr := newChunkRecorder()
for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
lhs = chunkID * sr.payloadChunkSize
rhs = lhs + sr.payloadChunkSize
if rhs > sr.compressedSize {
if rhs = lhs + sr.payloadChunkSize; rhs > sr.compressedSize {
rhs = sr.compressedSize
}
// update chunk id
Expand All @@ -548,6 +562,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
callback: sr.callback,
callbackOnce: sr.callbackOnce,
publishTime: sr.publishTime,
flushImmediately: sr.flushImmediately,
totalChunks: sr.totalChunks,
chunkID: chunkID,
uuid: uuid,
Expand All @@ -568,10 +583,9 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
}
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr) {
p.releaseSemaphoreAndMem(uncompressedSize - int64(lhs))
p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
return
}

p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
}
}
Expand Down Expand Up @@ -1142,36 +1156,43 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {

// if msg is too large and chunking is disabled
if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
p.releaseSemaphoreAndMem(sr.uncompressedSize)
runCallback(sr.callback, nil, sr.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", sr.maxMessageSize)

p.metrics.PublishErrorsMsgTooLarge.Inc()
return errMessageTooLarge
}

var totalChunks int
// max chunk payload size
var payloadChunkSize int
if sr.sendAsBatch || !p.options.EnableChunking {
sr.totalChunks = 1
sr.payloadChunkSize = int(sr.maxMessageSize)
return nil
}

sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
if sr.payloadChunkSize <= 0 {
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(sr.mm)).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", sr.maxMessageSize)

return errMetaTooLarge
}

// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
totalChunks = 1
payloadChunkSize = int(p._getConn().GetMaxMessageSize())
} else {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(sr.mm)
if payloadChunkSize <= 0 {
p.releaseSemaphoreAndMem(sr.uncompressedSize)
runCallback(sr.callback, nil, sr.msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(sr.mm)).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return errMetaTooLarge
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
}
totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(payloadChunkSize))))
}

sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
// set total chunks to send request
sr.totalChunks = totalChunks
return nil
}

Expand Down Expand Up @@ -1208,26 +1229,6 @@ func (p *partitionProducer) internalSendAsync(

p.options.Interceptors.BeforeSend(p, msg)

if err := p.updateSchema(sr); err != nil {
p.log.Error(err)
runCallback(sr.callback, nil, msg, err)
return
}

if err := p.updateUncompressedPayload(sr); err != nil {
p.log.Error(err)
runCallback(sr.callback, nil, msg, err)
return
}

p.updateMetaData(sr)

if err := p.updateChunkInfo(sr); err != nil {
p.log.Error(err)
runCallback(sr.callback, nil, msg, err)
return
}

p.dataChan <- sr
}

Expand Down

0 comments on commit 5a8f383

Please sign in to comment.