// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pulsar import ( "context" "errors" "fmt" "math/rand" "net/http" "strings" "sync" "testing" "time" "github.com/apache/pulsar-client-go/pulsar/internal" "google.golang.org/protobuf/proto" "github.com/stretchr/testify/assert" ) var _brokerMaxMessageSize = 1024 * 1024 func TestInvalidChunkingConfig(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: newTopicName(), DisableBatching: false, EnableChunking: true, }) assert.Error(t, err, "producer creation should have fail") assert.Nil(t, producer) } func TestLargeMessage(t *testing.T) { rand.Seed(time.Now().Unix()) client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() topic := newTopicName() // create producer without ChunkMaxMessageSize producer1, err := client.CreateProducer(ProducerOptions{ Topic: topic, DisableBatching: true, EnableChunking: true, }) assert.NoError(t, err) assert.NotNil(t, producer1) defer producer1.Close() // create producer with ChunkMaxMessageSize producer2, err := client.CreateProducer(ProducerOptions{ Topic: topic, DisableBatching: true, EnableChunking: true, ChunkMaxMessageSize: 5, }) assert.NoError(t, err) assert.NotNil(t, producer2) defer producer2.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, Type: Exclusive, SubscriptionName: "chunk-subscriber", }) assert.NoError(t, err) assert.NotNil(t, consumer) defer consumer.Close() expectMsgs := make([][]byte, 0, 10) // test send chunk with serverMaxMessageSize limit for i := 0; i < 5; i++ { msg := createTestMessagePayload(_brokerMaxMessageSize + 1) expectMsgs = append(expectMsgs, msg) ID, err := producer1.Send(context.Background(), &ProducerMessage{ Payload: msg, }) assert.NoError(t, err) assert.NotNil(t, ID) } // test receive chunk with serverMaxMessageSize limit for i := 0; i < 5; i++ { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) msg, err := consumer.Receive(ctx) cancel() assert.NoError(t, err) expectMsg := expectMsgs[i] assert.Equal(t, expectMsg, msg.Payload()) // ack message err = consumer.Ack(msg) assert.NoError(t, err) } // test send chunk with ChunkMaxMessageSize limit for i := 0; i < 5; i++ { msg := createTestMessagePayload(50) expectMsgs = append(expectMsgs, msg) ID, err := producer2.Send(context.Background(), &ProducerMessage{ Payload: msg, }) assert.NoError(t, err) assert.NotNil(t, ID) } // test receive chunk with ChunkMaxMessageSize limit for i := 5; i < 10; i++ { msg, err := consumer.Receive(context.Background()) assert.NoError(t, err) expectMsg := expectMsgs[i] assert.Equal(t, expectMsg, msg.Payload()) // ack message err = consumer.Ack(msg) assert.NoError(t, err) } } func TestMaxPendingChunkMessages(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() topic := newTopicName() producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, DisableBatching: true, EnableChunking: true, ChunkMaxMessageSize: 10, }) assert.NoError(t, err) assert.NotNil(t, producer) c, err := client.Subscribe(ConsumerOptions{ Topic: topic, Type: Exclusive, SubscriptionName: "chunk-subscriber", MaxPendingChunkedMessage: 1, }) assert.NoError(t, err) assert.NotNil(t, c) defer c.Close() pc := c.(*consumer).consumers[0] callbackOnce0 := &sync.Once{} cr0 := newChunkRecorder() msg0 := "chunk-0-0|chunk-0-1|" callbackOnce1 := &sync.Once{} cr1 := newChunkRecorder() msg1 := "chunk-1-0|chunk-1-1|" sendSingleChunk(producer, "0", 0, 2, msg0, callbackOnce0, cr0) // MaxPendingChunkedMessage is 1, the chunked message with uuid 0 will be discarded sendSingleChunk(producer, "1", 0, 2, msg1, callbackOnce1, cr1) // chunkedMsgCtx with uuid 0 should be discarded retryAssert(t, 3, 200, func() {}, func(t assert.TestingT) bool { pc.chunkedMsgCtxMap.mu.Lock() defer pc.chunkedMsgCtxMap.mu.Unlock() return assert.Equal(t, 1, len(pc.chunkedMsgCtxMap.chunkedMsgCtxs)) }) sendSingleChunk(producer, "1", 1, 2, msg1, callbackOnce1, cr1) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) msg, err := c.Receive(ctx) cancel() assert.NoError(t, err) assert.Equal(t, "chunk-1-0|chunk-1-1|", string(msg.Payload())) // Ensure that the chunked message of uuid 0 is discarded. sendSingleChunk(producer, "0", 1, 2, msg0, callbackOnce0, cr0) ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) msg, err = c.Receive(ctx) cancel() assert.True(t, errors.Is(err, context.DeadlineExceeded)) } func TestExpireIncompleteChunks(t *testing.T) { rand.Seed(time.Now().Unix()) client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() topic := newTopicName() c, err := client.Subscribe(ConsumerOptions{ Topic: topic, Type: Exclusive, SubscriptionName: "chunk-subscriber", ExpireTimeOfIncompleteChunk: time.Millisecond * 300, }) assert.NoError(t, err) defer c.Close() uuid := "test-uuid" chunkCtxMap := c.(*consumer).consumers[0].chunkedMsgCtxMap chunkCtxMap.addIfAbsent(uuid, 2, 100) ctx := chunkCtxMap.get(uuid) assert.NotNil(t, ctx) time.Sleep(400 * time.Millisecond) ctx = chunkCtxMap.get(uuid) assert.Nil(t, ctx) } func TestChunksEnqueueFailed(t *testing.T) { rand.Seed(time.Now().Unix()) client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() topic := newTopicName() producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, EnableChunking: true, DisableBatching: true, MaxPendingMessages: 10, ChunkMaxMessageSize: 50, DisableBlockIfQueueFull: true, }) assert.NoError(t, err) assert.NotNil(t, producer) defer producer.Close() // Reduce publish rate to prevent the producer sending messages too fast url := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/publishRate" makeHTTPCall(t, http.MethodPost, url, "{\"publishThrottlingRateInMsg\": 1,\"publishThrottlingRateInByte\": 1000}") // Need to wait some time to let the rate limiter take effect time.Sleep(2 * time.Second) ID, err := producer.Send(context.Background(), &ProducerMessage{ Payload: createTestMessagePayload(1000), }) assert.Error(t, err) assert.Nil(t, ID) } func TestSeekChunkMessages(t *testing.T) { rand.Seed(time.Now().Unix()) client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() topic := newTopicName() totalMessages := 5 producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, EnableChunking: true, DisableBatching: true, ChunkMaxMessageSize: 50, }) assert.NoError(t, err) assert.NotNil(t, producer) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, Type: Exclusive, SubscriptionName: "default-seek", }) assert.NoError(t, err) assert.NotNil(t, consumer) defer consumer.Close() msgIDs := make([]MessageID, 0) for i := 0; i < totalMessages; i++ { ID, err := producer.Send(context.Background(), &ProducerMessage{ Payload: createTestMessagePayload(100), }) assert.NoError(t, err) msgIDs = append(msgIDs, ID) } for i := 0; i < totalMessages; i++ { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) msg, err := consumer.Receive(ctx) cancel() assert.NoError(t, err) assert.NotNil(t, msg) assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize()) } err = consumer.Seek(msgIDs[1]) assert.NoError(t, err) for i := 1; i < totalMessages; i++ { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) msg, err := consumer.Receive(ctx) cancel() assert.NoError(t, err) assert.NotNil(t, msg) assert.Equal(t, msgIDs[i].Serialize(), msg.ID().Serialize()) } // todo: add reader seek test when support reader read chunk message } func TestChunkAckAndNAck(t *testing.T) { rand.Seed(time.Now().Unix()) client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() topic := newTopicName() producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, EnableChunking: true, DisableBatching: true, ChunkMaxMessageSize: 50, }) assert.NoError(t, err) assert.NotNil(t, producer) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, Type: Exclusive, SubscriptionName: "default-seek", NackRedeliveryDelay: time.Second, }) assert.NoError(t, err) assert.NotNil(t, consumer) defer consumer.Close() content := createTestMessagePayload(100) _, err = producer.Send(context.Background(), &ProducerMessage{ Payload: content, }) assert.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) msg, err := consumer.Receive(ctx) cancel() assert.NoError(t, err) assert.NotNil(t, msg) assert.Equal(t, msg.Payload(), content) consumer.Nack(msg) time.Sleep(time.Second * 2) ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) msg, err = consumer.Receive(ctx) cancel() assert.NoError(t, err) assert.NotNil(t, msg) assert.Equal(t, msg.Payload(), content) } func TestChunkSize(t *testing.T) { rand.Seed(time.Now().Unix()) client, err := NewClient(ClientOptions{ URL: lookupURL, }) assert.Nil(t, err) defer client.Close() // the default message metadata size for string schema // The proto messageMetaData size as following. (all with tag) (maxMessageSize = 1024 * 1024) // | producerName | sequenceID | publishTime | uncompressedSize | // | ------------ | ---------- | ----------- | ---------------- | // | 6 | 2 | 7 | 4 | payloadChunkSize := _brokerMaxMessageSize - 19 topic := newTopicName() producer, err := client.CreateProducer(ProducerOptions{ Name: "test", Topic: topic, EnableChunking: true, DisableBatching: true, }) assert.NoError(t, err) assert.NotNil(t, producer) defer producer.Close() for size := payloadChunkSize; size <= _brokerMaxMessageSize; size++ { msgID, err := producer.Send(context.Background(), &ProducerMessage{ Payload: createTestMessagePayload(size), }) assert.NoError(t, err) if size <= payloadChunkSize { _, ok := msgID.(*messageID) assert.Equal(t, true, ok) } else { _, ok := msgID.(*chunkMessageID) assert.Equal(t, true, ok) } } } func TestChunkMultiTopicConsumerReceive(t *testing.T) { topic1 := newTopicName() topic2 := newTopicName() client, err := NewClient(ClientOptions{ URL: lookupURL, }) if err != nil { t.Fatal(err) } topics := []string{topic1, topic2} consumer, err := client.Subscribe(ConsumerOptions{ Topics: topics, SubscriptionName: "multi-topic-sub", }) if err != nil { t.Fatal(err) } defer consumer.Close() maxSize := 50 // produce messages for i, topic := range topics { p, err := client.CreateProducer(ProducerOptions{ Topic: topic, DisableBatching: true, EnableChunking: true, ChunkMaxMessageSize: uint(maxSize), }) if err != nil { t.Fatal(err) } err = genMessages(p, 10, func(idx int) string { return fmt.Sprintf("topic-%d-hello-%d-%s", i+1, idx, string(createTestMessagePayload(100))) }) p.Close() if err != nil { t.Fatal(err) } } receivedTopic1 := 0 receivedTopic2 := 0 // nolint for receivedTopic1+receivedTopic2 < 20 { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) select { case cm, ok := <-consumer.Chan(): if ok { msg := string(cm.Payload()) if strings.HasPrefix(msg, "topic-1") { receivedTopic1++ } else if strings.HasPrefix(msg, "topic-2") { receivedTopic2++ } consumer.Ack(cm.Message) } else { t.Fail() } case <-ctx.Done(): t.Error(ctx.Err()) } cancel() } assert.Equal(t, receivedTopic1, receivedTopic2) } func TestChunkBlockIfQueueFull(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) if err != nil { t.Fatal(err) } topic := newTopicName() producer, err := client.CreateProducer(ProducerOptions{ Name: "test", Topic: topic, EnableChunking: true, DisableBatching: true, MaxPendingMessages: 1, ChunkMaxMessageSize: 10, }) assert.NoError(t, err) assert.NotNil(t, producer) defer producer.Close() // Large messages will be split into 11 chunks, exceeding the length of pending queue ID, err := producer.Send(context.Background(), &ProducerMessage{ Payload: createTestMessagePayload(100), }) assert.NoError(t, err) assert.NotNil(t, ID) } func createTestMessagePayload(size int) []byte { payload := make([]byte, size) for i := range payload { payload[i] = byte(rand.Intn(100)) } return payload } //nolint:all func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int, wholePayload string, callbackOnce *sync.Once, cr *chunkRecorder) { msg := &ProducerMessage{ Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)), } producerImpl := p.(*producer).producers[0].(*partitionProducer) mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now()) mm.Uuid = proto.String(uuid) mm.NumChunksFromMsg = proto.Int32(int32(totalChunks)) mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload))) mm.ChunkId = proto.Int32(int32(chunkID)) producerImpl.updateMetadataSeqID(mm, msg) producerImpl.internalSingleSend( mm, msg.Payload, &sendRequest{ producer: producerImpl, ctx: context.Background(), msg: msg, callback: func(id MessageID, producerMessage *ProducerMessage, err error) { }, callbackOnce: callbackOnce, flushImmediately: true, totalChunks: totalChunks, chunkID: chunkID, uuid: uuid, chunkRecorder: cr, transaction: nil, memLimit: nil, reservedMem: 0, semaphore: nil, reservedSemaphore: 0, sendAsBatch: false, schema: nil, schemaVersion: nil, uncompressedPayload: []byte(wholePayload), uncompressedSize: int64(len(wholePayload)), compressedPayload: []byte(wholePayload), compressedSize: len(wholePayload), payloadChunkSize: internal.MaxMessageSize - proto.Size(mm), mm: mm, deliverAt: time.Now(), maxMessageSize: internal.MaxMessageSize, }, uint32(internal.MaxMessageSize), ) }