From ab042ae714d14ff8a07ba0d55ba4d887879e1e00 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 13 Aug 2024 18:58:49 +0800 Subject: [PATCH] [fix] peek message will return -1 for partitionIndex (#1267) ### Motivation If peek a partitioned topic, will see a message id: `7316:0:-1:-1`, the parititonIndex should not be -1. ``` pulsarctl subscription peek --count 10 persistent://public/default/my-topic-partition-0 test-sub Message ID : 7316:0:-1:-1 Properties : { "publish-time": "2024-08-08T17:50:39.476+08:00" } Message : 00000000 68 65 6c 6c 6f 2d 31 |hello-1| ``` ### Modifications - Set partition index on peek message. --- pulsaradmin/pkg/admin/subscription.go | 3 +- pulsaradmin/pkg/admin/subscription_test.go | 54 ++++++++++++++++++++++ pulsaradmin/pkg/utils/message_id.go | 9 ++++ pulsaradmin/pkg/utils/topic_name.go | 4 ++ 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/pulsaradmin/pkg/admin/subscription.go b/pulsaradmin/pkg/admin/subscription.go index 8ddb584523..996ebb4e03 100644 --- a/pulsaradmin/pkg/admin/subscription.go +++ b/pulsaradmin/pkg/admin/subscription.go @@ -234,8 +234,9 @@ const ( ) func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) { + msgID := resp.Header.Get("X-Pulsar-Message-ID") - ID, err := utils.ParseMessageID(msgID) + ID, err := utils.ParseMessageIDWithPartitionIndex(msgID, topic.GetPartitionIndex()) if err != nil { return nil, err } diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go index c4ba717d37..92c79c1f60 100644 --- a/pulsaradmin/pkg/admin/subscription_test.go +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -90,6 +90,60 @@ func TestGetMessagesByID(t *testing.T) { } +func TestPeekMessageForPartitionedTopic(t *testing.T) { + ctx := context.Background() + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + topicName, _ := utils.GetTopicName(topic) + subName := "test-sub" + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + err = admin.Topics().Create(*topicName, 2) + assert.NoError(t, err) + + err = admin.Subscriptions().Create(*topicName, subName, utils.Earliest) + assert.NoError(t, err) + + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.NoError(t, err) + defer producer.Close() + + for i := 0; i < 100; i++ { + producer.SendAsync(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }, nil) + } + err = producer.Flush() + if err != nil { + return + } + + for i := 0; i < 2; i++ { + topicWithPartition := fmt.Sprintf("%s-partition-%d", topic, i) + topicName, err := utils.GetTopicName(topicWithPartition) + assert.NoError(t, err) + messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 10) + assert.NoError(t, err) + assert.NotNil(t, messages) + for _, msg := range messages { + assert.Equal(t, msg.GetMessageID().PartitionIndex, i) + } + } +} + func TestGetMessageByID(t *testing.T) { randomName := newTopicName() topic := "persistent://public/default/" + randomName diff --git a/pulsaradmin/pkg/utils/message_id.go b/pulsaradmin/pkg/utils/message_id.go index d75b613e1b..f65c031e80 100644 --- a/pulsaradmin/pkg/utils/message_id.go +++ b/pulsaradmin/pkg/utils/message_id.go @@ -34,6 +34,15 @@ type MessageID struct { var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1} var Earliest = MessageID{-1, -1, -1, -1} +func ParseMessageIDWithPartitionIndex(str string, index int) (*MessageID, error) { + id, err := ParseMessageID(str) + if err != nil { + return nil, err + } + id.PartitionIndex = index + return id, nil +} + func ParseMessageID(str string) (*MessageID, error) { s := strings.Split(str, ":") diff --git a/pulsaradmin/pkg/utils/topic_name.go b/pulsaradmin/pkg/utils/topic_name.go index 268abd73d1..7871983733 100644 --- a/pulsaradmin/pkg/utils/topic_name.go +++ b/pulsaradmin/pkg/utils/topic_name.go @@ -143,6 +143,10 @@ func (t *TopicName) GetPartition(index int) (*TopicName, error) { return GetTopicName(topicNameWithPartition) } +func (t *TopicName) GetPartitionIndex() int { + return t.partitionIndex +} + func getPartitionIndex(topic string) int { if strings.Contains(topic, PARTITIONEDTOPICSUFFIX) { parts := strings.Split(topic, "-")