Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

refactor!: reorganize repo towards more ergonomic API #41

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions alias.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/utils/allocator_stats.go → allocator_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// specific language governing permissions and limitations
// under the License.

package utils
package pulsaradmin

type AllocatorStats struct {
NumDirectArenas int `json:"numDirectArenas"`
Expand Down
60 changes: 29 additions & 31 deletions pkg/admin/broker_stats.go → api_broker_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,56 @@
// specific language governing permissions and limitations
// under the License.

package admin

import (
"github.com/streamnative/pulsar-admin-go/pkg/utils"
)
package pulsaradmin

// BrokerStats is admin interface for broker stats management
type BrokerStats interface {
// GetMetrics returns Monitoring metrics
GetMetrics() ([]utils.Metrics, error)
GetMetrics() ([]Metrics, error)

// GetMBeans requests JSON string server mbean dump
GetMBeans() ([]utils.Metrics, error)
GetMBeans() ([]Metrics, error)

// GetTopics returns JSON string topics stats
GetTopics() (string, error)

// GetLoadReport returns load report of broker
GetLoadReport() (*utils.LocalBrokerData, error)
GetLoadReport() (*LocalBrokerData, error)

// GetAllocatorStats returns stats from broker
GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error)
GetAllocatorStats(allocatorName string) (*AllocatorStats, error)
}

type brokerStats struct {
pulsar *pulsarClient
basePath string
pulsar *pulsarClient
basePath string
apiVersion APIVersion
}

// BrokerStats is used to access the broker stats endpoints
func (c *pulsarClient) BrokerStats() BrokerStats {
return &brokerStats{
pulsar: c,
basePath: "/broker-stats",
pulsar: c,
basePath: "/broker-stats",
apiVersion: c.apiProfile.BrokerStats,
}
}

func (bs *brokerStats) GetMetrics() ([]utils.Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/metrics")
var response []utils.Metrics
err := bs.pulsar.Client.Get(endpoint, &response)
func (bs *brokerStats) GetMetrics() ([]Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/metrics")
var response []Metrics
err := bs.pulsar.restClient.Get(endpoint, &response)
if err != nil {
return nil, err
}

return response, nil
}

func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/mbeans")
var response []utils.Metrics
err := bs.pulsar.Client.Get(endpoint, &response)
func (bs *brokerStats) GetMBeans() ([]Metrics, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/mbeans")
var response []Metrics
err := bs.pulsar.restClient.Get(endpoint, &response)
if err != nil {
return nil, err
}
Expand All @@ -73,29 +71,29 @@ func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
}

func (bs *brokerStats) GetTopics() (string, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/topics")
buf, err := bs.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/topics")
buf, err := bs.pulsar.restClient.GetWithQueryParams(endpoint, nil, nil, false)
if err != nil {
return "", err
}

return string(buf), nil
}

func (bs *brokerStats) GetLoadReport() (*utils.LocalBrokerData, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/load-report")
response := utils.NewLocalBrokerData()
err := bs.pulsar.Client.Get(endpoint, &response)
func (bs *brokerStats) GetLoadReport() (*LocalBrokerData, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/load-report")
response := NewLocalBrokerData()
err := bs.pulsar.restClient.Get(endpoint, &response)
if err != nil {
return nil, nil
}
return &response, nil
}

func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error) {
endpoint := bs.pulsar.endpoint(bs.basePath, "/allocator-stats", allocatorName)
var allocatorStats utils.AllocatorStats
err := bs.pulsar.Client.Get(endpoint, &allocatorStats)
func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*AllocatorStats, error) {
endpoint := bs.pulsar.endpoint(bs.apiVersion, bs.basePath, "/allocator-stats", allocatorName)
var allocatorStats AllocatorStats
err := bs.pulsar.restClient.Get(endpoint, &allocatorStats)
if err != nil {
return nil, err
}
Expand Down
62 changes: 31 additions & 31 deletions pkg/admin/brokers.go → api_brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
// specific language governing permissions and limitations
// under the License.

package admin
package pulsaradmin

import (
"fmt"
"net/url"
"strings"

"github.com/streamnative/pulsar-admin-go/pkg/utils"
)

// Brokers is admin interface for brokers management
Expand All @@ -32,7 +30,7 @@ type Brokers interface {
GetDynamicConfigurationNames() ([]string, error)

// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error)
GetOwnedNamespaces(cluster, brokerURL string) (map[string]NamespaceOwnershipStatus, error)

// UpdateDynamicConfiguration updates dynamic configuration value in to Zk that triggers watch on
// brokers and all brokers can update {@link ServiceConfiguration} value locally
Expand All @@ -46,7 +44,7 @@ type Brokers interface {
GetRuntimeConfigurations() (map[string]string, error)

// GetInternalConfigurationData returns the internal configuration data
GetInternalConfigurationData() (*utils.InternalConfigurationData, error)
GetInternalConfigurationData() (*InternalConfigurationData, error)

// GetAllDynamicConfigurations returns values of all overridden dynamic-configs
GetAllDynamicConfigurations() (map[string]string, error)
Expand All @@ -56,42 +54,44 @@ type Brokers interface {
}

type broker struct {
pulsar *pulsarClient
basePath string
pulsar *pulsarClient
basePath string
apiVersion APIVersion
}

// Brokers is used to access the brokers endpoints
func (c *pulsarClient) Brokers() Brokers {
return &broker{
pulsar: c,
basePath: "/brokers",
pulsar: c,
basePath: "/brokers",
apiVersion: c.apiProfile.Brokers,
}
}

func (b *broker) GetActiveBrokers(cluster string) ([]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, cluster)
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, cluster)
var res []string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetDynamicConfigurationNames() ([]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/")
var res []string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error) {
endpoint := b.pulsar.endpoint(b.basePath, cluster, brokerURL, "ownedNamespaces")
var res map[string]utils.NamespaceOwnershipStatus
err := b.pulsar.Client.Get(endpoint, &res)
func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]NamespaceOwnershipStatus, error) {
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, cluster, brokerURL, "ownedNamespaces")
var res map[string]NamespaceOwnershipStatus
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
Expand All @@ -100,49 +100,49 @@ func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils

func (b *broker) UpdateDynamicConfiguration(configName, configValue string) error {
value := url.QueryEscape(configValue)
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", configName, value)
return b.pulsar.Client.Post(endpoint, nil)
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", configName, value)
return b.pulsar.restClient.Post(endpoint, nil)
}

func (b *broker) DeleteDynamicConfiguration(configName string) error {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", configName)
return b.pulsar.Client.Delete(endpoint)
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", configName)
return b.pulsar.restClient.Delete(endpoint)
}

func (b *broker) GetRuntimeConfigurations() (map[string]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "runtime")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", "runtime")
var res map[string]string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) GetInternalConfigurationData() (*utils.InternalConfigurationData, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/internal-configuration")
var res utils.InternalConfigurationData
err := b.pulsar.Client.Get(endpoint, &res)
func (b *broker) GetInternalConfigurationData() (*InternalConfigurationData, error) {
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/internal-configuration")
var res InternalConfigurationData
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return &res, nil
}

func (b *broker) GetAllDynamicConfigurations() (map[string]string, error) {
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "values")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/configuration/", "values")
var res map[string]string
err := b.pulsar.Client.Get(endpoint, &res)
err := b.pulsar.restClient.Get(endpoint, &res)
if err != nil {
return nil, err
}
return res, nil
}

func (b *broker) HealthCheck() error {
endpoint := b.pulsar.endpoint(b.basePath, "/health")
endpoint := b.pulsar.endpoint(b.apiVersion, b.basePath, "/health")

buf, err := b.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
buf, err := b.pulsar.restClient.GetWithQueryParams(endpoint, nil, nil, false)
if err != nil {
return err
}
Expand Down
Loading