Expected behavior
make pendingQueue dynamically sized (expandable at runtime), or give it a larger capacity than MaxPendingMessages;
partitionProducer.canAddToQueue() no longer checks pendingQueue if it can be expanded at runtime;
call partitionProducer.canAddToQueue() in partitionProducer.internalSendAsync before putting the sendRequest into partitionProducer.dataChan;
delete sendRequest.blockCh, so there is no blocking there any more;
treat all messages that have been put into partitionProducer.dataChan as pending messages, and send and flush them when the producer is closed.
With these changes we only block when MaxPendingMessages is actually reached, and the semantic confusion goes away (a rough sketch follows below).
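As a rough illustration of the proposal, here is a minimal sketch (not an implementation; it reuses field names and helper signatures from the current code, which may need to change) of internalSendAsync doing admission control before handing the request to dataChan:

// Sketch only: admission control moves in front of dataChan, so there is no
// blockCh and the call only blocks inside canAddToQueue when the limits are hit.
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
    callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
    sr := &sendRequest{
        ctx:              ctx,
        msg:              msg,
        callback:         callback,
        callbackOnce:     &sync.Once{},
        flushImmediately: flushImmediately,
        publishTime:      time.Now(),
    }

    // Block (or fail fast, depending on DisableBlockIfQueueFull) only when
    // MaxPendingMessages or the memory limit is actually reached.
    if !p.canAddToQueue(sr, int64(len(msg.Payload))) {
        return
    }

    p.options.Interceptors.BeforeSend(p, msg)

    // Everything placed into dataChan is treated as pending and is sent and
    // flushed when the producer is closed.
    p.dataChan <- sr
}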
Actual behavior
Currently, we use a lot of mechanisms to control MaxPendingMessages in the producer: partitionProducer.dataChan with MaxPendingMessages capacity, partitionProducer.publishSemaphore, sendRequest.blockCh, partitionProducer.canAddToQueue(), and the complicated code in partitionProducer.internalSend(). These make the code difficult to understand and maintain. What is worse, when DisableBlockIfQueueFull is set to false, partitionProducer.internalSendAsync() blocks every time, even if the queue is NOT full, which blocks application-level logic (see the snippet below).
And when chunking is enabled, a message is separated into many chunks and hence many pendingItems in the pendingQueue. When the pendingQueue is full, that does not mean MaxPendingMessages has been reached from the user's point of view, which is semantically confusing (a small illustration follows the snippet below).
if !p.options.DisableBlockIfQueueFull {
    // block if queue full
    <-bc
}
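To make the chunking point concrete, here is a small self-contained illustration (the sizes are invented) of how a single large message maps to several pendingItems, mirroring the totalChunks calculation in internalSend below:

package main

import (
    "fmt"
    "math"
)

func main() {
    // Invented numbers: one 5 MiB message, 1 MiB chunk payload size.
    compressedSize := 5 * 1024 * 1024
    payloadChunkSize := 1 * 1024 * 1024
    totalChunks := int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
    // One user-visible message occupies totalChunks slots in pendingQueue, so the
    // queue can look "full" well before MaxPendingMessages messages are outstanding.
    fmt.Printf("1 message -> %d pendingItems\n", totalChunks) // prints: 1 message -> 5 pendingItems
}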
Steps to reproduce
Review the code
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
    callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
    // Register transaction operation to transaction and the transaction coordinator.
    var newCallback func(MessageID, *ProducerMessage, error)
    if msg.Transaction != nil {
        transactionImpl := (msg.Transaction).(*transaction)
        if transactionImpl.state != TxnOpen {
            p.log.WithField("state", transactionImpl.state).Error("Failed to send message by a non-open transaction.")
            callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
            return
        }

        if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
            callback(nil, msg, err)
            return
        }
        if err := transactionImpl.registerSendOrAckOp(); err != nil {
            callback(nil, msg, err)
        }
        newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
            callback(id, producerMessage, err)
            transactionImpl.endSendOrAckOp(err)
        }
    } else {
        newCallback = callback
    }
    if p.getProducerState() != producerReady {
        // Producer is closing
        newCallback(nil, msg, errProducerClosed)
        return
    }

    // bc only works when DisableBlockIfQueueFull is false
    bc := make(chan struct{})

    // callbackOnce make sure the callback is only invoked once in chunking
    callbackOnce := &sync.Once{}
    var txn *transaction
    if msg.Transaction != nil {
        txn = (msg.Transaction).(*transaction)
    }
    sr := &sendRequest{
        ctx:              ctx,
        msg:              msg,
        callback:         newCallback,
        callbackOnce:     callbackOnce,
        flushImmediately: flushImmediately,
        publishTime:      time.Now(),
        blockCh:          bc,
        closeBlockChOnce: &sync.Once{},
        transaction:      txn,
    }
    p.options.Interceptors.BeforeSend(p, msg)

    p.dataChan <- sr

    if !p.options.DisableBlockIfQueueFull {
        // block if queue full
        <-bc
    }
}
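For reference, stopBlock (called throughout internalSend below but not shown here) presumably amounts to closing blockCh exactly once, which is what releases the <-bc wait above; a sketch:

// Sketch of the expected blockCh hand-off: closeBlockChOnce guarantees the
// channel is closed only once even though stopBlock is invoked from several paths.
func (sr *sendRequest) stopBlock() {
    sr.closeBlockChOnce.Do(func() {
        close(sr.blockCh)
    })
}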
func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
    if p.options.DisableBlockIfQueueFull {
        if !p.publishSemaphore.TryAcquire() {
            if sr.callback != nil {
                sr.callback(nil, sr.msg, errSendQueueIsFull)
            }
            return false
        }
        if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
            p.publishSemaphore.Release()
            if sr.callback != nil {
                sr.callback(nil, sr.msg, errMemoryBufferIsFull)
            }
            return false
        }
    } else {
        if !p.publishSemaphore.Acquire(sr.ctx) {
            sr.callback(nil, sr.msg, errContextExpired)
            return false
        }
        if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) {
            p.publishSemaphore.Release()
            sr.callback(nil, sr.msg, errContextExpired)
            return false
        }
    }
    p.metrics.MessagesPending.Inc()
    p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
    return true
}
func (p *partitionProducer) internalSend(request *sendRequest) {
    p.log.Debug("Received send request: ", *request.msg)

    msg := request.msg

    // read payload from message
    uncompressedPayload := msg.Payload
    uncompressedPayloadSize := int64(len(uncompressedPayload))

    var schemaPayload []byte
    var err error
    if msg.Value != nil && msg.Payload != nil {
        p.log.Error("Can not set Value and Payload both")
        request.callback(nil, request.msg, errors.New("can not set Value and Payload both"))
        return
    }

    // The block chan must be closed when returned with exception
    defer request.stopBlock()
    if !p.canAddToQueue(request, uncompressedPayloadSize) {
        return
    }

    if p.options.DisableMultiSchema {
        if msg.Schema != nil && p.options.Schema != nil &&
            msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
            p.releaseSemaphoreAndMem(uncompressedPayloadSize)
            request.callback(nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
            p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
            return
        }
    }
    var schema Schema
    var schemaVersion []byte
    if msg.Schema != nil {
        schema = msg.Schema
    } else if p.options.Schema != nil {
        schema = p.options.Schema
    }
    if msg.Value != nil {
        // payload and schema are mutually exclusive
        // try to get payload from schema value only if payload is not set
        if uncompressedPayload == nil && schema != nil {
            schemaPayload, err = schema.Encode(msg.Value)
            if err != nil {
                p.releaseSemaphoreAndMem(uncompressedPayloadSize)
                request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
                p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
                return
            }
        }
    }
    if uncompressedPayload == nil {
        uncompressedPayload = schemaPayload
    }

    if schema != nil {
        schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
        if schemaVersion == nil {
            schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
            if err != nil {
                p.releaseSemaphoreAndMem(uncompressedPayloadSize)
                p.log.WithError(err).Error("get schema version fail")
                request.callback(nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
                return
            }
            p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
        }
    }

    uncompressedSize := len(uncompressedPayload)

    deliverAt := msg.DeliverAt
    if msg.DeliverAfter.Nanoseconds() > 0 {
        deliverAt = time.Now().Add(msg.DeliverAfter)
    }

    mm := p.genMetadata(msg, uncompressedSize, deliverAt)

    // set default ReplicationClusters when DisableReplication
    if msg.DisableReplication {
        msg.ReplicationClusters = []string{"__local__"}
    }

    sendAsBatch := !p.options.DisableBatching &&
        msg.ReplicationClusters == nil &&
        deliverAt.UnixNano() < 0

    // Once the batching is enabled, it can close blockCh early to make block finish
    if sendAsBatch {
        request.stopBlock()
    } else {
        // update sequence id for metadata, make the size of msgMetadata more accurate
        // batch sending will update sequence ID in the BatchBuilder
        p.updateMetadataSeqID(mm, msg)
    }

    maxMessageSize := int(p._getConn().GetMaxMessageSize())

    // compress payload if not batching
    var compressedPayload []byte
    var compressedSize int
    var checkSize int
    if !sendAsBatch {
        compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
        compressedSize = len(compressedPayload)
        checkSize = compressedSize

        // set the compress type in msgMetaData
        compressionType := pb.CompressionType(p.options.CompressionType)
        if compressionType != pb.CompressionType_NONE {
            mm.Compression = &compressionType
        }
    } else {
        // final check for batching message is in serializeMessage
        // this is a double check
        checkSize = uncompressedSize
    }

    // if msg is too large and chunking is disabled
    if checkSize > maxMessageSize && !p.options.EnableChunking {
        p.releaseSemaphoreAndMem(uncompressedPayloadSize)
        request.callback(nil, request.msg, errMessageTooLarge)
        p.log.WithError(errMessageTooLarge).
            WithField("size", checkSize).
            WithField("properties", msg.Properties).
            Errorf("MaxMessageSize %d", maxMessageSize)
        p.metrics.PublishErrorsMsgTooLarge.Inc()
        return
    }

    var totalChunks int
    // max chunk payload size
    var payloadChunkSize int
    if sendAsBatch || !p.options.EnableChunking {
        totalChunks = 1
        payloadChunkSize = int(p._getConn().GetMaxMessageSize())
    } else {
        payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
        if payloadChunkSize <= 0 {
            p.releaseSemaphoreAndMem(uncompressedPayloadSize)
            request.callback(nil, msg, errMetaTooLarge)
            p.log.WithError(errMetaTooLarge).
                WithField("metadata size", proto.Size(mm)).
                WithField("properties", msg.Properties).
                Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
            p.metrics.PublishErrorsMsgTooLarge.Inc()
            return
        }
        // set ChunkMaxMessageSize
        if p.options.ChunkMaxMessageSize != 0 {
            payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
        }
        totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
    }

    // set total chunks to send request
    request.totalChunks = totalChunks

    if !sendAsBatch {
        if totalChunks > 1 {
            var lhs, rhs int
            uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
            mm.Uuid = proto.String(uuid)
            mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
            mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
            cr := newChunkRecorder()
            for chunkID := 0; chunkID < totalChunks; chunkID++ {
                lhs = chunkID * payloadChunkSize
                if rhs = lhs + payloadChunkSize; rhs > compressedSize {
                    rhs = compressedSize
                }
                // update chunk id
                mm.ChunkId = proto.Int32(int32(chunkID))
                nsr := &sendRequest{
                    ctx:              request.ctx,
                    msg:              request.msg,
                    callback:         request.callback,
                    callbackOnce:     request.callbackOnce,
                    publishTime:      request.publishTime,
                    blockCh:          request.blockCh,
                    closeBlockChOnce: request.closeBlockChOnce,
                    totalChunks:      totalChunks,
                    chunkID:          chunkID,
                    uuid:             uuid,
                    chunkRecorder:    cr,
                    transaction:      request.transaction,
                }
                // the permit of first chunk has acquired
                if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
                    p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))
                    return
                }
                p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
            }
            // close the blockCh when all the chunks acquired permits
            request.stopBlock()
        } else {
            // close the blockCh when totalChunks is 1 (it has acquired permits)
            request.stopBlock()
            p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
        }
    } else {
        smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
        multiSchemaEnabled := !p.options.DisableMultiSchema
        added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
            multiSchemaEnabled)
        if !added {
            // The current batch is full. flush it and retry
            p.internalFlushCurrentBatch()

            // after flushing try again to add the current payload
            if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
                multiSchemaEnabled); !ok {
                p.releaseSemaphoreAndMem(uncompressedPayloadSize)
                request.callback(nil, request.msg, errFailAddToBatch)
                p.log.WithField("size", uncompressedSize).
                    WithField("properties", msg.Properties).
                    Error("unable to add message to batch")
                return
            }
        }
        if request.flushImmediately {
            p.internalFlushCurrentBatch()
        }
    }
}
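For context, a minimal application-level sketch of the blocking behavior described above (broker URL and topic are placeholders): with the default DisableBlockIfQueueFull = false, SendAsync only returns after internalSend has released blockCh, so the elapsed time below reflects producer-side processing rather than a simple enqueue.

package main

import (
    "context"
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:              "my-topic",
        MaxPendingMessages: 1000,
        // DisableBlockIfQueueFull: false (default) -> internalSendAsync waits on blockCh
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    start := time.Now()
    producer.SendAsync(context.Background(), &pulsar.ProducerMessage{Payload: []byte("hello")},
        func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {})
    // Under the current code this duration includes internalSend's work, even
    // though the pending queue is nowhere near MaxPendingMessages.
    log.Printf("SendAsync returned after %v", time.Since(start))
}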
System configuration
Pulsar version: x.y
@merlimat @RobertIndie @wolfstudy @Gleiphir2769