diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 11d5f652e4..ee2a9ba271 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -478,231 +478,88 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
     cb(id, msg, err)
 }
 
-func (p *partitionProducer) internalSend(request *sendRequest) {
-    p.log.Debug("Received send request: ", *request.msg)
+func (p *partitionProducer) internalSend(sr *sendRequest) {
+    p.log.Debug("Received send sr: ", *sr.msg)
 
-    msg := request.msg
-
-    // read payload from message
-    uncompressedPayload := msg.Payload
-
-    var schemaPayload []byte
-    var err error
-
-    // The block chan must be closed when returned with exception
-    defer request.stopBlock()
-    if !p.canAddToQueue(request) {
-        return
-    }
+    if sr.sendAsBatch {
+        smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
+        multiSchemaEnabled := !p.options.DisableMultiSchema
+        added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion,
+            multiSchemaEnabled)
+        if !added {
+            // The current batch is full. flush it and retry
+            p.internalFlushCurrentBatch()
 
-    if p.options.DisableMultiSchema {
-        if msg.Schema != nil && p.options.Schema != nil &&
-            msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
-            runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
-            p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
-            return
-        }
-    }
-    var schema Schema
-    var schemaVersion []byte
-    if msg.Schema != nil {
-        schema = msg.Schema
-    } else if p.options.Schema != nil {
-        schema = p.options.Schema
-    }
-    if msg.Value != nil {
-        // payload and schema are mutually exclusive
-        // try to get payload from schema value only if payload is not set
-        if uncompressedPayload == nil && schema != nil {
-            schemaPayload, err = schema.Encode(msg.Value)
-            if err != nil {
-                runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
-                p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
+            // after flushing try again to add the current payload
+            ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
+            if !ok {
+                p.log.WithField("size", sr.uncompressedSize).
+                    WithField("properties", sr.msg.Properties).
+                    Error("unable to add message to batch")
+                sr.done(nil, errFailAddToBatch)
                 return
             }
         }
-    }
-    if uncompressedPayload == nil {
-        uncompressedPayload = schemaPayload
-    }
-    if schema != nil {
-        schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
-        if schemaVersion == nil {
-            schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
-            if err != nil {
-                p.log.WithError(err).Error("get schema version fail")
-                runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
-                return
-            }
-            p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+        if sr.flushImmediately {
+            p.internalFlushCurrentBatch()
         }
-    }
 
-    uncompressedSize := len(uncompressedPayload)
-
-    // try to reserve memory for uncompressedPayload
-    if !p.canReserveMem(request, int64(uncompressedSize)) {
         return
     }
 
-    deliverAt := msg.DeliverAt
-    if msg.DeliverAfter.Nanoseconds() > 0 {
-        deliverAt = time.Now().Add(msg.DeliverAfter)
-    }
-
-    mm := p.genMetadata(msg, uncompressedSize, deliverAt)
-
-    // set default ReplicationClusters when DisableReplication
-    if msg.DisableReplication {
-        msg.ReplicationClusters = []string{"__local__"}
-    }
-
-    sendAsBatch := !p.options.DisableBatching &&
-        msg.ReplicationClusters == nil &&
-        deliverAt.UnixNano() < 0
-
-    // Once the batching is enabled, it can close blockCh early to make block finish
-    if sendAsBatch {
-        request.stopBlock()
-    } else {
-        // update sequence id for metadata, make the size of msgMetadata more accurate
-        // batch sending will update sequence ID in the BatchBuilder
-        p.updateMetadataSeqID(mm, msg)
-    }
-
-    maxMessageSize := int(p._getConn().GetMaxMessageSize())
-
-    // compress payload if not batching
-    var compressedPayload []byte
-    var compressedSize int
-    var checkSize int
-    if !sendAsBatch {
-        compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
-        compressedSize = len(compressedPayload)
-        checkSize = compressedSize
-
-        // set the compress type in msgMetaData
-        compressionType := pb.CompressionType(p.options.CompressionType)
-        if compressionType != pb.CompressionType_NONE {
-            mm.Compression = &compressionType
-        }
-    } else {
-        // final check for batching message is in serializeMessage
-        // this is a double check
-        checkSize = uncompressedSize
-    }
-
-    // if msg is too large and chunking is disabled
-    if checkSize > maxMessageSize && !p.options.EnableChunking {
-        p.releaseSemaphoreAndMem(int64(uncompressedSize))
-        runCallback(request.callback, nil, request.msg, errMessageTooLarge)
-        p.log.WithError(errMessageTooLarge).
-            WithField("size", checkSize).
-            WithField("properties", msg.Properties).
-            Errorf("MaxMessageSize %d", maxMessageSize)
-        p.metrics.PublishErrorsMsgTooLarge.Inc()
+    if sr.totalChunks <= 1 {
+        p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
         return
     }
 
-    var totalChunks int
-    // max chunk payload size
-    var payloadChunkSize int
-    if sendAsBatch || !p.options.EnableChunking {
-        totalChunks = 1
-        payloadChunkSize = int(p._getConn().GetMaxMessageSize())
-    } else {
-        payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
-        if payloadChunkSize <= 0 {
-            p.releaseSemaphoreAndMem(int64(uncompressedSize))
-            runCallback(request.callback, nil, msg, errMetaTooLarge)
-            p.log.WithError(errMetaTooLarge).
-                WithField("metadata size", proto.Size(mm)).
-                WithField("properties", msg.Properties).
-                Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
-            p.metrics.PublishErrorsMsgTooLarge.Inc()
-            return
-        }
-        // set ChunkMaxMessageSize
-        if p.options.ChunkMaxMessageSize != 0 {
-            payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
-        }
-        totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
-    }
-
-    // set total chunks to send request
-    request.totalChunks = totalChunks
-
-    if !sendAsBatch {
-        if totalChunks > 1 {
-            var lhs, rhs int
-            uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
-            mm.Uuid = proto.String(uuid)
-            mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
-            mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
-            cr := newChunkRecorder()
-            for chunkID := 0; chunkID < totalChunks; chunkID++ {
-                lhs = chunkID * payloadChunkSize
-                if rhs = lhs + payloadChunkSize; rhs > compressedSize {
-                    rhs = compressedSize
-                }
-                // update chunk id
-                mm.ChunkId = proto.Int32(int32(chunkID))
-                nsr := &sendRequest{
-                    ctx:              request.ctx,
-                    msg:              request.msg,
-                    callback:         request.callback,
-                    callbackOnce:     request.callbackOnce,
-                    publishTime:      request.publishTime,
-                    blockCh:          request.blockCh,
-                    closeBlockChOnce: request.closeBlockChOnce,
-                    totalChunks:      totalChunks,
-                    chunkID:          chunkID,
-                    uuid:             uuid,
-                    chunkRecorder:    cr,
-                    transaction:      request.transaction,
-                    reservedMem:      int64(rhs - lhs),
-                }
-                // the permit of first chunk has acquired
-                if chunkID != 0 && !p.canAddToQueue(nsr) {
-                    p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
-                    return
-                }
-                p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
-            }
-            // close the blockCh when all the chunks acquired permits
-            request.stopBlock()
-        } else {
-            // close the blockCh when totalChunks is 1 (it has acquired permits)
-            request.stopBlock()
-            p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
-        }
-    } else {
-        smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
-        multiSchemaEnabled := !p.options.DisableMultiSchema
-        added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
-            multiSchemaEnabled)
-        if !added {
-            // The current batch is full. flush it and retry
+    var lhs, rhs int
+    uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10))
+    sr.mm.Uuid = proto.String(uuid)
+    sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
+    sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
+    cr := newChunkRecorder()
-            p.internalFlushCurrentBatch()
-
-            // after flushing try again to add the current payload
-            if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
-                multiSchemaEnabled); !ok {
-                p.releaseSemaphoreAndMem(int64(uncompressedSize))
-                runCallback(request.callback, nil, request.msg, errFailAddToBatch)
-                p.log.WithField("size", uncompressedSize).
-                    WithField("properties", msg.Properties).
-                    Error("unable to add message to batch")
-                return
-            }
+    for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
+        lhs = chunkID * sr.payloadChunkSize
+        rhs = lhs + sr.payloadChunkSize
+        if rhs > sr.compressedSize {
+            rhs = sr.compressedSize
         }
-        }
-        if request.flushImmediately {
-
-            p.internalFlushCurrentBatch()
+        // update chunk id
+        sr.mm.ChunkId = proto.Int32(int32(chunkID))
+        nsr := &sendRequest{
+            producer:            sr.producer,
+            ctx:                 sr.ctx,
+            msg:                 sr.msg,
+            callback:            sr.callback,
+            callbackOnce:        sr.callbackOnce,
+            publishTime:         sr.publishTime,
+            flushImmediately:    sr.flushImmediately,
+            totalChunks:         sr.totalChunks,
+            chunkID:             chunkID,
+            uuid:                uuid,
+            chunkRecorder:       cr,
+            transaction:         sr.transaction,
+            memLimit:            sr.memLimit,
+            semaphore:           sr.semaphore,
+            reservedSemaphore:   1,
+            reservedMem:         int64(rhs - lhs),
+            sendAsBatch:         sr.sendAsBatch,
+            schema:              sr.schema,
+            schemaVersion:       sr.schemaVersion,
+            uncompressedPayload: sr.uncompressedPayload,
+            uncompressedSize:    sr.uncompressedSize,
+            compressedPayload:   sr.compressedPayload,
+            compressedSize:      sr.compressedSize,
+            payloadChunkSize:    sr.payloadChunkSize,
+            mm:                  sr.mm,
+            deliverAt:           sr.deliverAt,
+            maxMessageSize:      sr.maxMessageSize,
         }
+
+        p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
     }
 }
@@ -841,8 +698,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
     )
     if err != nil {
-        runCallback(request.callback, nil, request.msg, err)
-        p.releaseSemaphoreAndMem(request.reservedMem)
+        request.done(nil, err)
         p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
         return
     }
@@ -882,13 +738,12 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
     if err != nil {
         for _, cb := range callbacks {
             if sr, ok := cb.(*sendRequest); ok {
-                runCallback(sr.callback, nil, sr.msg, err)
+                sr.done(nil, err)
             }
         }
         if errors.Is(err, internal.ErrExceedMaxMessageSize) {
             p.log.WithError(errMessageTooLarge).
                 Errorf("internal err: %s", err)
-            p.metrics.PublishErrorsMsgTooLarge.Inc()
             return
         }
         return
     }
@@ -979,25 +834,7 @@ func (p *partitionProducer) failTimeoutMessages() {
         for _, i := range pi.sendRequests {
             sr := i.(*sendRequest)
-            if sr.msg != nil {
-                size := len(sr.msg.Payload)
-                p.releaseSemaphoreAndMem(sr.reservedMem)
-                p.metrics.MessagesPending.Dec()
-                p.metrics.BytesPending.Sub(float64(size))
-                p.metrics.PublishErrorsTimeout.Inc()
-                p.log.WithError(errSendTimeout).
-                    WithField("size", size).
-                    WithField("properties", sr.msg.Properties)
-            }
-
-            if sr.callback != nil {
-                sr.callbackOnce.Do(func() {
-                    runCallback(sr.callback, nil, sr.msg, errSendTimeout)
-                })
-            }
-            if sr.transaction != nil {
-                sr.transaction.endSendOrAckOp(nil)
-            }
+            sr.done(nil, errSendTimeout)
         }
 
         // flag the sending has completed with error, flush make no effect
@@ -1025,13 +862,13 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
         if errs[i] != nil {
             for _, cb := range callbacks[i] {
                 if sr, ok := cb.(*sendRequest); ok {
-                    runCallback(sr.callback, nil, sr.msg, errs[i])
+                    sr.done(nil, errs[i])
                 }
             }
             if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
                 p.log.WithError(errMessageTooLarge).
                     Errorf("internal err: %s", errs[i])
-                p.metrics.PublishErrorsMsgTooLarge.Inc()
+
                 return
             }
             continue
         }
@@ -1119,78 +956,301 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
     p.internalSendAsync(ctx, msg, callback, false)
 }
 
-func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
-    callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
     if msg == nil {
-        p.log.Error("Message is nil")
-        runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
-        return
+        return newError(InvalidMessage, "Message is nil")
     }
 
     if msg.Value != nil && msg.Payload != nil {
-        p.log.Error("Can not set Value and Payload both")
-        runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both"))
-        return
+        return newError(InvalidMessage, "Can not set Value and Payload both")
     }
 
-    // Register transaction operation to transaction and the transaction coordinator.
-    var newCallback func(MessageID, *ProducerMessage, error)
-    var txn *transaction
-    if msg.Transaction != nil {
-        transactionImpl := (msg.Transaction).(*transaction)
-        txn = transactionImpl
-        if transactionImpl.state != TxnOpen {
-            p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
-                " by a non-open transaction.")
-            runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
-            return
+    if p.options.DisableMultiSchema {
+        if msg.Schema != nil && p.options.Schema != nil &&
+            msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
+            p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
+            return fmt.Errorf("msg schema can not match with producer schema")
         }
+    }
 
-        if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
-            runCallback(callback, nil, msg, err)
-            return
+    return nil
+}
+
+func (p *partitionProducer) updateSchema(sr *sendRequest) error {
+    var schema Schema
+    var schemaVersion []byte
+    var err error
+
+    if sr.msg.Schema != nil {
+        schema = sr.msg.Schema
+    } else if p.options.Schema != nil {
+        schema = p.options.Schema
+    }
+
+    if schema == nil {
+        return nil
+    }
+
+    schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+    if schemaVersion == nil {
+        schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+        if err != nil {
+            return fmt.Errorf("get schema version fail, err: %w", err)
         }
-        if err := transactionImpl.registerSendOrAckOp(); err != nil {
-            runCallback(callback, nil, msg, err)
-            return
+        p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+    }
+
+    sr.schema = schema
+    sr.schemaVersion = schemaVersion
+    return nil
+}
+
+func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
+    // read payload from message
+    sr.uncompressedPayload = sr.msg.Payload
+
+    if sr.msg.Value != nil {
+        if sr.schema == nil {
+            p.log.Errorf("Schema encode message failed %s", sr.msg.Value)
+            return newError(SchemaFailure, "set schema value without setting schema")
        }
-        newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
-            runCallback(callback, id, producerMessage, err)
-            transactionImpl.endSendOrAckOp(err)
+
+        // payload and schema are mutually exclusive
+        // try to get payload from schema value only if payload is not set
+        schemaPayload, err := sr.schema.Encode(sr.msg.Value)
+        if err != nil {
+            p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
+            return newError(SchemaFailure, err.Error())
         }
+
+        sr.uncompressedPayload = schemaPayload
+    }
+
+    sr.uncompressedSize = int64(len(sr.uncompressedPayload))
+    return nil
+}
+
+func (p *partitionProducer) updateMetaData(sr *sendRequest) {
+    deliverAt := sr.msg.DeliverAt
+    if sr.msg.DeliverAfter.Nanoseconds() > 0 {
+        deliverAt = time.Now().Add(sr.msg.DeliverAfter)
+    }
+
+    sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt)
+
+    // set default ReplicationClusters when DisableReplication
+    if sr.msg.DisableReplication {
+        sr.msg.ReplicationClusters = []string{"__local__"}
+    }
+
+    sr.sendAsBatch = !p.options.DisableBatching &&
+        sr.msg.ReplicationClusters == nil &&
+        deliverAt.UnixNano() < 0
+
+    if !sr.sendAsBatch {
+        // update sequence id for metadata, make the size of msgMetadata more accurate
+        // batch sending will update sequence ID in the BatchBuilder
+        p.updateMetadataSeqID(sr.mm, sr.msg)
+    }
+
+    sr.deliverAt = deliverAt
+}
+
+func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
+    checkSize := sr.uncompressedSize
+    if !sr.sendAsBatch {
+        sr.compressedPayload = p.compressionProvider.Compress(nil, sr.uncompressedPayload)
+        sr.compressedSize = len(sr.compressedPayload)
+
+        // set the compress type in msgMetaData
+        compressionType := pb.CompressionType(p.options.CompressionType)
+        if compressionType != pb.CompressionType_NONE {
+            sr.mm.Compression = &compressionType
+        }
+
+        checkSize = int64(sr.compressedSize)
+    }
+
+    sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize()))
+
+    // if msg is too large and chunking is disabled
+    if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
+        p.log.WithError(errMessageTooLarge).
+            WithField("size", checkSize).
+            WithField("properties", sr.msg.Properties).
+            Errorf("MaxMessageSize %d", sr.maxMessageSize)
+
+        return errMessageTooLarge
+    }
+
+    if sr.sendAsBatch || !p.options.EnableChunking {
+        sr.totalChunks = 1
+        sr.payloadChunkSize = int(sr.maxMessageSize)
+        return nil
+    }
+
+    sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
+    if sr.payloadChunkSize <= 0 {
+        p.log.WithError(errMetaTooLarge).
+            WithField("metadata size", proto.Size(sr.mm)).
+            WithField("properties", sr.msg.Properties).
+            Errorf("MaxMessageSize %d", sr.maxMessageSize)
+
+        return errMetaTooLarge
+    }
+
+    // set ChunkMaxMessageSize
+    if p.options.ChunkMaxMessageSize != 0 {
+        sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+    }
+
+    sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
+    return nil
+}
+
+func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
+    if sr.msg.Transaction == nil {
+        return nil
+    }
+
+    txn := (sr.msg.Transaction).(*transaction)
+    if txn.state != TxnOpen {
+        p.log.WithField("state", txn.state).Error("Failed to send message" +
+            " by a non-open transaction.")
+        return newError(InvalidStatus, "Failed to send message by a non-open transaction.")
+    }
+
+    if err := txn.registerProducerTopic(p.topic); err != nil {
+        return err
+    }
+
+    if err := txn.registerSendOrAckOp(); err != nil {
+        return err
+    }
+
+    sr.transaction = txn
+    return nil
+}
+
+func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
+    for i := 0; i < sr.totalChunks; i++ {
+        if p.options.DisableBlockIfQueueFull {
+            if !p.publishSemaphore.TryAcquire() {
+                return errSendQueueIsFull
+            }
+        } else {
+            if !p.publishSemaphore.Acquire(sr.ctx) {
+                return errContextExpired
+            }
+        }
+    }
+
+    p.metrics.MessagesPending.Add(float64(sr.totalChunks))
+    sr.semaphore = p.publishSemaphore
+    sr.reservedSemaphore = sr.totalChunks
+    return nil
+}
+
+func (p *partitionProducer) reserveMem(sr *sendRequest) error {
+    requiredMem := sr.uncompressedSize
+    if !sr.sendAsBatch {
+        requiredMem = int64(sr.compressedSize)
+    }
+
+    if p.options.DisableBlockIfQueueFull {
+        if !p.client.memLimit.TryReserveMemory(requiredMem) {
+            return errMemoryBufferIsFull
+        }
     } else {
-        newCallback = callback
+        if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
+            return errContextExpired
+        }
     }
+
+    sr.memLimit = p.client.memLimit
+    sr.reservedMem += requiredMem
+    p.metrics.BytesPending.Add(float64(requiredMem))
+    return nil
+}
+
+func (p *partitionProducer) reserveResources(sr *sendRequest) error {
+    if err := p.reserveSemaphore(sr); err != nil {
+        return err
+    }
+    if err := p.reserveMem(sr); err != nil {
+        return err
+    }
+    return nil
+}
+
+func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
+    callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+    err := p.validateMsg(msg)
+    if err != nil {
+        p.log.Error(err)
+        runCallback(callback, nil, msg, err)
+        return
+    }
+
     if p.getProducerState() != producerReady {
         // Producer is closing
-        runCallback(newCallback, nil, msg, errProducerClosed)
+        runCallback(callback, nil, msg, errProducerClosed)
         return
     }
 
-    // bc only works when DisableBlockIfQueueFull is false
-    bc := make(chan struct{})
+    // run interceptors before encoding/compressing
+    p.options.Interceptors.BeforeSend(p, msg)
 
-    // callbackOnce make sure the callback is only invoked once in chunking
-    callbackOnce := &sync.Once{}
     sr := &sendRequest{
+        producer:         p,
         ctx:              ctx,
         msg:              msg,
-        callback:         newCallback,
-        callbackOnce:     callbackOnce,
-        flushImmediately: flushImmediately,
+        callback:         callback,
+        callbackOnce:     &sync.Once{},
         publishTime:      time.Now(),
-        blockCh:          bc,
-        closeBlockChOnce: &sync.Once{},
-        transaction:      txn,
+        flushImmediately: flushImmediately,
     }
-    p.options.Interceptors.BeforeSend(p, msg)
-    p.dataChan <- sr
 
+    err = p.updateSchema(sr)
+    if err != nil {
+        p.log.Error(err)
+        sr.done(nil, err)
+        return
+    }
+
+    err = p.updateUncompressPayload(sr)
+    if err != nil {
+        p.log.Error(err)
+        sr.done(nil, err)
+        return
+    }
+
+    p.updateMetaData(sr)
+
+    err = p.updateChunkInfo(sr)
+    if err != nil {
+        p.log.Error(err)
+        sr.done(nil, err)
+        return
+    }
-    if !p.options.DisableBlockIfQueueFull {
-        // block if queue full
-        <-bc
+
+    err = p.prepareTransaction(sr)
+    if err != nil {
+        p.log.Error(err)
+        sr.done(nil, err)
+        return
+    }
+
+    // everything is OK, reserve required semaphore and memory
+    err = p.reserveResources(sr)
+    if err != nil {
+        p.log.Error(err)
+        sr.done(nil, err)
+        return
     }
+
+    p.dataChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1227,55 +1287,40 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
         for idx, i := range pi.sendRequests {
             sr := i.(*sendRequest)
             atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
-            p.releaseSemaphoreAndMem(sr.reservedMem)
-            p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
-            p.metrics.MessagesPublished.Inc()
-            p.metrics.MessagesPending.Dec()
-            payloadSize := float64(len(sr.msg.Payload))
-            p.metrics.BytesPublished.Add(payloadSize)
-            p.metrics.BytesPending.Sub(payloadSize)
-
-            if sr.callback != nil || len(p.options.Interceptors) > 0 {
-                msgID := newMessageID(
-                    int64(response.MessageId.GetLedgerId()),
-                    int64(response.MessageId.GetEntryId()),
-                    int32(idx),
-                    p.partitionIdx,
-                    batchSize,
-                )
-
-                if sr.totalChunks > 1 {
-                    if sr.chunkID == 0 {
-                        sr.chunkRecorder.setFirstChunkID(
-                            &messageID{
-                                int64(response.MessageId.GetLedgerId()),
-                                int64(response.MessageId.GetEntryId()),
-                                -1,
-                                p.partitionIdx,
-                                0,
-                            })
-                    } else if sr.chunkID == sr.totalChunks-1 {
-                        sr.chunkRecorder.setLastChunkID(
-                            &messageID{
-                                int64(response.MessageId.GetLedgerId()),
-                                int64(response.MessageId.GetEntryId()),
-                                -1,
-                                p.partitionIdx,
-                                0,
-                            })
-                        // use chunkMsgID to set msgID
-                        msgID = &sr.chunkRecorder.chunkedMsgID
-                    }
-                }
-                if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
-                    runCallback(sr.callback, msgID, sr.msg, nil)
-                    p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
+            msgID := newMessageID(
+                int64(response.MessageId.GetLedgerId()),
+                int64(response.MessageId.GetEntryId()),
+                int32(idx),
+                p.partitionIdx,
+                batchSize,
+            )
+
+            if sr.totalChunks > 1 {
+                if sr.chunkID == 0 {
+                    sr.chunkRecorder.setFirstChunkID(
+                        &messageID{
+                            int64(response.MessageId.GetLedgerId()),
+                            int64(response.MessageId.GetEntryId()),
+                            -1,
+                            p.partitionIdx,
+                            0,
+                        })
+                } else if sr.chunkID == sr.totalChunks-1 {
+                    sr.chunkRecorder.setLastChunkID(
+                        &messageID{
+                            int64(response.MessageId.GetLedgerId()),
+                            int64(response.MessageId.GetEntryId()),
+                            -1,
+                            p.partitionIdx,
+                            0,
+                        })
+                    // use chunkMsgID to set msgID
+                    msgID = &sr.chunkRecorder.chunkedMsgID
                 }
             }
-            if sr.transaction != nil {
-                sr.transaction.endSendOrAckOp(nil)
-            }
+
+            sr.done(msgID, nil)
         }
 
         // Mark this pending item as done
@@ -1358,27 +1403,78 @@ func (p *partitionProducer) Close() {
 }
 
 type sendRequest struct {
-    ctx              context.Context
-    msg              *ProducerMessage
-    callback         func(MessageID, *ProducerMessage, error)
-    callbackOnce     *sync.Once
-    publishTime      time.Time
-    flushImmediately bool
-    blockCh          chan struct{}
-    closeBlockChOnce *sync.Once
-    totalChunks      int
-    chunkID          int
-    uuid             string
-    chunkRecorder    *chunkRecorder
-    transaction      *transaction
-    reservedMem      int64
+    producer            *partitionProducer
+    ctx                 context.Context
+    msg                 *ProducerMessage
+    callback            func(MessageID, *ProducerMessage, error)
+    callbackOnce        *sync.Once
+    publishTime         time.Time
+    flushImmediately    bool
+    totalChunks         int
+    chunkID             int
+    uuid                string
+    chunkRecorder       *chunkRecorder
+    transaction         *transaction
+    memLimit            internal.MemoryLimitController
+    reservedMem         int64
+    semaphore           internal.Semaphore
+    reservedSemaphore   int
+    sendAsBatch         bool
+    schema              Schema
+    schemaVersion       []byte
+    uncompressedPayload []byte
+    uncompressedSize    int64
+    compressedPayload   []byte
+    compressedSize      int
+    payloadChunkSize    int
+    mm                  *pb.MessageMetadata
+    deliverAt           time.Time
+    maxMessageSize      int32
 }
 
-// stopBlock can be invoked multiple times safety
-func (sr *sendRequest) stopBlock() {
-    sr.closeBlockChOnce.Do(func() {
-        close(sr.blockCh)
-    })
+func (sr *sendRequest) done(msgID MessageID, err error) {
+    if err == nil {
+        sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
+        sr.producer.metrics.MessagesPublished.Inc()
+        sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
+    }
+
+    if err == errSendTimeout {
+        sr.producer.metrics.PublishErrorsTimeout.Inc()
+        sr.producer.log.WithError(err).
+            WithField("size", sr.reservedMem).
+            WithField("properties", sr.msg.Properties)
+    }
+
+    if err == errMessageTooLarge {
+        sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
+    }
+
+    if sr.semaphore != nil {
+        for i := 0; i < sr.reservedSemaphore; i++ {
+            sr.semaphore.Release()
+        }
+        sr.producer.metrics.MessagesPending.Dec()
+    }
+
+    if sr.memLimit != nil {
+        sr.memLimit.ReleaseMemory(sr.reservedMem)
+        sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
+    }
+
+    if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
+        sr.callbackOnce.Do(func() {
+            runCallback(sr.callback, msgID, sr.msg, err)
+        })
+
+        if sr.transaction != nil {
+            sr.transaction.endSendOrAckOp(err)
+        }
+
+        if sr.producer.options.Interceptors != nil {
+            sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
+        }
+    }
 }
 
 type closeProducer struct {
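Reviewer note (not part of the patch): the refactor funnels every completion path (success, serialize failure, timeout, batch-flush error) through sendRequest.done, which returns the reserved permits and memory and fires the user callback at most once. The sketch below is a minimal, self-contained illustration of that single-completion-path idea under stated assumptions; fakeSemaphore, request, and main are hypothetical stand-ins, not the producer's actual publishSemaphore, memLimit, or sendRequest types.

// sketch.go - illustrative only; hypothetical names, not the pulsar client API.
package main

import (
    "fmt"
    "sync"
)

// fakeSemaphore stands in for the producer's publish semaphore.
type fakeSemaphore struct{ permits chan struct{} }

func newFakeSemaphore(n int) *fakeSemaphore {
    return &fakeSemaphore{permits: make(chan struct{}, n)}
}

// TryAcquire reserves one permit without blocking; it reports whether it succeeded.
func (s *fakeSemaphore) TryAcquire() bool {
    select {
    case s.permits <- struct{}{}:
        return true
    default:
        return false
    }
}

// Release returns one previously acquired permit.
func (s *fakeSemaphore) Release() { <-s.permits }

// request mirrors the shape of a send request that owns its own reservations.
type request struct {
    reservedPermits int
    sem             *fakeSemaphore
    callbackOnce    sync.Once
    callback        func(err error)
}

// done is the single completion path: it returns every permit the request
// reserved and fires the user callback at most once (the Once matters when
// several chunks of one message each complete and report back).
func (r *request) done(err error) {
    for i := 0; i < r.reservedPermits; i++ {
        r.sem.Release()
    }
    r.callbackOnce.Do(func() { r.callback(err) })
}

func main() {
    sem := newFakeSemaphore(1)
    r := &request{sem: sem, callback: func(err error) { fmt.Println("send finished:", err) }}

    // Reserve up front, as the refactored producer does before queueing the request.
    if sem.TryAcquire() {
        r.reservedPermits = 1
    }

    // Success, timeout and failure paths all end here.
    r.done(nil)
}

Keeping reservation and release on the same struct is the design choice the patch makes: whoever created the reservations is the only one allowed to undo them, so the hot paths no longer need to reason about which caller already released what.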