Skip to content

Commit

Permalink
Merge branch 'master' into pip-254
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie committed Dec 13, 2024
2 parents aea803c + edea3eb commit a4907cd
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ jobs:
go-version: [ '1.22', '1.23' ]
steps:
- uses: actions/checkout@v3
- name: clean docker cache
run: docker rmi $(docker images -q) -f && df -h
- name: Check for Docker images
id: check_images
run: echo "::set-output name=images::$(docker images -q | wc -l)"
- name: Clean Docker cache if images exist
if: ${{ steps.check_images.outputs.images > 0 }}
run: docker rmi $(docker images -q) -f && df -h
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
Expand Down
5 changes: 4 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,10 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]

pc.availablePermits.inc()
// for the zeroQueueConsumer, the permits controlled by itself
if pc.options.receiverQueueSize > 0 {
pc.availablePermits.inc()
}

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
Expand Down
83 changes: 83 additions & 0 deletions pulsar/consumer_zero_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,89 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
err = consumer.Unsubscribe()
assert.Nil(t, err)
}

func TestMultipleConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
if err != nil {
log.Fatal(err)
}
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create consumer1
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Shared,
EnableZeroQueueConsumer: true,
})
assert.Nil(t, err)
_, ok := consumer1.(*zeroQueueConsumer)
assert.True(t, ok)
defer consumer1.Close()

// create consumer2
consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Shared,
EnableZeroQueueConsumer: true,
})
assert.Nil(t, err)
_, ok = consumer2.(*zeroQueueConsumer)
assert.True(t, ok)
defer consumer2.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

sendNum := 10
// send 10 messages
for i := 0; i < sendNum; i++ {
msg, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
assert.Nil(t, err)
log.Printf("send message: %s", msg.String())
}

// receive messages
for i := 0; i < sendNum/2; i++ {
msg, err := consumer1.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
log.Printf("consumer1 receive message: %s %s", msg.ID().String(), msg.Payload())
// ack message
consumer1.Ack(msg)
}

// receive messages
for i := 0; i < sendNum/2; i++ {
msg, err := consumer2.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
log.Printf("consumer2 receive message: %s %s", msg.ID().String(), msg.Payload())
// ack message
consumer2.Ack(msg)
}

}

func TestPartitionZeroQueueConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down

0 comments on commit a4907cd

Please sign in to comment.