Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Add topic stats command (#46)
Browse files Browse the repository at this point in the history
Add command topic `stats`, `internal-stats`, `partition-stats`

* topic stats *
```
➜  pulsarctl git:(topic-stats) ✗ ./pulsarctl topic stats -h
USED FOR:
    This command is used for getting the stats for an existing topic and its connected producers and consumers. (All the rates are computed over a 1 minute window and are relative the last completed 1 minute period)

REQUIRED PERMISSION:
    This command requires namespace admin permissions.

EXAMPLES:
    #Get the non-partitioned topic <topic-name> stats
    pulsarctl topic stats <topic-name>

    #Get the partitioned topic <topic-name> stats
    pulsarctl topic stats --partition <topic-name>

    #Get the partitioned topic <topic-name> stats and per partition stats
    pulsarctl topic stats --partition --per-partition <topic-name>

OUTPUT:
    #Get the non-partitioned topic stats
    {
      "msgRateIn": 0,
      "msgRateOut": 0,
      "msgThroughputIn": 0,
      "msgThroughputOut": 0,
      "averageMsgSize": 0,
      "storageSize": 0,
      "publishers": [],
      "subscriptions": {},
      "replication": {},
      "deduplicationStatus": "Disabled"
    }

    #Get the partitioned topic stats
    {
      "msgRateIn": 0,
      "msgRateOut": 0,
      "msgThroughputIn": 0,
      "msgThroughputOut": 0,
      "averageMsgSize": 0,
      "storageSize": 0,
      "publishers": [],
      "subscriptions": {},
      "replication": {},
      "deduplicationStatus": "",
      "metadata": {
        "partitions": 1
      },
      "partitions": {}
    }

    #Get the partitioned topic stats and per partition topic stats
    {
      "msgRateIn": 0,
      "msgRateOut": 0,
      "msgThroughputIn": 0,
      "msgThroughputOut": 0,
      "averageMsgSize": 0,
      "storageSize": 0,
      "publishers": [],
      "subscriptions": {},
      "replication": {},
      "deduplicationStatus": "",
      "metadata": {
        "partitions": 1
      },
      "partitions": {
        "<topic-name>": {
          "msgRateIn": 0,
          "msgRateOut": 0,
          "msgThroughputIn": 0,
          "msgThroughputOut": 0,
          "averageMsgSize": 0,
          "storageSize": 0,
          "publishers": [],
          "subscriptions": {},
          "replication": {},
          "deduplicationStatus": ""
        }
      }
    }

    #the topic name is not specified
    [✖]  only one argument is allowed to be used as a name

    #the specified topic is not exist or the specified topic is a partitioned-topic and you don't specified --partition or the specified topic is a non-partitioned topic and you specified --partition
    code: 404 reason: Topic not found

    #the topic name is not in the format of <tenant>/<namespace>/<topic> or <topic>
    [✖]  Invalid short topic name '<topic-name>', it should be in the format of <tenant>/<namespace>/<topic> or <topic>

    #the topic name is not in the format of <domain>://<tenant>/<namespace>/<topic>
    [✖]  Invalid complete topic name '<topic-name>', it should be in the format of <domain>://<tenant>/<namespace>/<topic>

    #the topic name is not in the format of <tenant>/<namespace>/<topic>
    [✖]  Invalid topic name '<topic-name>', it should be in the format of<tenant>/<namespace>/<topic>

    #the namespace name is not in the format of <tenant>/<namespace>
    [✖]  The complete name of namespace is invalid. complete name : <namespace-complete-name>

    #the tenant name and(or) namespace name is empty
    [✖]  Invalid tenant or namespace. [<tenant>/<namespace>]

    #the tenant name contains unsupported special chars. the alphanumeric (a-zA-Z0-9) and the special chars (-=:.%)  is allowed
    [✖]  Tenant name include unsupported special chars. tenant : [<namespace>]

    #the namespace name contains unsupported special chars. the  alphanumeric (a-zA-Z0-9) and the special chars (-=:.%) is allowed
    [✖]  Namespace name include unsupported special chars. namespace : [<namespace>]

Usage: pulsarctl topics stats [flags]

```

* topic internal-stats *
```
➜  pulsarctl-yong git:(topic-stats) ./pulsarctl topic internal-stats -h
USED FOR:
    This command is used for getting the internal stats for an existing non-partitioned topic.

REQUIRED PERMISSION:
    This command requires namespace admin permissions.

EXAMPLES:
    #Get internal stats for an existing non-partitioned-topic <topic-name>
    pulsarctl topic internal-stats <topic-name>

OUTPUT:
    #normal output
    {
      "entriesAddedCounter": 0,
      "numberOfEntries": 0,
      "totalSize": 0,
      "currentLedgerEntries": 0,
      "currentLedgerSize": 0,
      "lastLedgerCreatedTimestamp": "",
      "lastLedgerCreationFailureTimestamp": "",
      "waitingCursorsCount": 0,
      "pendingAddEntriesCount": 0,
      "lastConfirmedEntry": "",
      "state": "",
      "ledgers": [
        {
          "ledgerId": 0,
          "entries": 0,
          "size": 0,
          "offloaded": false
        }
      ],
      "cursors": {}
    }

    #the topic name is not specified
    [✖]  only one argument is allowed to be used as a name

    #the specified topic is not exist or the specified topic is a partitioned topic
    [✖]  code: 404 reason: Topic not found

    #the topic name is not in the format of <tenant>/<namespace>/<topic> or <topic>
    [✖]  Invalid short topic name '<topic-name>', it should be in the format of <tenant>/<namespace>/<topic> or <topic>

    #the topic name is not in the format of <domain>://<tenant>/<namespace>/<topic>
    [✖]  Invalid complete topic name '<topic-name>', it should be in the format of <domain>://<tenant>/<namespace>/<topic>

    #the topic name is not in the format of <tenant>/<namespace>/<topic>
    [✖]  Invalid topic name '<topic-name>', it should be in the format of<tenant>/<namespace>/<topic>

    #the namespace name is not in the format of <tenant>/<namespace>
    [✖]  The complete name of namespace is invalid. complete name : <namespace-complete-name>

    #the tenant name and(or) namespace name is empty
    [✖]  Invalid tenant or namespace. [<tenant>/<namespace>]

    #the tenant name contains unsupported special chars. the alphanumeric (a-zA-Z0-9) and the special chars (-=:.%)  is allowed
    [✖]  Tenant name include unsupported special chars. tenant : [<namespace>]

    #the namespace name contains unsupported special chars. the  alphanumeric (a-zA-Z0-9) and the special chars (-=:.%) is allowed
    [✖]  Namespace name include unsupported special chars. namespace : [<namespace>]

Usage: pulsarctl topics internal-stats [flags]

Aliases: internal-stats,
```
  • Loading branch information
zymap authored and maxsxu committed Mar 14, 2023
1 parent f9d5cbf commit f8c6204
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 4 deletions.
13 changes: 11 additions & 2 deletions pkg/pulsar/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,20 @@ func (c *client) endpoint(componentPath string, parts ...string) string {

// get is used to do a GET request against an endpoint
// and deserialize the response into an interface
func (c *client) getAndDecode(endpoint string, obj interface{}, decode bool) ([]byte, error) {
func (c *client) getWithQueryParams(endpoint string, obj interface{}, params map[string]string, decode bool) ([]byte, error) {
req, err := c.newRequest(http.MethodGet, endpoint)
if err != nil {
return nil, err
}

if params != nil {
query := req.url.Query()
for k, v := range params {
query.Add(k, v)
}
req.params = query
}

resp, err := checkSuccessful(c.doRequest(req))
if err != nil {
return nil, err
Expand All @@ -174,7 +182,7 @@ func (c *client) getAndDecode(endpoint string, obj interface{}, decode bool) ([]
}

func (c *client) get(endpoint string, obj interface{}) error {
_, err := c.getAndDecode(endpoint, obj, true)
_, err := c.getWithQueryParams(endpoint, obj, nil, true)
return err
}

Expand Down Expand Up @@ -215,6 +223,7 @@ func (c *client) deleteWithQueryParams(endpoint string, obj interface{}, params
for k, v := range params {
query.Add(k, v)
}
req.params = query
}

resp, err := checkSuccessful(c.doRequest(req))
Expand Down
116 changes: 116 additions & 0 deletions pkg/pulsar/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,122 @@ type NamespacesData struct {
Clusters []string `json:"clusters"`
}

type TopicStats struct {
MsgRateIn float64 `json:"msgRateIn"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
AverageMsgSize float64 `json:"averageMsgSize"`
StorageSize int64 `json:"storageSize"`
Publishers []PublisherStats `json:"publishers"`
Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
Replication map[string]ReplicatorStats `json:"replication"`
DeDuplicationStatus string `json:"deduplicationStatus"`
}

type PublisherStats struct {
ProducerId int64 `json:"producerId"`
MsgRateIn float64 `json:"msgRateIn"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
AverageMsgSize float64 `json:"averageMsgSize"`
Metadata map[string]string `json:"metadata"`
}

type SubscriptionStats struct {
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
MsgRateExpired float64 `json:"msgRateExpired"`
MsgBacklog int64 `json:"msgBacklog"`
BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"`
MsgDelayed int64 `json:"msgDelayed"`
unAckedMessages int64 `json:"unackedMessages"`
SubType string `json:"type"`
ActiveConsumerName string `json:"activeConsumerName"`
Consumers []ConsumerStats `json:"consumers"`
IsReplicated bool `json:"isReplicated"`
}

type ConsumerStats struct {
ConsumerName string `json:"consumerName"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
AvailablePermits int `json:"availablePermits"`
UnAckedMessages int `json:"unackedMessages"`
BlockedConsumerOnUnAckedMsgs bool `json:"blockedConsumerOnUnackedMsgs"`
Metadata map[string]string `json:"metadata"`
}

type ReplicatorStats struct {
MsgRateIn float64 `json:"msgRateIn"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateExpired float64 `json:"msgRateExpired"`
ReplicationBacklog int64 `json:"replicationBacklog"`
Connected bool `json:"connected"`
ReplicationDelayInSeconds int64 `json:"replicationDelayInSeconds"`
InboundConnection string `json:"inboundConnection"`
InboundConnectedSince string `json:"inboundConnectedSince"`
OutboundConnection string `json:"outboundConnection"`
OutboundConnectedSince string `json:"outboundConnectedSince"`
}

type PersistentTopicInternalStats struct {
EntriesAddedCounter int64 `json:"entriesAddedCounter"`
NumberOfEntries int64 `json:"numberOfEntries"`
TotalSize int64 `json:"totalSize"`
CurrentLedgerEntries int64 `json:"currentLedgerEntries"`
CurrentLedgerSize int64 `json:"currentLedgerSize"`
LastLedgerCreatedTimestamp string `json:"lastLedgerCreatedTimestamp"`
LastLedgerCreationFailureTimestamp string `json:"lastLedgerCreationFailureTimestamp"`
WaitingCursorsCount int `json:"waitingCursorsCount"`
PendingAddEntriesCount int `json:"pendingAddEntriesCount"`
LastConfirmedEntry string `json:"lastConfirmedEntry"`
State string `json:"state"`
Ledgers []LedgerInfo `json:"ledgers"`
Cursors map[string]CursorStats `json:"cursors"`
}

type LedgerInfo struct {
LedgerId int64 `json:"ledgerId"`
Entries int64 `json:"entries"`
Size int64 `json:"size"`
Offloaded bool `json:"offloaded"`
}

type CursorStats struct {
MarkDeletePosition string `json:"markDeletePosition"`
ReadPosition string `json:"readPosition"`
WaitingReadOp bool `json:"waitingReadOp"`
PendingReadOps int `json:"pendingReadOps"`
MessagesConsumedCounter int64 `json:"messagesConsumedCounter"`
CursorLedger int64 `json:"cursorLedger"`
CursorLedgerLastEntry int64 `json:"cursorLedgerLastEntry"`
IndividuallyDeletedMessages string `json:"individuallyDeletedMessages"`
LastLedgerWitchTimestamp string `json:"lastLedgerWitchTimestamp"`
State string `json:"state"`
NumberOfEntriesSinceFirstNotAckedMessage int64 `json:"numberOfEntriesSinceFirstNotAckedMessage"`
TotalNonContiguousDeletedMessagesRange int `json:"totalNonContiguousDeletedMessagesRange"`
Properties map[string]int64 `json:"properties"`
}

type PartitionedTopicStats struct {
MsgRateIn float64 `json:"msgRateIn"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
AverageMsgSize float64 `json:"averageMsgSize"`
StorageSize int64 `json:"storageSize"`
Publishers []PublisherStats `json:"publishers"`
Subscriptions map[string]SubscriptionStats `json:"subscriptions"`
Replication map[string]ReplicatorStats `json:"replication"`
DeDuplicationStatus string `json:"deduplicationStatus"`
Metadata PartitionedTopicMetadata `json:"metadata"`
Partitions map[string]TopicStats `json:"partitions"`
}

type SchemaData struct {
Version int64 `json:"version"`
Filename string `json:"filename"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/pulsar/namespace_name_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pulsar
package pulsar

import (
"github.com/stretchr/testify/assert"
Expand Down
29 changes: 28 additions & 1 deletion pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type Topics interface {
Lookup(TopicName) (LookupData, error)
GetBundleRange(TopicName) (string, error)
GetLastMessageId(TopicName) (MessageId, error)
GetStats(TopicName) (TopicStats, error)
GetInternalStats(TopicName) (PersistentTopicInternalStats, error)
GetPartitionedStats(TopicName, bool) (PartitionedTopicStats, error)
}

type topics struct {
Expand Down Expand Up @@ -118,7 +121,7 @@ func (t *topics) Lookup(topic TopicName) (LookupData, error) {

func (t *topics) GetBundleRange(topic TopicName) (string, error) {
endpoint := fmt.Sprintf("%s/%s/%s", t.lookupPath, topic.GetRestPath(), "bundle")
data, err := t.client.getAndDecode(endpoint, nil, false)
data, err := t.client.getWithQueryParams(endpoint, nil, nil, false)
return string(data), err
}

Expand All @@ -128,3 +131,27 @@ func (t *topics) GetLastMessageId(topic TopicName) (MessageId, error) {
err := t.client.get(endpoint, &messageId)
return messageId, err
}

func (t *topics) GetStats(topic TopicName) (TopicStats, error) {
var stats TopicStats
endpoint := t.client.endpoint(t.basePath, topic.GetRestPath(), "stats")
err := t.client.get(endpoint, &stats)
return stats, err
}

func (t *topics) GetInternalStats(topic TopicName) (PersistentTopicInternalStats, error) {
var stats PersistentTopicInternalStats
endpoint := t.client.endpoint(t.basePath, topic.GetRestPath(), "internalStats")
err := t.client.get(endpoint, &stats)
return stats, err
}

func (t *topics) GetPartitionedStats(topic TopicName, perPartition bool) (PartitionedTopicStats, error) {
var stats PartitionedTopicStats
endpoint := t.client.endpoint(t.basePath, topic.GetRestPath(), "partitioned-stats")
params := map[string]string{
"perPartition": strconv.FormatBool(perPartition),
}
_, err := t.client.getWithQueryParams(endpoint, &stats, params, true)
return stats, err
}

0 comments on commit f8c6204

Please sign in to comment.