diff --git a/pkg/pulsar/topic.go b/pkg/pulsar/topic.go index 82c24b5..50061fb 100644 --- a/pkg/pulsar/topic.go +++ b/pkg/pulsar/topic.go @@ -216,6 +216,15 @@ type Topics interface { // RemoveBacklogQuota removes a backlog quota policy from a topic RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error + + // GetInactiveTopicPolicies gets the inactive topic policies on a topic + GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error) + + // RemoveInactiveTopicPolicies removes inactive topic policies from a topic + RemoveInactiveTopicPolicies(utils.TopicName) error + + // SetInactiveTopicPolicies sets the inactive topic policies on a topic + SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error } type topics struct { @@ -673,3 +682,22 @@ func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType util "backlogQuotaType": string(backlogQuotaType), }) } + +func (t *topics) GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error) { + var out utils.InactiveTopicPolicies + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies") + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &out, map[string]string{ + "applied": strconv.FormatBool(applied), + }, true) + return out, err +} + +func (t *topics) RemoveInactiveTopicPolicies(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies") + return t.pulsar.Client.Delete(endpoint) +} + +func (t *topics) SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies") + return t.pulsar.Client.Post(endpoint, data) +} diff --git a/pkg/pulsar/utils/inactive_topic_policies.go b/pkg/pulsar/utils/inactive_topic_policies.go new file mode 100644 index 0000000..05f81b6 --- /dev/null +++ b/pkg/pulsar/utils/inactive_topic_policies.go @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utils + +import "github.com/pkg/errors" + +type InactiveTopicDeleteMode string + +const ( + // The topic can be deleted when no subscriptions and no active producers. + DeleteWhenNoSubscriptions InactiveTopicDeleteMode = "delete_when_no_subscriptions" + // The topic can be deleted when all subscriptions catchup and no active producers/consumers. + DeleteWhenSubscriptionsCaughtUp InactiveTopicDeleteMode = "delete_when_subscriptions_caught_up" +) + +func (i InactiveTopicDeleteMode) String() string { + return string(i) +} + +func ParseInactiveTopicDeleteMode(str string) (InactiveTopicDeleteMode, error) { + switch str { + case DeleteWhenNoSubscriptions.String(): + return DeleteWhenNoSubscriptions, nil + case DeleteWhenSubscriptionsCaughtUp.String(): + return DeleteWhenSubscriptionsCaughtUp, nil + default: + return "", errors.Errorf("cannot parse %s to InactiveTopicDeleteMode type", str) + } +} + +type InactiveTopicPolicies struct { + InactiveTopicDeleteMode *InactiveTopicDeleteMode `json:"inactiveTopicDeleteMode"` + MaxInactiveDurationSeconds int `json:"maxInactiveDurationSeconds"` + DeleteWhileInactive bool `json:"deleteWhileInactive"` +} + +func NewInactiveTopicPolicies(inactiveTopicDeleteMode *InactiveTopicDeleteMode, maxInactiveDurationSeconds int, + deleteWhileInactive bool) InactiveTopicPolicies { + return InactiveTopicPolicies{ + InactiveTopicDeleteMode: inactiveTopicDeleteMode, + MaxInactiveDurationSeconds: maxInactiveDurationSeconds, + DeleteWhileInactive: deleteWhileInactive, + } +}