Skip to content

Commit

Permalink
[fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and…
Browse files Browse the repository at this point in the history
… schema (#1055)

### Motivation

The producer memory limit have some problem when `EnableChunking=true` or `Schema` is set.
- When `Schema` is set, the actual message payload is `msg.Value`. The `len(msg.Payload)` may be 0 and memory can not be reserved acurate.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L479-L494

- In chunking, if producer meets the memory limit, it should release the memory for **chunks which has send out**. But the calculate for this release is not accurate, it should be `uncompressedPayloadSize - int64(lhs)` instead of `uncompressedPayloadSize - int64(rhs)`
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L662-L664

- In chunking, if `internalSingleSend` is failed, it should release the memory for **single chunk**. But we release all the chunks memory repeatly now.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L838-L843

- When producer received the receipt from broker, it should release the memory **it reserved before sending**. But it releases wrong size in `chunking` and `schema`.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L1221-L1230

### Modifications

- Fix all the memory limit problems relative to `chunking` and `schema`
- Add unit tests to cover these scenarios

---------

Co-authored-by: shenjiaqi.2769 <[email protected]>
  • Loading branch information
Gleiphir2769 and shenjiaqi.2769 authored Jul 20, 2023
1 parent 3812c07 commit 28f61d2
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 22 deletions.
55 changes: 33 additions & 22 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,21 +477,19 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

// read payload from message
uncompressedPayload := msg.Payload
uncompressedPayloadSize := int64(len(uncompressedPayload))

var schemaPayload []byte
var err error

// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request, uncompressedPayloadSize) {
if !p.canAddToQueue(request) {
return
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
Expand All @@ -510,7 +508,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if uncompressedPayload == nil && schema != nil {
schemaPayload, err = schema.Encode(msg.Value)
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
Expand All @@ -526,7 +523,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.log.WithError(err).Error("get schema version fail")
runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
Expand All @@ -537,6 +533,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

uncompressedSize := len(uncompressedPayload)

// try to reserve memory for uncompressedPayload
if !p.canReserveMem(request, int64(uncompressedSize)) {
return
}

deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
Expand Down Expand Up @@ -586,7 +587,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

// if msg is too large and chunking is disabled
if checkSize > maxMessageSize && !p.options.EnableChunking {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
Expand All @@ -605,7 +606,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
} else {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
if payloadChunkSize <= 0 {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(mm)).
Expand Down Expand Up @@ -652,10 +653,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
uuid: uuid,
chunkRecorder: cr,
transaction: request.transaction,
reservedMem: int64(rhs - lhs),
}
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))
if chunkID != 0 && !p.canAddToQueue(nsr) {
p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
return
}
p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
Expand All @@ -680,7 +682,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
// after flushing try again to add the current payload
if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
Expand Down Expand Up @@ -832,7 +834,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,

if err != nil {
runCallback(request.callback, nil, request.msg, err)
p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
p.releaseSemaphoreAndMem(request.reservedMem)
p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
return
}
Expand Down Expand Up @@ -971,7 +973,7 @@ func (p *partitionProducer) failTimeoutMessages() {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.releaseSemaphoreAndMem(int64(size))
p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
Expand Down Expand Up @@ -1208,7 +1210,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
Expand Down Expand Up @@ -1352,6 +1354,7 @@ type sendRequest struct {
uuid string
chunkRecorder *chunkRecorder
transaction *transaction
reservedMem int64
}

// stopBlock can be invoked multiple times safety
Expand Down Expand Up @@ -1401,31 +1404,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
p.client.memLimit.ReleaseMemory(size)
}

func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
if p.options.DisableBlockIfQueueFull {
if !p.publishSemaphore.TryAcquire() {
runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
return false
}
if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
} else {
if !p.publishSemaphore.Acquire(sr.ctx) {
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
}
p.metrics.MessagesPending.Inc()
return true
}

func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool {
if p.options.DisableBlockIfQueueFull {
if !p.client.memLimit.TryReserveMemory(size) {
p.publishSemaphore.Release()
runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
return false
}

} else {
if !p.publishSemaphore.Acquire(sr.ctx) {
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) {
if !p.client.memLimit.ReserveMemory(sr.ctx, size) {
p.publishSemaphore.Release()
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
}
p.metrics.MessagesPending.Inc()
p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
sr.reservedMem += size
p.metrics.BytesPending.Add(float64(size))
return true
}

Expand Down
159 changes: 159 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1939,6 +1939,165 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
assert.NoError(t, err)
}

func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {

c, err := NewClient(ClientOptions{
URL: serviceURL,
MemoryLimitBytes: 100 * 6,
})
assert.NoError(t, err)
defer c.Close()

schema := NewAvroSchema(`{"fields":
[
{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
],
"name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)

topicName := newTopicName()
producer1, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: false,
BatchingMaxPublishDelay: 100 * time.Second,
SendTimeout: 2 * time.Second,
})

producer2, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: false,
BatchingMaxPublishDelay: 100 * time.Second,
SendTimeout: 2 * time.Second,
})

// the size of encoded value is 6 bytes
value := map[string]interface{}{
"id": 0,
"name": map[string]interface{}{
"string": "abc",
},
}

n := 101
for i := 0; i < n/2; i++ {
producer1.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
}, func(id MessageID, message *ProducerMessage, e error) {})

producer2.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
}, func(id MessageID, message *ProducerMessage, e error) {})
}
// Last message in order to reach the limit
producer1.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
}, func(id MessageID, message *ProducerMessage, e error) {})
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())

_, err = producer1.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

_, err = producer2.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// flush pending msg
err = producer1.Flush()
assert.NoError(t, err)
err = producer2.Flush()
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

_, err = producer1.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.NoError(t, err)
_, err = producer2.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.NoError(t, err)
}

func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {

c, err := NewClient(ClientOptions{
URL: serviceURL,
MemoryLimitBytes: 5 * 1024,
})
assert.NoError(t, err)
defer c.Close()

topicName := newTopicName()
producer1, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: true,
EnableChunking: true,
SendTimeout: 2 * time.Second,
})

producer2, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: false,
BatchingMaxPublishDelay: 100 * time.Millisecond,
SendTimeout: 2 * time.Second,
})

producer2.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 5*1024+1),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
t.Fatal(e)
}
})

time.Sleep(50 * time.Millisecond)
assert.Equal(t, int64(5*1024+1), c.(*client).memLimit.CurrentUsage())

_, err = producer1.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 1),
})
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// wait all the mem have been released
retryAssert(t, 10, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 0, int(c.(*client).memLimit.CurrentUsage()))
})

producer3, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: true,
EnableChunking: true,
MaxPendingMessages: 1,
ChunkMaxMessageSize: 1024,
SendTimeout: 2 * time.Second,
})

// producer2 will reserve 2*1024 bytes and then release 1024 byte (release the second chunk)
// because it reaches MaxPendingMessages in chunking
_, _ = producer3.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 2*1024),
})
assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
}

func TestMemLimitContextCancel(t *testing.T) {

c, err := NewClient(ClientOptions{
Expand Down

0 comments on commit 28f61d2

Please sign in to comment.