Skip to content

Commit

Permalink
[feat] Add support for subscription expiration time namespace settings (
Browse files Browse the repository at this point in the history
#1254)

Fixes #1253 

### Motivation

Adds support for the get / set / delete `subscriptionExpirationTime` namespace settings.

### Modifications

Created new functions in `namespace.go` that implement support for `subscriptionExpirationTime` settings
  • Loading branch information
klevy-toast authored Jul 27, 2024
1 parent c3b0633 commit c74460d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ type Namespaces interface {

// SetInactiveTopicPolicies sets the inactive topic policies on a namespace
SetInactiveTopicPolicies(namespace utils.NameSpaceName, data utils.InactiveTopicPolicies) error

// GetSubscriptionExpirationTime gets the subscription expiration time on a namespace. Returns -1 if not set
GetSubscriptionExpirationTime(namespace utils.NameSpaceName) (int, error)

// SetSubscriptionExpirationTime sets the subscription expiration time on a namespace
SetSubscriptionExpirationTime(namespace utils.NameSpaceName, expirationTimeInMinutes int) error

// RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace,
// defaulting to broker settings
RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error
}

type namespaces struct {
Expand Down Expand Up @@ -893,3 +903,22 @@ func (n *namespaces) SetInactiveTopicPolicies(namespace utils.NameSpaceName, dat
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "inactiveTopicPolicies")
return n.pulsar.Client.Post(endpoint, data)
}

func (n *namespaces) GetSubscriptionExpirationTime(namespace utils.NameSpaceName) (int, error) {
var result = -1

endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime")
err := n.pulsar.Client.Get(endpoint, &result)
return result, err
}

func (n *namespaces) SetSubscriptionExpirationTime(namespace utils.NameSpaceName,
subscriptionExpirationTimeInMinutes int) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime")
return n.pulsar.Client.Post(endpoint, &subscriptionExpirationTimeInMinutes)
}

func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime")
return n.pulsar.Client.Delete(endpoint)
}
76 changes: 76 additions & 0 deletions pulsaradmin/pkg/admin/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,79 @@ func TestRevokeSubPermission(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 0, len(permissions[sub]))
}

func TestNamespaces_SetSubscriptionExpirationTime(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

tests := []struct {
name string
namespace string
subscriptionExpirationTime int
errReason string
}{
{
name: "Set valid subscription expiration time",
namespace: "public/default",
subscriptionExpirationTime: 60,
errReason: "",
},
{
name: "Set invalid subscription expiration time",
namespace: "public/default",
subscriptionExpirationTime: -60,
errReason: "Invalid value for subscription expiration time",
},
{
name: "Set valid subscription expiration time: 0",
namespace: "public/default",
subscriptionExpirationTime: 0,
errReason: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
namespace, _ := utils.GetNamespaceName(tt.namespace)
err := admin.Namespaces().SetSubscriptionExpirationTime(*namespace, tt.subscriptionExpirationTime)
if tt.errReason == "" {
assert.Equal(t, nil, err)

err = admin.Namespaces().RemoveSubscriptionExpirationTime(*namespace)
assert.Equal(t, nil, err)
}
if err != nil {
restError := err.(rest.Error)
assert.Equal(t, tt.errReason, restError.Reason)
}
})
}
}

func TestNamespaces_GetSubscriptionExpirationTime(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// set the subscription expiration time and get it
err = admin.Namespaces().SetSubscriptionExpirationTime(*namespace,
60)
assert.Equal(t, nil, err)
subscriptionExpirationTime, err := admin.Namespaces().GetSubscriptionExpirationTime(*namespace)
assert.Equal(t, nil, err)
expected := 60
assert.Equal(t, expected, subscriptionExpirationTime)

// remove the subscription expiration time and get it
err = admin.Namespaces().RemoveSubscriptionExpirationTime(*namespace)
assert.Equal(t, nil, err)

subscriptionExpirationTime, err = admin.Namespaces().GetSubscriptionExpirationTime(*namespace)
assert.Equal(t, nil, err)
expected = -1
assert.Equal(t, expected, subscriptionExpirationTime)
}

0 comments on commit c74460d

Please sign in to comment.