diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52781048ea..37290c7e05 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,8 +51,12 @@ jobs: go-version: [ '1.22', '1.23' ] steps: - uses: actions/checkout@v3 - - name: clean docker cache - run: docker rmi $(docker images -q) -f && df -h + - name: Check for Docker images + id: check_images + run: echo "::set-output name=images::$(docker images -q | wc -l)" + - name: Clean Docker cache if images exist + if: ${{ steps.check_images.outputs.images > 0 }} + run: docker rmi $(docker images -q) -f && df -h - uses: actions/setup-go@v3 with: go-version: ${{ matrix.go-version }} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index dd770ce788..568dcaa96f 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1629,7 +1629,10 @@ func (pc *partitionConsumer) dispatcher() { messages[0] = nil messages = messages[1:] - pc.availablePermits.inc() + // for the zeroQueueConsumer, the permits controlled by itself + if pc.options.receiverQueueSize > 0 { + pc.availablePermits.inc() + } if pc.options.autoReceiverQueueSize { pc.incomingMessages.Dec() diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go index d8c8f7c9a1..34e9df9f20 100644 --- a/pulsar/consumer_zero_queue_test.go +++ b/pulsar/consumer_zero_queue_test.go @@ -95,6 +95,89 @@ func TestNormalZeroQueueConsumer(t *testing.T) { err = consumer.Unsubscribe() assert.Nil(t, err) } + +func TestMultipleConsumer(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + if err != nil { + log.Fatal(err) + } + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create consumer1 + consumer1, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Shared, + EnableZeroQueueConsumer: true, + }) + assert.Nil(t, err) + _, ok := consumer1.(*zeroQueueConsumer) + assert.True(t, ok) + defer consumer1.Close() + + // create consumer2 + consumer2, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Shared, + EnableZeroQueueConsumer: true, + }) + assert.Nil(t, err) + _, ok = consumer2.(*zeroQueueConsumer) + assert.True(t, ok) + defer consumer2.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + sendNum := 10 + // send 10 messages + for i := 0; i < sendNum; i++ { + msg, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }) + assert.Nil(t, err) + log.Printf("send message: %s", msg.String()) + } + + // receive messages + for i := 0; i < sendNum/2; i++ { + msg, err := consumer1.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + log.Printf("consumer1 receive message: %s %s", msg.ID().String(), msg.Payload()) + // ack message + consumer1.Ack(msg) + } + + // receive messages + for i := 0; i < sendNum/2; i++ { + msg, err := consumer2.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + log.Printf("consumer2 receive message: %s %s", msg.ID().String(), msg.Payload()) + // ack message + consumer2.Ack(msg) + } + +} + func TestPartitionZeroQueueConsumer(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL,