From 28f61d2f9e00279028e4c6e3558e5aa2665d25cd Mon Sep 17 00:00:00 2001
From: Jiaqi Shen <18863662628@163.com>
Date: Thu, 20 Jul 2023 15:51:41 +0800
Subject: [PATCH] [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema (#1055)

### Motivation

The producer memory limit has several problems when `EnableChunking=true` or a `Schema` is set.

- When a `Schema` is set, the actual message payload is `msg.Value`, so `len(msg.Payload)` may be 0 and the memory cannot be reserved accurately.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L479-L494

- In chunking, when the producer hits the memory limit, it should release the memory reserved for the **chunks that have not been sent out yet**. But this release is not calculated accurately: it should be `uncompressedPayloadSize - int64(lhs)` instead of `uncompressedPayloadSize - int64(rhs)`.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L662-L664

- In chunking, if `internalSingleSend` fails, it should release the memory for that **single chunk**. But currently we repeatedly release the memory of all chunks.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L838-L843

- When the producer receives the receipt from the broker, it should release the memory **it reserved before sending**. But it releases the wrong size when `chunking` is enabled or a `schema` is used.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L1221-L1230

### Modifications

- Fix all the memory limit problems related to `chunking` and `schema`
- Add unit tests to cover these scenarios

---------

Co-authored-by: shenjiaqi.2769
---
 pulsar/producer_partition.go |  55 +++++++-----
 pulsar/producer_test.go      | 159 +++++++++++++++++++++++++++++++++++
 2 files changed, 192 insertions(+), 22 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e74fd984f4..5daf54c44b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -477,21 +477,19 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	// read payload from message
 	uncompressedPayload := msg.Payload
-	uncompressedPayloadSize := int64(len(uncompressedPayload))
 
 	var schemaPayload []byte
 	var err error
 
 	// The block chan must be closed when returned with exception
 	defer request.stopBlock()
 
-	if !p.canAddToQueue(request, uncompressedPayloadSize) {
+	if !p.canAddToQueue(request) {
 		return
 	}
 
 	if p.options.DisableMultiSchema {
 		if msg.Schema != nil && p.options.Schema != nil &&
 			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
-			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
 			runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
 			p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
 			return
@@ -510,7 +508,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	if uncompressedPayload == nil && schema != nil {
 		schemaPayload, err = schema.Encode(msg.Value)
 		if err != nil {
-			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
 			runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
 			p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
 			return
@@ -526,7 +523,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo()) if err != nil { - p.releaseSemaphoreAndMem(uncompressedPayloadSize) p.log.WithError(err).Error("get schema version fail") runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err)) return @@ -537,6 +533,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) { uncompressedSize := len(uncompressedPayload) + // try to reserve memory for uncompressedPayload + if !p.canReserveMem(request, int64(uncompressedSize)) { + return + } + deliverAt := msg.DeliverAt if msg.DeliverAfter.Nanoseconds() > 0 { deliverAt = time.Now().Add(msg.DeliverAfter) @@ -586,7 +587,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { // if msg is too large and chunking is disabled if checkSize > maxMessageSize && !p.options.EnableChunking { - p.releaseSemaphoreAndMem(uncompressedPayloadSize) + p.releaseSemaphoreAndMem(int64(uncompressedSize)) runCallback(request.callback, nil, request.msg, errMessageTooLarge) p.log.WithError(errMessageTooLarge). WithField("size", checkSize). @@ -605,7 +606,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } else { payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm) if payloadChunkSize <= 0 { - p.releaseSemaphoreAndMem(uncompressedPayloadSize) + p.releaseSemaphoreAndMem(int64(uncompressedSize)) runCallback(request.callback, nil, msg, errMetaTooLarge) p.log.WithError(errMetaTooLarge). WithField("metadata size", proto.Size(mm)). @@ -652,10 +653,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) { uuid: uuid, chunkRecorder: cr, transaction: request.transaction, + reservedMem: int64(rhs - lhs), } // the permit of first chunk has acquired - if chunkID != 0 && !p.canAddToQueue(nsr, 0) { - p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs)) + if chunkID != 0 && !p.canAddToQueue(nsr) { + p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs)) return } p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize)) @@ -680,7 +682,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { // after flushing try again to add the current payload if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion, multiSchemaEnabled); !ok { - p.releaseSemaphoreAndMem(uncompressedPayloadSize) + p.releaseSemaphoreAndMem(int64(uncompressedSize)) runCallback(request.callback, nil, request.msg, errFailAddToBatch) p.log.WithField("size", uncompressedSize). WithField("properties", msg.Properties). 
@@ -832,7 +834,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, if err != nil { runCallback(request.callback, nil, request.msg, err) - p.releaseSemaphoreAndMem(int64(len(msg.Payload))) + p.releaseSemaphoreAndMem(request.reservedMem) p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value) return } @@ -971,7 +973,7 @@ func (p *partitionProducer) failTimeoutMessages() { sr := i.(*sendRequest) if sr.msg != nil { size := len(sr.msg.Payload) - p.releaseSemaphoreAndMem(int64(size)) + p.releaseSemaphoreAndMem(sr.reservedMem) p.metrics.MessagesPending.Dec() p.metrics.BytesPending.Sub(float64(size)) p.metrics.PublishErrorsTimeout.Inc() @@ -1208,7 +1210,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) for idx, i := range pi.sendRequests { sr := i.(*sendRequest) atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) - p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))) + p.releaseSemaphoreAndMem(sr.reservedMem) p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) p.metrics.MessagesPublished.Inc() p.metrics.MessagesPending.Dec() @@ -1352,6 +1354,7 @@ type sendRequest struct { uuid string chunkRecorder *chunkRecorder transaction *transaction + reservedMem int64 } // stopBlock can be invoked multiple times safety @@ -1401,31 +1404,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) { p.client.memLimit.ReleaseMemory(size) } -func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool { +func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool { if p.options.DisableBlockIfQueueFull { if !p.publishSemaphore.TryAcquire() { runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull) return false } - if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) { + } else { + if !p.publishSemaphore.Acquire(sr.ctx) { + runCallback(sr.callback, nil, sr.msg, errContextExpired) + return false + } + } + p.metrics.MessagesPending.Inc() + return true +} + +func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool { + if p.options.DisableBlockIfQueueFull { + if !p.client.memLimit.TryReserveMemory(size) { p.publishSemaphore.Release() runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull) return false } } else { - if !p.publishSemaphore.Acquire(sr.ctx) { - runCallback(sr.callback, nil, sr.msg, errContextExpired) - return false - } - if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) { + if !p.client.memLimit.ReserveMemory(sr.ctx, size) { p.publishSemaphore.Release() runCallback(sr.callback, nil, sr.msg, errContextExpired) return false } } - p.metrics.MessagesPending.Inc() - p.metrics.BytesPending.Add(float64(len(sr.msg.Payload))) + sr.reservedMem += size + p.metrics.BytesPending.Add(float64(size)) return true } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index adbdc71e67..be9885fa03 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1939,6 +1939,165 @@ func TestMemLimitRejectProducerMessages(t *testing.T) { assert.NoError(t, err) } +func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) { + + c, err := NewClient(ClientOptions{ + URL: serviceURL, + MemoryLimitBytes: 100 * 6, + }) + assert.NoError(t, err) + defer c.Close() + + schema := NewAvroSchema(`{"fields": + [ + {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]} + ], + "name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil) + + topicName := 
newTopicName() + producer1, _ := c.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBlockIfQueueFull: true, + DisableBatching: false, + BatchingMaxPublishDelay: 100 * time.Second, + SendTimeout: 2 * time.Second, + }) + + producer2, _ := c.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBlockIfQueueFull: true, + DisableBatching: false, + BatchingMaxPublishDelay: 100 * time.Second, + SendTimeout: 2 * time.Second, + }) + + // the size of encoded value is 6 bytes + value := map[string]interface{}{ + "id": 0, + "name": map[string]interface{}{ + "string": "abc", + }, + } + + n := 101 + for i := 0; i < n/2; i++ { + producer1.SendAsync(context.Background(), &ProducerMessage{ + Value: value, + Schema: schema, + }, func(id MessageID, message *ProducerMessage, e error) {}) + + producer2.SendAsync(context.Background(), &ProducerMessage{ + Value: value, + Schema: schema, + }, func(id MessageID, message *ProducerMessage, e error) {}) + } + // Last message in order to reach the limit + producer1.SendAsync(context.Background(), &ProducerMessage{ + Value: value, + Schema: schema, + }, func(id MessageID, message *ProducerMessage, e error) {}) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage()) + + _, err = producer1.Send(context.Background(), &ProducerMessage{ + Value: value, + Schema: schema, + }) + assert.Error(t, err) + assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) + + _, err = producer2.Send(context.Background(), &ProducerMessage{ + Value: value, + Schema: schema, + }) + assert.Error(t, err) + assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) + + // flush pending msg + err = producer1.Flush() + assert.NoError(t, err) + err = producer2.Flush() + assert.NoError(t, err) + assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) + + _, err = producer1.Send(context.Background(), &ProducerMessage{ + Value: value, + Schema: schema, + }) + assert.NoError(t, err) + _, err = producer2.Send(context.Background(), &ProducerMessage{ + Value: value, + Schema: schema, + }) + assert.NoError(t, err) +} + +func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) { + + c, err := NewClient(ClientOptions{ + URL: serviceURL, + MemoryLimitBytes: 5 * 1024, + }) + assert.NoError(t, err) + defer c.Close() + + topicName := newTopicName() + producer1, _ := c.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBlockIfQueueFull: true, + DisableBatching: true, + EnableChunking: true, + SendTimeout: 2 * time.Second, + }) + + producer2, _ := c.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBlockIfQueueFull: true, + DisableBatching: false, + BatchingMaxPublishDelay: 100 * time.Millisecond, + SendTimeout: 2 * time.Second, + }) + + producer2.SendAsync(context.Background(), &ProducerMessage{ + Payload: make([]byte, 5*1024+1), + }, func(id MessageID, message *ProducerMessage, e error) { + if e != nil { + t.Fatal(e) + } + }) + + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int64(5*1024+1), c.(*client).memLimit.CurrentUsage()) + + _, err = producer1.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1), + }) + assert.Error(t, err) + assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull)) + + // wait all the mem have been released + retryAssert(t, 10, 200, func() {}, func(t assert.TestingT) bool { + return assert.Equal(t, 0, int(c.(*client).memLimit.CurrentUsage())) + }) + + producer3, _ := c.CreateProducer(ProducerOptions{ + Topic: topicName, + 
DisableBlockIfQueueFull: true,
+		DisableBatching:         true,
+		EnableChunking:          true,
+		MaxPendingMessages:      1,
+		ChunkMaxMessageSize:     1024,
+		SendTimeout:             2 * time.Second,
+	})
+
+	// producer3 will reserve 2*1024 bytes and then release 1024 bytes (the second chunk)
+	// because it reaches MaxPendingMessages in chunking
+	_, _ = producer3.Send(context.Background(), &ProducerMessage{
+		Payload: make([]byte, 2*1024),
+	})
+	assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
+}
+
 func TestMemLimitContextCancel(t *testing.T) {
 	c, err := NewClient(ClientOptions{