Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Add backlog, retention and ttl commands for pulsarctl (#61)
Browse files Browse the repository at this point in the history
* Add backlog, retention and ttl commands for pulsarctl

Signed-off-by: xiaolong.ran <[email protected]>
wolfstudy authored and maxsxu committed Mar 14, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 4a570e8 commit 73044bc
Showing 4 changed files with 122 additions and 25 deletions.
34 changes: 13 additions & 21 deletions pkg/pulsar/backlog_quota.go
Original file line number Diff line number Diff line change
@@ -18,33 +18,25 @@
package pulsar

type BacklogQuota struct {
Limit int64
Police RetentionPolicy
BacklogQuotaType BacklogQuotaType
Limit int64 `json:"limit"`
Policy RetentionPolicy `json:"policy"`
}

type RetentionPolicy int
func NewBacklogQuota(limit int64, policy RetentionPolicy) BacklogQuota {
return BacklogQuota{
Limit: limit,
Policy: policy,
}
}

type RetentionPolicy string

type BacklogQuotaType string

const DestinationStorage BacklogQuotaType = "destination_storage"

const (
ProducerRequestHold RetentionPolicy = iota
ProducerException
ConsumerBacklogEviction
ProducerRequestHold RetentionPolicy = "producer_request_hold"
ProducerException RetentionPolicy = "producer_exception"
ConsumerBacklogEviction RetentionPolicy = "consumer_backlog_eviction"
)

func (rp RetentionPolicy) String() string {
names := [...]string{
"ProducerRequestHold",
"ProducerException",
"ConsumerBacklogEviction",
}

if rp < ProducerRequestHold || rp > ConsumerBacklogEviction {
return "Unknown Retention Policy"
}

return names[rp]
}
8 changes: 6 additions & 2 deletions pkg/pulsar/data.go
Original file line number Diff line number Diff line change
@@ -152,8 +152,12 @@ type PartitionedTopicMetadata struct {
}

type NamespacesData struct {
NumBundles int `json:"numBundles"`
Clusters []string `json:"clusters"`
NumBundles int `json:"numBundles"`
Clusters []string `json:"clusters"`
MessageTTL int `json:"messageTTL"`
RetentionTimeStr string `json:"retentionTimeStr"`
LimitStr string `json:"limitStr"`
PolicyStr string `json:"policyStr"`
}

type SchemaData struct {
94 changes: 94 additions & 0 deletions pkg/pulsar/namespace.go
Original file line number Diff line number Diff line change
@@ -44,6 +44,27 @@ type Namespaces interface {

// Delete an existing bundle in a namespace
DeleteNamespaceBundle(namespace string, bundleRange string) error

// Set the messages Time to Live for all the topics within a namespace
SetNamespaceMessageTTL(namespace string, ttlInSeconds int) error

// Get the message TTL for a namespace
GetNamespaceMessageTTL(namespace string) (int, error)

// Get the retention configuration for a namespace
GetRetention(namespace string) (*RetentionPolicies, error)

// Set the retention configuration for all the topics on a namespace
SetRetention(namespace string, policy RetentionPolicies) error

// Get backlog quota map on a namespace
GetBacklogQuotaMap(namespace string) (map[BacklogQuotaType]BacklogQuota, error)

// Set a backlog quota for all the topics on a namespace
SetBacklogQuota(namespace string, backlogQuota BacklogQuota) error

// Remove a backlog quota policy from a namespace
RemoveBacklogQuota(namespace string) error
}

type namespaces struct {
@@ -138,3 +159,76 @@ func (n *namespaces) DeleteNamespaceBundle(namespace string, bundleRange string)
endpoint := n.client.endpoint(n.basePath, ns.String(), bundleRange)
return n.client.delete(endpoint, nil)
}

func (n *namespaces) GetNamespaceMessageTTL(namespace string) (int, error) {
var ttl int
nsName, err := GetNamespaceName(namespace)
if err != nil {
return 0, err
}
endpoint := n.client.endpoint(n.basePath, nsName.String(), "messageTTL")
err = n.client.get(endpoint, &ttl)
return ttl, err
}

func (n *namespaces) SetNamespaceMessageTTL(namespace string, ttlInSeconds int) error {
nsName, err := GetNamespaceName(namespace)
if err != nil {
return err
}

endpoint := n.client.endpoint(n.basePath, nsName.String(), "messageTTL")
return n.client.post(endpoint, &ttlInSeconds, nil)
}

func (n *namespaces) SetRetention(namespace string, policy RetentionPolicies) error {
nsName, err := GetNamespaceName(namespace)
if err != nil {
return err
}
endpoint := n.client.endpoint(n.basePath, nsName.String(), "retention")
return n.client.post(endpoint, &policy, nil)
}

func (n *namespaces) GetRetention(namespace string) (*RetentionPolicies, error) {
var policy RetentionPolicies
nsName, err := GetNamespaceName(namespace)
if err != nil {
return nil, err
}
endpoint := n.client.endpoint(n.basePath, nsName.String(), "retention")
err = n.client.get(endpoint, &policy)
return &policy, err
}

func (n *namespaces) GetBacklogQuotaMap(namespace string) (map[BacklogQuotaType]BacklogQuota, error) {
var backlogQuotaMap map[BacklogQuotaType]BacklogQuota
nsName, err := GetNamespaceName(namespace)
if err != nil {
return nil, err
}
endpoint := n.client.endpoint(n.basePath, nsName.String(), "backlogQuotaMap")
err = n.client.get(endpoint, &backlogQuotaMap)
return backlogQuotaMap, err
}

func (n *namespaces) SetBacklogQuota(namespace string, backlogQuota BacklogQuota) error {
nsName, err := GetNamespaceName(namespace)
if err != nil {
return err
}
endpoint := n.client.endpoint(n.basePath, nsName.String(), "backlogQuota")
return n.client.post(endpoint, &backlogQuota, nil)
}

func (n *namespaces) RemoveBacklogQuota(namespace string) error {
nsName, err := GetNamespaceName(namespace)
if err != nil {
return err
}
endpoint := n.client.endpoint(n.basePath, nsName.String(), "backlogQuota")
params := map[string]string{
"backlogQuotaType": string(DestinationStorage),
}
return n.client.deleteWithQueryParams(endpoint, nil, params)
}
11 changes: 9 additions & 2 deletions pkg/pulsar/retention_policies.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,13 @@
package pulsar

type RetentionPolicies struct {
RetentionTimeInMinutes int
RetentionSizeInMB int64
RetentionTimeInMinutes int `json:"retentionTimeInMinutes"`
RetentionSizeInMB int64 `json:"retentionSizeInMB"`
}

func NewRetentionPolicies(retentionTimeInMinutes int, retentionSizeInMB int) RetentionPolicies {
return RetentionPolicies{
RetentionTimeInMinutes: retentionTimeInMinutes,
RetentionSizeInMB: int64(retentionSizeInMB),
}
}

0 comments on commit 73044bc

Please sign in to comment.