From 267f25a76e3aec4ddf005460fd1ec6055f6a5e78 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 1 Sep 2021 08:05:34 +0800 Subject: [PATCH] feat: add backlog quota command for topic (#429) Signed-off-by: Zixuan Liu ### Changes background from #246, the PR implements the following commands: - `pulsarctl topics get-backlog-quotas -a` - Get the backlog quota policy for a topic - `pulsarctl topics remove-backlog-quota --type ` - Remove a backlog quota policy from a topic - `pulsarctl topics set-backlog-quota --limit-size --limit-time --policy --type ` - Set a backlog quota policy for a topic ### TODO - [x] Add integration tests --- pkg/pulsar/topic.go | 43 +++++++++++++++++++++++++++++++ pkg/pulsar/utils/backlog_quota.go | 28 +++++++++++++++++--- 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/pkg/pulsar/topic.go b/pkg/pulsar/topic.go index 2e0ba2f..82c24b5 100644 --- a/pkg/pulsar/topic.go +++ b/pkg/pulsar/topic.go @@ -19,6 +19,7 @@ package pulsar import ( "fmt" + "net/url" "strconv" "github.com/streamnative/pulsar-admin-go/pkg/pulsar/common" @@ -206,6 +207,15 @@ type Topics interface { // Remove compaction threshold for a topic RemoveCompactionThreshold(utils.TopicName) error + + // GetBacklogQuotaMap returns backlog quota map for a topic + GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) + + // SetBacklogQuota sets a backlog quota for a topic + SetBacklogQuota(utils.TopicName, utils.BacklogQuota, utils.BacklogQuotaType) error + + // RemoveBacklogQuota removes a backlog quota policy from a topic + RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error } type topics struct { @@ -630,3 +640,36 @@ func (t *topics) RemoveCompactionThreshold(topic utils.TopicName) error { err := t.pulsar.Client.Delete(endpoint) return err } + +func (t *topics) GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, + error) { + var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuotaMap") + + queryParams := map[string]string{"applied": strconv.FormatBool(applied)} + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &backlogQuotaMap, queryParams, true) + + return backlogQuotaMap, err +} + +func (t *topics) SetBacklogQuota(topic utils.TopicName, backlogQuota utils.BacklogQuota, + backlogQuotaType utils.BacklogQuotaType) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota") + + u, err := url.Parse(endpoint) + if err != nil { + return err + } + q := u.Query() + q.Add("backlogQuotaType", string(backlogQuotaType)) + u.RawQuery = q.Encode() + + return t.pulsar.Client.Post(u.String(), &backlogQuota) +} + +func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType utils.BacklogQuotaType) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota") + return t.pulsar.Client.DeleteWithQueryParams(endpoint, map[string]string{ + "backlogQuotaType": string(backlogQuotaType), + }) +} diff --git a/pkg/pulsar/utils/backlog_quota.go b/pkg/pulsar/utils/backlog_quota.go index 2d14f80..3b78243 100644 --- a/pkg/pulsar/utils/backlog_quota.go +++ b/pkg/pulsar/utils/backlog_quota.go @@ -35,10 +35,6 @@ func NewBacklogQuota(limitSize int64, limitTime int64, policy RetentionPolicy) B type RetentionPolicy string -type BacklogQuotaType string - -const DestinationStorage BacklogQuotaType = "destination_storage" - const ( ProducerRequestHold RetentionPolicy = "producer_request_hold" ProducerException RetentionPolicy = "producer_exception" @@ -47,6 +43,8 @@ const ( func ParseRetentionPolicy(str string) (RetentionPolicy, error) { switch str { + case ProducerRequestHold.String(): + return ProducerRequestHold, nil case ProducerException.String(): return ProducerException, nil case ConsumerBacklogEviction.String(): @@ -59,3 +57,25 @@ func ParseRetentionPolicy(str string) (RetentionPolicy, error) { func (s RetentionPolicy) String() string { return string(s) } + +type BacklogQuotaType string + +const ( + DestinationStorage BacklogQuotaType = "destination_storage" + MessageAge BacklogQuotaType = "message_age" +) + +func ParseBacklogQuotaType(str string) (BacklogQuotaType, error) { + switch str { + case DestinationStorage.String(): + return DestinationStorage, nil + case MessageAge.String(): + return MessageAge, nil + default: + return "", errors.Errorf("Invalid backlog quota type: %s", str) + } +} + +func (b BacklogQuotaType) String() string { + return string(b) +}