Skip to content

Commit

Permalink
[fix] peek message will return -1 for partitionIndex (#1267)
Browse files Browse the repository at this point in the history
### Motivation
If peek a partitioned topic, will see a message id: `7316:0:-1:-1`, the parititonIndex should not be -1.

```
pulsarctl subscription peek --count 10 persistent://public/default/my-topic-partition-0 test-sub
Message ID : 7316:0:-1:-1
Properties :
{
    "publish-time": "2024-08-08T17:50:39.476+08:00"
}
Message :
00000000  68 65 6c 6c 6f 2d 31                              |hello-1|
```

### Modifications
- Set partition index on peek message.
  • Loading branch information
shibd authored Aug 13, 2024
1 parent 6a2e461 commit ab042ae
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pulsaradmin/pkg/admin/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ const (
)

func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) {

msgID := resp.Header.Get("X-Pulsar-Message-ID")
ID, err := utils.ParseMessageID(msgID)
ID, err := utils.ParseMessageIDWithPartitionIndex(msgID, topic.GetPartitionIndex())
if err != nil {
return nil, err
}
Expand Down
54 changes: 54 additions & 0 deletions pulsaradmin/pkg/admin/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,60 @@ func TestGetMessagesByID(t *testing.T) {

}

func TestPeekMessageForPartitionedTopic(t *testing.T) {
ctx := context.Background()
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
topicName, _ := utils.GetTopicName(topic)
subName := "test-sub"

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

err = admin.Topics().Create(*topicName, 2)
assert.NoError(t, err)

err = admin.Subscriptions().Create(*topicName, subName, utils.Earliest)
assert.NoError(t, err)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.NoError(t, err)
defer producer.Close()

for i := 0; i < 100; i++ {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}, nil)
}
err = producer.Flush()
if err != nil {
return
}

for i := 0; i < 2; i++ {
topicWithPartition := fmt.Sprintf("%s-partition-%d", topic, i)
topicName, err := utils.GetTopicName(topicWithPartition)
assert.NoError(t, err)
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 10)
assert.NoError(t, err)
assert.NotNil(t, messages)
for _, msg := range messages {
assert.Equal(t, msg.GetMessageID().PartitionIndex, i)
}
}
}

func TestGetMessageByID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
Expand Down
9 changes: 9 additions & 0 deletions pulsaradmin/pkg/utils/message_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ type MessageID struct {
var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
var Earliest = MessageID{-1, -1, -1, -1}

func ParseMessageIDWithPartitionIndex(str string, index int) (*MessageID, error) {
id, err := ParseMessageID(str)
if err != nil {
return nil, err
}
id.PartitionIndex = index
return id, nil
}

func ParseMessageID(str string) (*MessageID, error) {
s := strings.Split(str, ":")

Expand Down
4 changes: 4 additions & 0 deletions pulsaradmin/pkg/utils/topic_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func (t *TopicName) GetPartition(index int) (*TopicName, error) {
return GetTopicName(topicNameWithPartition)
}

func (t *TopicName) GetPartitionIndex() int {
return t.partitionIndex
}

func getPartitionIndex(topic string) int {
if strings.Contains(topic, PARTITIONEDTOPICSUFFIX) {
parts := strings.Split(topic, "-")
Expand Down

0 comments on commit ab042ae

Please sign in to comment.