Skip to content

Commit

Permalink
[feat] added support for offload policies in namespaces and topics
Browse files Browse the repository at this point in the history
  • Loading branch information
dn0 committed Jan 2, 2025
1 parent ffba2a8 commit c4409d1
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 3 deletions.
38 changes: 38 additions & 0 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ type Namespaces interface {
// RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace,
// defaulting to broker settings
RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error

// GetOffload returns the offload configuration for a namespace
GetOffload(namespace utils.NameSpaceName) (*utils.OffloadPolicies, error)

// SetOffload sets the offload configuration on a namespace
SetOffload(namespace utils.NameSpaceName, policy *utils.OffloadPolicies) error

// DeleteOffload removes the offload configuration from a namespace
DeleteOffload(namespace utils.NameSpaceName) error
}

type namespaces struct {
Expand Down Expand Up @@ -940,3 +949,32 @@ func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceN
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime")
return n.pulsar.Client.Delete(endpoint)
}

func (n *namespaces) SetOffload(namespace utils.NameSpaceName, policy *utils.OffloadPolicies) error {
nsName, err := utils.GetNamespaceName(namespace.String())
if err != nil {
return err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "offloadPolicies")
return n.pulsar.Client.Post(endpoint, policy)
}

func (n *namespaces) GetOffload(namespace utils.NameSpaceName) (*utils.OffloadPolicies, error) {
var policy utils.OffloadPolicies
nsName, err := utils.GetNamespaceName(namespace.String())
if err != nil {
return nil, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "offloadPolicies")
err = n.pulsar.Client.Get(endpoint, &policy)
return &policy, err
}

func (n *namespaces) DeleteOffload(namespace utils.NameSpaceName) error {
nsName, err := utils.GetNamespaceName(namespace.String())
if err != nil {
return err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "removeOffloadPolicies")
return n.pulsar.Client.Delete(endpoint)
}
84 changes: 82 additions & 2 deletions pulsaradmin/pkg/admin/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package admin
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func ptr(n int) *int {
Expand Down Expand Up @@ -341,3 +342,82 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) {
expected := int64(60)
assert.Equal(t, expected, offloadThresholdInSeconds)
}

func TestNamespaces_SetOffloadPolicy(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

tests := []struct {
name string
errReason string
policy *utils.OffloadPolicies
}{
{
name: "Set invalid empty offload policy",
errReason: "The driver is not supported, support value: S3,aws-s3," +
"google-cloud-storage,filesystem,azureblob,aliyun-oss",
policy: &utils.OffloadPolicies{},
},
{
name: "Set invalid S3 offload policy",
errReason: "The bucket must be specified for namespace offload.",
policy: &utils.OffloadPolicies{
ManagedLedgerOffloadDriver: "S3",
},
},
{
name: "Set valid filesystem offload policy",
errReason: "",
policy: &utils.OffloadPolicies{
ManagedLedgerOffloadDriver: "filesystem",
OffloadersDirectory: "/tmp",
ManagedLedgerOffloadedReadPriority: "BOOKKEEPER_FIRST",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := admin.Namespaces().SetOffload(*namespace, tt.policy)
if tt.errReason == "" {
assert.Equal(t, nil, err)
}
if err != nil {
restError := err.(rest.Error)
assert.Equal(t, tt.errReason, restError.Reason)
}
})
}
}

func TestNamespaces_GetAndDeleteOffloadPolicy(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// set simple filesystem offload policy and get it
err = admin.Namespaces().SetOffload(*namespace, &utils.OffloadPolicies{
ManagedLedgerOffloadDriver: "filesystem",
OffloadersDirectory: "/var/tmp",
ManagedLedgerOffloadedReadPriority: "TIERED_STORAGE_FIRST",
})
assert.Equal(t, nil, err)
offload, err := admin.Namespaces().GetOffload(*namespace)
assert.Equal(t, nil, err)
assert.Equal(t, "filesystem", offload.ManagedLedgerOffloadDriver)
assert.Equal(t, "/var/tmp", offload.OffloadersDirectory)
assert.Equal(t, "TIERED_STORAGE_FIRST", offload.ManagedLedgerOffloadedReadPriority)

// delete previously set filesystem offload policy
err = admin.Namespaces().DeleteOffload(*namespace)
assert.Equal(t, nil, err)
offload, err = admin.Namespaces().GetOffload(*namespace)
assert.Equal(t, nil, err)
assert.Equal(t, "", offload.ManagedLedgerOffloadDriver)
}
43 changes: 43 additions & 0 deletions pulsaradmin/pkg/admin/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,30 @@ type Topics interface {
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error

// GetOffload returns the offload configuration for a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace level, if no policy set in topic level
GetOffload(topic utils.TopicName, applied bool) (*utils.OffloadPolicies, error)

// SetOffload sets the offload policy for a topic
//
// @param topic
// topicName struct
// @param policy
// Pointer to the OffloadPolicies struct with fields set according to the used
// tiered storage configuration
SetOffload(topic utils.TopicName, policy *utils.OffloadPolicies) error

// DeleteOffload removes the offload configuration on a topic
//
// @param topic
// topicName struct
DeleteOffload(topic utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -917,3 +941,22 @@ func (t *topics) GetReplicationClusters(topic utils.TopicName) ([]string, error)
err := t.pulsar.Client.Get(endpoint, &data)
return data, err
}

func (t *topics) GetOffload(topic utils.TopicName, applied bool) (*utils.OffloadPolicies, error) {
var policy utils.OffloadPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &policy, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return &policy, err
}

func (t *topics) SetOffload(topic utils.TopicName, policy *utils.OffloadPolicies) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
return t.pulsar.Client.Post(endpoint, policy)
}

func (t *topics) DeleteOffload(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
return t.pulsar.Client.Delete(endpoint)
}
92 changes: 91 additions & 1 deletion pulsaradmin/pkg/admin/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
Expand Down Expand Up @@ -523,3 +525,91 @@ func TestRetention(t *testing.T) {
100*time.Millisecond,
)
}

func TestSetOffloadPolicy(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)

tests := []struct {
name string
errReason string
policy *utils.OffloadPolicies
}{
{
name: "Set invalid empty offload policy",
errReason: "The driver is not supported, support value: S3,aws-s3," +
"google-cloud-storage,filesystem,azureblob,aliyun-oss",
policy: &utils.OffloadPolicies{},
},
{
name: "Set invalid S3 offload policy",
errReason: "The bucket must be specified for namespace offload.",
policy: &utils.OffloadPolicies{
ManagedLedgerOffloadDriver: "S3",
},
},
{
name: "Set valid filesystem offload policy",
errReason: "",
policy: &utils.OffloadPolicies{
ManagedLedgerOffloadDriver: "filesystem",
OffloadersDirectory: "/tmp",
ManagedLedgerOffloadedReadPriority: "BOOKKEEPER_FIRST",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := admin.Topics().SetOffload(*topicName, tt.policy)
if tt.errReason == "" {
assert.Equal(t, nil, err)
}
if err != nil {
restError := err.(rest.Error)
assert.Equal(t, tt.errReason, restError.Reason)
}
})
}
}

func TestGetAndDeleteOffloadPolicy(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)

// set simple filesystem offload policy and get it
err = admin.Topics().SetOffload(*topicName, &utils.OffloadPolicies{
ManagedLedgerOffloadDriver: "filesystem",
OffloadersDirectory: "/var/tmp",
ManagedLedgerOffloadedReadPriority: "TIERED_STORAGE_FIRST",
})
assert.Equal(t, nil, err)
offload, err := admin.Topics().GetOffload(*topicName, false)
assert.Equal(t, nil, err)
assert.Equal(t, "filesystem", offload.ManagedLedgerOffloadDriver)
assert.Equal(t, "/var/tmp", offload.OffloadersDirectory)
assert.Equal(t, "TIERED_STORAGE_FIRST", offload.ManagedLedgerOffloadedReadPriority)

// delete previously set filesystem offload policy
err = admin.Topics().DeleteOffload(*topicName)
assert.Equal(t, nil, err)
offload, err = admin.Topics().GetOffload(*topicName, false)
assert.Equal(t, nil, err)
assert.Equal(t, "", offload.ManagedLedgerOffloadDriver)

}
54 changes: 54 additions & 0 deletions pulsaradmin/pkg/utils/offload_policies.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package utils

type OffloadPolicies struct {
FileSystemDriver bool `json:"fileSystemDriver"`
FileSystemProfilePath string `json:"fileSystemProfilePath"`
FileSystemURI string `json:"fileSystemURI"`
GcsDriver bool `json:"gcsDriver"`
GcsManagedLedgerOffloadBucket string `json:"gcsManagedLedgerOffloadBucket"`
GcsManagedLedgerOffloadMaxBlockSizeInBytes int `json:"gcsManagedLedgerOffloadMaxBlockSizeInBytes"`
GcsManagedLedgerOffloadReadBufferSizeInBytes int `json:"gcsManagedLedgerOffloadReadBufferSizeInBytes"`
GcsManagedLedgerOffloadRegion string `json:"gcsManagedLedgerOffloadRegion"`
GcsManagedLedgerOffloadServiceAccountKeyFile string `json:"gcsManagedLedgerOffloadServiceAccountKeyFile"`
ManagedLedgerExtraConfigurations map[string]string `json:"managedLedgerExtraConfigurations"`
ManagedLedgerOffloadBucket string `json:"managedLedgerOffloadBucket"`
ManagedLedgerOffloadDeletionLagInMillis int `json:"managedLedgerOffloadDeletionLagInMillis"`
ManagedLedgerOffloadDriver string `json:"managedLedgerOffloadDriver"`
ManagedLedgerOffloadMaxBlockSizeInBytes int `json:"managedLedgerOffloadMaxBlockSizeInBytes"`
ManagedLedgerOffloadMaxThreads int `json:"managedLedgerOffloadMaxThreads"`
ManagedLedgerOffloadPrefetchRounds int `json:"managedLedgerOffloadPrefetchRounds"`
ManagedLedgerOffloadReadBufferSizeInBytes int `json:"managedLedgerOffloadReadBufferSizeInBytes"`
ManagedLedgerOffloadRegion string `json:"managedLedgerOffloadRegion"`
ManagedLedgerOffloadServiceEndpoint string `json:"managedLedgerOffloadServiceEndpoint"`
ManagedLedgerOffloadThresholdInBytes int `json:"managedLedgerOffloadThresholdInBytes"`
ManagedLedgerOffloadThresholdInSeconds int `json:"managedLedgerOffloadThresholdInSeconds"`
ManagedLedgerOffloadedReadPriority string `json:"managedLedgerOffloadedReadPriority"`
OffloadersDirectory string `json:"offloadersDirectory"`
S3Driver bool `json:"s3Driver"`
S3ManagedLedgerOffloadBucket string `json:"s3ManagedLedgerOffloadBucket"`
S3ManagedLedgerOffloadCredentialID string `json:"s3ManagedLedgerOffloadCredentialId"`
S3ManagedLedgerOffloadCredentialSecret string `json:"s3ManagedLedgerOffloadCredentialSecret"`
S3ManagedLedgerOffloadMaxBlockSizeInBytes int `json:"s3ManagedLedgerOffloadMaxBlockSizeInBytes"`
S3ManagedLedgerOffloadReadBufferSizeInBytes int `json:"s3ManagedLedgerOffloadReadBufferSizeInBytes"`
S3ManagedLedgerOffloadRegion string `json:"s3ManagedLedgerOffloadRegion"`
S3ManagedLedgerOffloadRole string `json:"s3ManagedLedgerOffloadRole"`
S3ManagedLedgerOffloadRoleSessionName string `json:"s3ManagedLedgerOffloadRoleSessionName"`
S3ManagedLedgerOffloadServiceEndpoint string `json:"s3ManagedLedgerOffloadServiceEndpoint"`
}

0 comments on commit c4409d1

Please sign in to comment.