Skip to content

Commit

Permalink
Merge branch 'l7mp:main' into staging
Browse files Browse the repository at this point in the history
  • Loading branch information
notdurson authored Dec 12, 2024
2 parents 36ac716 + 0e133b6 commit 04c49d6
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 16 deletions.
6 changes: 3 additions & 3 deletions docs/GATEWAY.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ Below is a reference of the most important fields of the Gateway [`spec`](https:
| :--- | :---: | :--- | :---: |
| `gatewayClassName` | `string` | The name of the GatewayClass that provides the root of the hierarchy the Gateway is attached to. | Yes |
| `listeners` | `list` | The list of TURN listeners. | Yes |
| `addresses` | `list` | The list of manually hinted external IP addresses for the rendered service (only the first one is used). | No |
| `addresses` | `list` | The list of manually hinted public addresses (only the first one is used). | No |

> [!WARNING]
>
Expand All @@ -152,11 +152,11 @@ Below is a reference of the most important fields of the Gateway [`spec`](https:
> - changing the transport protocol, port or TLS keys/certs of an *existing* listener will restart the TURN listener but leave the rest of the listeners intact;
> - changing the TURN authentication realm will restart *all* TURN listeners.

Manually hinted external address describes an address that can be bound to a Gateway. It is defined by an address type and an address value. Note that only the first address is used. Setting the `spec.addresses` field in the Gateway will result in the rendered Service's [loadBalancerIP](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#service-v1-core:~:text=non%20%27LoadBalancer%27%20type.-,loadBalancerIP,-string) and [externalIPs](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#service-v1-core:~:text=and%2Dservice%2Dproxies-,externalIPs,-string%20array) fields to be set.
Manually hinted public address describes an address that can be bound to a Gateway. It is defined by an address type, which can be either `IPAddress` (default) or `Hostname`, and an address value. Note that only the first address is used. Setting the `spec.addresses` field in the Gateway will enforce the use of that address all over STUNner as a public address for the gateway. If the address type specifies an IP address then that address will also be used in the Service created by STUNner to expose the Gateway as a [loadBalancerIP](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#service-v1-core:~:text=non%20%27LoadBalancer%27%20type.-,loadBalancerIP,-string) and [externalIPs](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#service-v1-core:~:text=and%2Dservice%2Dproxies-,externalIPs,-string%20array).

| Field | Type | Description | Required |
|:--------|:--------:|:--------------------------------------------------------------|:--------:|
| `type` | `string` | Type of the address. Currently only `IPAddress` is supported. | Yes |
| `type` | `string` | Type of the address, either `IPAddress` (default) or `Hostname`. | Yes |
| `value` | `string` | Address that should be bound to the Gateway's service. | Yes |

> [!WARNING]
Expand Down
39 changes: 34 additions & 5 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,44 @@ func (s *Stunner) NewStatusHandler() object.StatusHandler {
return func() stnrv1.Status { return s.Status() }
}

var lifecycleEventHandlerConstructor = newLifecycleEventHandlerStub
// Quota handler
type QuotaHandler interface {
QuotaHandler() turn.QuotaHandler
}

var quotaHandlerConstructor = newQuotaHandlerStub

// NewUserQuotaHandler creates a quota handler that defaults to a stub.
func (s *Stunner) NewQuotaHandler() QuotaHandler {
return quotaHandlerConstructor(s)
}

type quotaHandlerStub struct {
quotaHandler turn.QuotaHandler
}

func (q *quotaHandlerStub) QuotaHandler() turn.QuotaHandler {
return q.quotaHandler
}

func newQuotaHandlerStub(_ *Stunner) QuotaHandler {
return &quotaHandlerStub{
quotaHandler: func(_, _ string, _ net.Addr) (ok bool) {
return true
},
}
}

// Event handlers
var eventHandlerConstructor = newEventHandlerStub

// NewLifecycleEventHandler creates a set of callbcks for tracking the lifecycle of TURN allocations.
func (s *Stunner) NewLifecycleEventHandler() turn.EventHandlers {
return lifecycleEventHandlerConstructor(s)
// NewEventHandler creates a set of callbcks for tracking the lifecycle of TURN allocations.
func (s *Stunner) NewEventHandler() turn.EventHandlers {
return eventHandlerConstructor(s)
}

// LifecycleEventHandlerStub is a simple stub that logs allocation events.
func newLifecycleEventHandlerStub(s *Stunner) turn.EventHandlers {
func newEventHandlerStub(s *Stunner) turn.EventHandlers {
return turn.EventHandlers{
OnAuth: func(src, dst net.Addr, proto, username, realm string, method string, verdict bool) {
status := "REJECTED"
Expand Down
13 changes: 9 additions & 4 deletions internal/object/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type Admin struct {
MetricsEndpoint, HealthCheckEndpoint string
metricsServer, healthCheckServer *http.Server
health *http.ServeMux
licenseManager licensecfg.ConfigManager
quota int
LicenseManager licensecfg.ConfigManager
licenseConfig *stnrv1.LicenseConfig
log logging.LeveledLogger
}
Expand All @@ -41,7 +42,7 @@ func NewAdmin(conf stnrv1.Config, dryRun bool, rc ReadinessHandler, status Statu
admin := Admin{
DryRun: dryRun,
health: http.NewServeMux(),
licenseManager: licensecfg.New(logger.NewLogger("license")),
LicenseManager: licensecfg.New(logger.NewLogger("license")),
log: logger.NewLogger("admin"),
}
admin.log.Tracef("NewAdmin: %s", req.String())
Expand Down Expand Up @@ -124,7 +125,9 @@ func (a *Admin) Reconcile(conf stnrv1.Config) error {
return err
}

a.licenseManager.Reconcile(req.LicenseConfig)
a.quota = req.UserQuota

a.LicenseManager.Reconcile(req.LicenseConfig)
a.licenseConfig = req.LicenseConfig

return nil
Expand Down Expand Up @@ -153,6 +156,7 @@ func (a *Admin) GetConfig() stnrv1.Config {
LogLevel: a.LogLevel,
MetricsEndpoint: a.MetricsEndpoint,
HealthCheckEndpoint: &h,
UserQuota: a.quota,
LicenseConfig: a.licenseConfig,
}
}
Expand Down Expand Up @@ -187,7 +191,8 @@ func (a *Admin) Status() stnrv1.Status {
LogLevel: a.LogLevel,
MetricsEndpoint: a.MetricsEndpoint,
HealthCheckEndpoint: a.HealthCheckEndpoint,
LicensingInfo: a.licenseManager.Status(),
UserQuota: fmt.Sprintf("%d", a.quota),
LicensingInfo: a.LicenseManager.Status(),
}
return &s
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/v1/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type AdminConfig struct {
// health-checking at `http://0.0.0.0:8086`. Set to a pointer to an empty string to disable
// health-checking.
HealthCheckEndpoint *string `json:"healthcheck_endpoint,omitempty"`
// UserQuota defines the number of permitted TURN allocatoins per username. Affects
// allocation created on any listener. Default is 0, meaning no quota is enforced.
UserQuota int `json:"user_quota,omitempty"`
// LicenseConfig describes the licensing info to be used to check subscription status with
// the license server.
LicenseConfig *LicenseConfig `json:"license_config,omitempty"`
Expand Down Expand Up @@ -69,6 +72,10 @@ func (req *AdminConfig) Validate() error {
}
}

if req.UserQuota < 0 {
req.UserQuota = 0
}

return nil
}

Expand Down Expand Up @@ -104,6 +111,9 @@ func (req *AdminConfig) String() string {
if req.HealthCheckEndpoint != nil {
status = append(status, fmt.Sprintf("health-check=%q", *req.HealthCheckEndpoint))
}
if req.UserQuota > 0 {
status = append(status, fmt.Sprintf("quota=%d", req.UserQuota))
}
status = append(status, fmt.Sprintf("license_info=%s", LicensingStatus(req.LicenseConfig)))

return fmt.Sprintf("admin:{%s}", strings.Join(status, ","))
Expand Down Expand Up @@ -131,6 +141,7 @@ type AdminStatus struct {
LogLevel string `json:"loglevel,omitempty"`
MetricsEndpoint string `json:"metrics_endpoint,omitempty"`
HealthCheckEndpoint string `json:"healthcheck_endpoint,omitempty"`
UserQuota string `json:"quota,omitempty"`
LicensingInfo string `json:"licensing_info,omitempty"`
}

Expand All @@ -146,6 +157,7 @@ func (a *AdminStatus) String() string {
if a.HealthCheckEndpoint != "" {
status = append(status, fmt.Sprintf("health-check=%q", a.HealthCheckEndpoint))
}
status = append(status, fmt.Sprintf("quota=%s", a.UserQuota))
if a.LicensingInfo != "" {
status = append(status, fmt.Sprintf("license-info=%s", a.LicensingInfo))
}
Expand Down
30 changes: 30 additions & 0 deletions reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,6 +1763,36 @@ var testReconcileDefault = []StunnerReconcileTestConfig{
net.ParseIP("3.0.0.0")), "route to 3.0.0.0 fails")
},
},
{
name: "reconcile-test: reconcile user quota",
config: stnrv1.StunnerConfig{
ApiVersion: stnrv1.ApiVersion,
Admin: stnrv1.AdminConfig{
UserQuota: 12,
LogLevel: stunnerTestLoglevel,
},
Auth: stnrv1.AuthConfig{
Credentials: map[string]string{
"username": "user",
"password": "pass",
},
},
Listeners: []stnrv1.ListenerConfig{},
Clusters: []stnrv1.ClusterConfig{},
},
tester: func(t *testing.T, s *Stunner, err error) {
assert.NoError(t, err, err)

a := s.GetAdmin()
assert.NotNil(t, a, "admin")

c := a.GetConfig()
assert.NotNil(t, c, "admin-getconfig")
ca, ok := c.(*stnrv1.AdminConfig)
assert.True(t, ok, "adminconfig cast")
assert.Equal(t, 12, ca.UserQuota, "quota")
},
},
}

// start with default config and then reconcile with the given config
Expand Down
3 changes: 2 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func (s *Stunner) StartServer(l *object.Listener) error {
t, err := turn.NewServer(turn.ServerConfig{
Realm: s.GetRealm(),
AuthHandler: s.NewAuthHandler(),
EventHandlers: s.NewLifecycleEventHandler(),
EventHandlers: s.eventHandlers,
QuotaHandler: s.quotaHandler.QuotaHandler(),
PacketConnConfigs: pConns,
ListenerConfigs: lConns,
LoggerFactory: logger.NewRateLimitedLoggerFactory(s.logger, LogRateLimit, LogBurst),
Expand Down
16 changes: 14 additions & 2 deletions stunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/pion/logging"
"github.com/pion/transport/v3"
"github.com/pion/transport/v3/stdnet"
"github.com/pion/turn/v4"

"github.com/l7mp/stunner/internal/manager"
"github.com/l7mp/stunner/internal/object"
"github.com/l7mp/stunner/internal/resolver"
"github.com/l7mp/stunner/internal/telemetry"
stnrv1 "github.com/l7mp/stunner/pkg/apis/v1"
licensecfg "github.com/l7mp/stunner/pkg/config/license"
"github.com/l7mp/stunner/pkg/logger"
)

Expand All @@ -32,6 +34,8 @@ type Stunner struct {
telemetry *telemetry.Telemetry
logger logger.LoggerFactory
log logging.LeveledLogger
eventHandlers turn.EventHandlers
quotaHandler QuotaHandler
net transport.Net
ready, shutdown bool
}
Expand Down Expand Up @@ -101,6 +105,8 @@ func NewStunner(options Options) *Stunner {
object.NewListenerFactory(vnet, s.NewRealmHandler(), logger), logger)
s.clusterManager = manager.NewManager("cluster-manager",
object.NewClusterFactory(r, logger), logger)
s.eventHandlers = s.NewEventHandler()
s.quotaHandler = s.NewQuotaHandler()

telemetryCallbacks := telemetry.Callbacks{
GetAllocationCount: func() int64 { return s.GetActiveConnections() },
Expand Down Expand Up @@ -143,7 +149,7 @@ func (s *Stunner) Shutdown() {
s.ready = false
}

// GetAdmin returns the admin object underlying STUNner.
// GetAdmin returns the admin object. Panics if no admin object is available.
func (s *Stunner) GetAdmin() *object.Admin {
a, found := s.adminManager.Get(stnrv1.DefaultAdminName)
if !found {
Expand All @@ -152,7 +158,13 @@ func (s *Stunner) GetAdmin() *object.Admin {
return a.(*object.Admin)
}

// GetAuth returns the authenitation object underlying STUNner.
// GetLicenseConfigManager returns the manager handling license status. Panics if no manager is
// available.
func (s *Stunner) GetLicenseConfigManager() licensecfg.ConfigManager {
return s.GetAdmin().LicenseManager
}

// GetAuth returns the authenitation object. Panics if no auth object is available.
func (s *Stunner) GetAuth() *object.Auth {
a, found := s.authManager.Get(stnrv1.DefaultAuthName)
if !found {
Expand Down
2 changes: 1 addition & 1 deletion turncat.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (t *Turncat) newConnection(clientConn net.Conn) (*connection, error) {
// cert, err := tls.LoadX509KeyPair(certFile.Name(), keyFile.Name())
// assert.NoError(t, err, "cannot create certificate for TLS client socket")
c, err := tls.Dial("tcp", t.serverAddr.String(), &tls.Config{
MinVersion: tls.VersionTLS10,
MinVersion: tls.VersionTLS12,
ServerName: t.serverName,
InsecureSkipVerify: t.insecure,
})
Expand Down

0 comments on commit 04c49d6

Please sign in to comment.