Expected behavior

Get the right uncompressedPayloadSize from msg.Payload or msg.Schema.
Release the memory of the current failed chunk:

// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
	p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs))
	return
}
Actual behavior
If we use a schema (msg.Value plus msg.Schema instead of msg.Payload), msg.Payload is nil, so we request 0 memory at the beginning of partitionProducer.internalSend() (pulsar/producer_partition.go:480/492):

uncompressedPayload := msg.Payload // HOW ABOUT WE USE SCHEMA ???
uncompressedPayloadSize := int64(len(uncompressedPayload))

var schemaPayload []byte
var err error
if msg.Value != nil && msg.Payload != nil {
	p.log.Error("Can not set Value and Payload both")
	runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
	return
}

// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request, uncompressedPayloadSize) {
	return
}
In partitionProducer.internalSend() (pulsar/producer_partition.go:663), when chunking is enabled, we send the chunks one by one; when canAddToQueue fails for a chunk, we release the memory of the chunks that have not been sent. Looking at the code below, it seems to forget to release the memory of the current failed chunk, because lhs and rhs have already been updated to lhs = chunkID * payloadChunkSize and rhs = lhs + payloadChunkSize. If we want to release the memory of the current failed chunk and the later ones, I think we should call p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs)). The Java client does client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);

// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
	p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs)) // SHOULD BE lhs ???
	return
}
Also, if msg.Payload is nil (the schema case), uncompressedPayloadSize is 0, so the call above releases a negative amount of memory and the memLimit ends up in a wrong state (see the worked numbers below).
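Worked numbers for the two problems above, using illustrative values only (10 MiB uncompressed payload, no compression, 4 MiB chunks; none of these figures come from a real run):

package main

import "fmt"

func main() {
	const mib = 1024 * 1024

	// Case 1: a 10 MiB payload sent in 4 MiB chunks; the chunk with chunkID = 1
	// (covering bytes [4 MiB, 8 MiB)) is rejected by canAddToQueue.
	size, lhs, rhs := int64(10*mib), int64(4*mib), int64(8*mib)
	fmt.Println(size - rhs) // 2097152: only 2 MiB released
	fmt.Println(size - lhs) // 6291456: the failed chunk plus the chunk never sent, i.e. what should be released

	// Case 2: the message carries only Value + Schema, so msg.Payload is nil and size is 0.
	size = 0
	fmt.Println(size - rhs) // -8388608: a negative "release" corrupts the memory limit counter
}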
When a chunk fails, we release the memory that has not been sent and the original message eventually fails. But in partitionProducer.failTimeoutMessages() (pulsar/producer_partition.go:978) we release memory equal to the size of msg.Payload, which seems to release too much, because the memory of the failed chunks has already been released before.

size := len(sr.msg.Payload) // HOW ABOUT WE USE SCHEMA ???
p.releaseSemaphoreAndMem(int64(size))
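One possible direction, shown only as a sketch: make each sendRequest remember how much memory it actually reserved and release exactly that amount on timeout. reservedMem below is a hypothetical field that does not exist on the current sendRequest; such bookkeeping would also cover the schema case where msg.Payload is nil.

// Hypothetical bookkeeping, not the actual client code.
for _, i := range pi.sendRequests {
	sr := i.(*sendRequest)
	if sr.msg != nil {
		p.releaseSemaphoreAndMem(sr.reservedMem) // instead of int64(len(sr.msg.Payload))
		p.metrics.MessagesPending.Dec()
		p.metrics.BytesPending.Sub(float64(sr.reservedMem))
		p.metrics.PublishErrorsTimeout.Inc()
	}
}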
In partitionProducer.ReceivedSendReceipt() we release memory with p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))). If a message is chunked, there are many sendRequests pointing to that message, so it seems we release more than we requested.
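Since only one reservation is made for the whole chunked message (the first chunk acquires uncompressedPayloadSize and the later chunks acquire 0), a sketch of one way the receipt path could avoid the repeated release is to release only when the last chunk is acknowledged. This is an assumption about a possible fix, not the actual patch:

// Sketch only: release the single reservation once, on the last chunk's receipt.
if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
	p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
}

For reference, the three functions discussed above are reproduced below.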
func (p *partitionProducer) internalSend(request *sendRequest) {
	p.log.Debug("Received send request: ", *request.msg)

	msg := request.msg

	// read payload from message
	uncompressedPayload := msg.Payload
	uncompressedPayloadSize := int64(len(uncompressedPayload))

	var schemaPayload []byte
	var err error
	if msg.Value != nil && msg.Payload != nil {
		p.log.Error("Can not set Value and Payload both")
		runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
		return
	}

	// The block chan must be closed when returned with exception
	defer request.stopBlock()
	if !p.canAddToQueue(request, uncompressedPayloadSize) {
		return
	}

	if p.options.DisableMultiSchema {
		if msg.Schema != nil && p.options.Schema != nil &&
			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
			runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
			p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
			return
		}
	}

	var schema Schema
	var schemaVersion []byte
	if msg.Schema != nil {
		schema = msg.Schema
	} else if p.options.Schema != nil {
		schema = p.options.Schema
	}
	if msg.Value != nil {
		// payload and schema are mutually exclusive
		// try to get payload from schema value only if payload is not set
		if uncompressedPayload == nil && schema != nil {
			schemaPayload, err = schema.Encode(msg.Value)
			if err != nil {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
				p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
				return
			}
		}
	}
	if uncompressedPayload == nil {
		uncompressedPayload = schemaPayload
	}

	if schema != nil {
		schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
		if schemaVersion == nil {
			schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
			if err != nil {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				p.log.WithError(err).Error("get schema version fail")
				runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
				return
			}
			p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
		}
	}

	uncompressedSize := len(uncompressedPayload)

	deliverAt := msg.DeliverAt
	if msg.DeliverAfter.Nanoseconds() > 0 {
		deliverAt = time.Now().Add(msg.DeliverAfter)
	}

	mm := p.genMetadata(msg, uncompressedSize, deliverAt)

	// set default ReplicationClusters when DisableReplication
	if msg.DisableReplication {
		msg.ReplicationClusters = []string{"__local__"}
	}

	sendAsBatch := !p.options.DisableBatching &&
		msg.ReplicationClusters == nil &&
		deliverAt.UnixNano() < 0

	// Once the batching is enabled, it can close blockCh early to make block finish
	if sendAsBatch {
		request.stopBlock()
	} else {
		// update sequence id for metadata, make the size of msgMetadata more accurate
		// batch sending will update sequence ID in the BatchBuilder
		p.updateMetadataSeqID(mm, msg)
	}

	maxMessageSize := int(p._getConn().GetMaxMessageSize())

	// compress payload if not batching
	var compressedPayload []byte
	var compressedSize int
	var checkSize int
	if !sendAsBatch {
		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
		compressedSize = len(compressedPayload)
		checkSize = compressedSize

		// set the compress type in msgMetaData
		compressionType := pb.CompressionType(p.options.CompressionType)
		if compressionType != pb.CompressionType_NONE {
			mm.Compression = &compressionType
		}
	} else {
		// final check for batching message is in serializeMessage
		// this is a double check
		checkSize = uncompressedSize
	}

	// if msg is too large and chunking is disabled
	if checkSize > maxMessageSize && !p.options.EnableChunking {
		p.releaseSemaphoreAndMem(uncompressedPayloadSize)
		runCallback(request.callback, nil, request.msg, errMessageTooLarge)
		p.log.WithError(errMessageTooLarge).
			WithField("size", checkSize).
			WithField("properties", msg.Properties).
			Errorf("MaxMessageSize %d", maxMessageSize)
		p.metrics.PublishErrorsMsgTooLarge.Inc()
		return
	}

	var totalChunks int
	// max chunk payload size
	var payloadChunkSize int
	if sendAsBatch || !p.options.EnableChunking {
		totalChunks = 1
		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
	} else {
		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
		if payloadChunkSize <= 0 {
			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
			runCallback(request.callback, nil, msg, errMetaTooLarge)
			p.log.WithError(errMetaTooLarge).
				WithField("metadata size", proto.Size(mm)).
				WithField("properties", msg.Properties).
				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
			p.metrics.PublishErrorsMsgTooLarge.Inc()
			return
		}
		// set ChunkMaxMessageSize
		if p.options.ChunkMaxMessageSize != 0 {
			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
		}
		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
	}

	// set total chunks to send request
	request.totalChunks = totalChunks

	if !sendAsBatch {
		if totalChunks > 1 {
			var lhs, rhs int
			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
			mm.Uuid = proto.String(uuid)
			mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
			mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
			cr := newChunkRecorder()
			for chunkID := 0; chunkID < totalChunks; chunkID++ {
				lhs = chunkID * payloadChunkSize
				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
					rhs = compressedSize
				}
				// update chunk id
				mm.ChunkId = proto.Int32(int32(chunkID))
				nsr := &sendRequest{
					ctx:              request.ctx,
					msg:              request.msg,
					callback:         request.callback,
					callbackOnce:     request.callbackOnce,
					publishTime:      request.publishTime,
					blockCh:          request.blockCh,
					closeBlockChOnce: request.closeBlockChOnce,
					totalChunks:      totalChunks,
					chunkID:          chunkID,
					uuid:             uuid,
					chunkRecorder:    cr,
					transaction:      request.transaction,
				}
				// the permit of first chunk has acquired
				if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
					p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs)) // BUG, SHOULD BE lhs
					return
				}
				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
			}
			// close the blockCh when all the chunks acquired permits
			request.stopBlock()
		} else {
			// close the blockCh when totalChunks is 1 (it has acquired permits)
			request.stopBlock()
			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
		}
	} else {
		smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
		multiSchemaEnabled := !p.options.DisableMultiSchema
		added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
			multiSchemaEnabled)
		if !added {
			// The current batch is full. flush it and retry
			p.internalFlushCurrentBatch()

			// after flushing try again to add the current payload
			if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
				multiSchemaEnabled); !ok {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				runCallback(request.callback, nil, request.msg, errFailAddToBatch)
				p.log.WithField("size", uncompressedSize).
					WithField("properties", msg.Properties).
					Error("unable to add message to batch")
				return
			}
		}
		if request.flushImmediately {
			p.internalFlushCurrentBatch()
		}
	}
}
func (p *partitionProducer) failTimeoutMessages() {
	diff := func(sentAt time.Time) time.Duration {
		return p.options.SendTimeout - time.Since(sentAt)
	}

	t := time.NewTimer(p.options.SendTimeout)
	defer t.Stop()

	for range t.C {
		state := p.getProducerState()
		if state == producerClosing || state == producerClosed {
			return
		}

		item := p.pendingQueue.Peek()
		if item == nil {
			// pending queue is empty
			t.Reset(p.options.SendTimeout)
			continue
		}
		oldestItem := item.(*pendingItem)
		if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
			// none of these pending messages have timed out, wait and retry
			t.Reset(nextWaiting)
			continue
		}

		// since pending queue is not thread safe because of there is no global iteration lock
		// to control poll from pending queue, current goroutine and connection receipt handler
		// iterate pending queue at the same time, this maybe a performance trade-off
		// see https://github.com/apache/pulsar-client-go/pull/301
		curViewItems := p.pendingQueue.ReadableSlice()
		viewSize := len(curViewItems)
		if viewSize <= 0 {
			// double check
			t.Reset(p.options.SendTimeout)
			continue
		}
		p.log.Infof("Failing %d messages on timeout %s", viewSize, p.options.SendTimeout)
		lastViewItem := curViewItems[viewSize-1].(*pendingItem)

		// iterate at most viewSize items
		for i := 0; i < viewSize; i++ {
			tickerNeedWaiting := time.Duration(0)
			item := p.pendingQueue.CompareAndPoll(
				func(m interface{}) bool {
					if m == nil {
						return false
					}

					pi := m.(*pendingItem)
					pi.Lock()
					defer pi.Unlock()
					if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
						// current and subsequent items not timeout yet, stop iterating
						tickerNeedWaiting = nextWaiting
						return false
					}
					return true
				})

			if item == nil {
				t.Reset(p.options.SendTimeout)
				break
			}

			if tickerNeedWaiting > 0 {
				t.Reset(tickerNeedWaiting)
				break
			}

			pi := item.(*pendingItem)
			pi.Lock()

			for _, i := range pi.sendRequests {
				sr := i.(*sendRequest)
				if sr.msg != nil {
					size := len(sr.msg.Payload)
					p.releaseSemaphoreAndMem(int64(size))
					p.metrics.MessagesPending.Dec()
					p.metrics.BytesPending.Sub(float64(size))
					p.metrics.PublishErrorsTimeout.Inc()
					p.log.WithError(errSendTimeout).
						WithField("size", size).
						WithField("properties", sr.msg.Properties)
				}

				if sr.callback != nil {
					sr.callbackOnce.Do(func() {
						runCallback(sr.callback, nil, sr.msg, errSendTimeout)
					})
				}
				if sr.transaction != nil {
					sr.transaction.endSendOrAckOp(nil)
				}
			}

			// flag the sending has completed with error, flush make no effect
			pi.Complete()
			pi.Unlock()

			// finally reached the last view item, current iteration ends
			if pi == lastViewItem {
				t.Reset(p.options.SendTimeout)
				break
			}
		}
	}
}
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
	pi, ok := p.pendingQueue.Peek().(*pendingItem)

	if !ok {
		// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
		p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
		return
	}

	if pi.sequenceID < response.GetSequenceId() {
		// Force connection closing so that messages can be re-transmitted in a new connection
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		p._getConn().Close()
		return
	} else if pi.sequenceID > response.GetSequenceId() {
		// Ignoring the ack since it's referring to a message that has already timed out.
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		return
	} else {
		// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
		p.pendingQueue.Poll()

		now := time.Now().UnixNano()

		// lock the pending item while sending the requests
		pi.Lock()
		defer pi.Unlock()
		p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
		batchSize := int32(0)
		for _, i := range pi.sendRequests {
			sr := i.(*sendRequest)
			if sr.msg != nil {
				batchSize = batchSize + 1
			} else { // Flush request
				break
			}
		}
		for idx, i := range pi.sendRequests {
			sr := i.(*sendRequest)
			if sr.msg != nil {
				atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
				p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
				p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
				p.metrics.MessagesPublished.Inc()
				p.metrics.MessagesPending.Dec()
				payloadSize := float64(len(sr.msg.Payload))
				p.metrics.BytesPublished.Add(payloadSize)
				p.metrics.BytesPending.Sub(payloadSize)
			}

			if sr.callback != nil || len(p.options.Interceptors) > 0 {
				msgID := newMessageID(
					int64(response.MessageId.GetLedgerId()),
					int64(response.MessageId.GetEntryId()),
					int32(idx),
					p.partitionIdx,
					batchSize,
				)

				if sr.totalChunks > 1 {
					if sr.chunkID == 0 {
						sr.chunkRecorder.setFirstChunkID(
							&messageID{
								int64(response.MessageId.GetLedgerId()),
								int64(response.MessageId.GetEntryId()),
								-1,
								p.partitionIdx,
								0,
							})
					} else if sr.chunkID == sr.totalChunks-1 {
						sr.chunkRecorder.setLastChunkID(
							&messageID{
								int64(response.MessageId.GetLedgerId()),
								int64(response.MessageId.GetEntryId()),
								-1,
								p.partitionIdx,
								0,
							})
						// use chunkMsgID to set msgID
						msgID = &sr.chunkRecorder.chunkedMsgID
					}
				}

				if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
					runCallback(sr.callback, msgID, sr.msg, nil)
					p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
				}
			}
			if sr.transaction != nil {
				sr.transaction.endSendOrAckOp(nil)
			}
		}

		// Mark this pending item as done
		pi.Complete()
	}
}
> if we want to release the memory of the current failed chunk and the later ones, I think we should p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs)).
I think so. But do we need to release memory at all when canAddToQueue has failed? I think just returning would be fine. There should be more test code to verify the producer memory limit in chunking.
Anyway, we need to add more test code for the producer memory limit to cover chunking and schema. If you have time, we can fix these problems together.
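A rough sketch of the kind of test this calls for; the service URL, the newTopicName helper, and the memLimit/CurrentUsage accessors are assumptions based on the existing test utilities, not committed code:

package pulsar

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestMemLimitReleasedAfterChunking(t *testing.T) {
	// Assumed setup: a local standalone broker, and the client's internal
	// memory limit controller reachable as c.(*client).memLimit.
	c, err := NewClient(ClientOptions{
		URL:              "pulsar://localhost:6650",
		MemoryLimitBytes: 5 * 1024 * 1024,
	})
	assert.NoError(t, err)
	defer c.Close()

	producer, err := c.CreateProducer(ProducerOptions{
		Topic:               newTopicName(),
		DisableBatching:     true,
		EnableChunking:      true,
		ChunkMaxMessageSize: 1024,
	})
	assert.NoError(t, err)
	defer producer.Close()

	// A 100 KiB payload is split into roughly 100 chunks.
	_, err = producer.Send(context.Background(), &ProducerMessage{
		Payload: make([]byte, 100*1024),
	})
	assert.NoError(t, err)

	// Whatever was reserved for the chunks must be fully released afterwards.
	assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
}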
RobertIndie changed the title from "[Bug][Producer]Memory release bug???" to "[Bug][Producer] Inaccurate producer memory limit issue in chunking and schema" on Jul 13, 2023.
Steps to reproduce
How can we reproduce the issue
Code review
System configuration
Pulsar version: x.y
@RobertIndie @shibd