Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

failover: Allow custom state setter #395

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions maintenance/failover/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/bsm/redislock"
"github.com/pace/bricks/backend/k8sapi"
"github.com/pace/bricks/maintenance/errors"
"github.com/pace/bricks/maintenance/health"
"github.com/pace/bricks/maintenance/log"
Expand All @@ -26,8 +25,6 @@ const (
ACTIVE status = 1
)

const Label = "jackfan.us.kg.pace.bricks.activepassive"

// ActivePassive implements a failover mechanism that allows
// to deploy a service multiple times but ony one will accept

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@monstermunchkin maybe there's a "l" missing in "only"?

// traffic by using the label selector of kubernetes.
Expand All @@ -51,31 +48,76 @@ type ActivePassive struct {
timeToFailover time.Duration
locker *redislock.Client

// access to the kubernetes api
k8sClient *k8sapi.Client
stateSetter StateSetter

// current status of the failover (to show it in the readiness status)
state status
stateMu sync.RWMutex
}

type ActivePassiveOption func(*ActivePassive) error

func WithCustomStateSetter(fn func(ctx context.Context, state string) error) ActivePassiveOption {
return func(ap *ActivePassive) error {
stateSetter, err := NewCustomStateSetter(fn)
if err != nil {
return fmt.Errorf("failed to create state setter: %w", err)
}

ap.stateSetter = stateSetter

return nil
}
}

func WithNoopStateSetter() ActivePassiveOption {
return func(ap *ActivePassive) error {
ap.stateSetter = &NoopStateSetter{}

return nil
}
}

func WithPodStateSetter() ActivePassiveOption {
return func(ap *ActivePassive) error {
stateSetter, err := NewPodStateSetter()
if err != nil {
return fmt.Errorf("failed to create pod state setter: %w", err)
}

ap.stateSetter = stateSetter

return nil
}
}

// NewActivePassive creates a new active passive cluster
// identified by the name. The time to fail over determines
// the frequency of checks performed against redis to
// keep the active state.
// NOTE: creating multiple ActivePassive in one process
// is not working correctly as there is only one readiness probe.
func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
k8sClient, err := k8sapi.NewClient()
if err != nil {
return nil, err
}

func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client, opts ...ActivePassiveOption) (*ActivePassive, error) {
activePassive := &ActivePassive{
clusterName: clusterName,
timeToFailover: timeToFailover,
locker: redislock.New(client),
k8sClient: k8sClient,
}

for _, opt := range opts {
if err := opt(activePassive); err != nil {
return nil, fmt.Errorf("failed to apply option: %w", err)
}
}

if activePassive.stateSetter == nil {
var err error

// Default state setter uses the k8s api to set the state.
activePassive.stateSetter, err = NewPodStateSetter()
if err != nil {
return nil, fmt.Errorf("failed to create default state setter: %w", err)
}
}

health.SetCustomReadinessCheck(activePassive.Handler)
Expand Down Expand Up @@ -198,7 +240,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {

// setState returns true if the state was set successfully
func (a *ActivePassive) setState(ctx context.Context, state status) bool {
err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
err := a.stateSetter.SetState(ctx, a.label(state))
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
a.stateMu.Lock()
Expand Down
53 changes: 53 additions & 0 deletions maintenance/failover/state_setter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package failover

import (
"context"
"fmt"

"github.com/pace/bricks/backend/k8sapi"
)

const Label = "jackfan.us.kg.pace.bricks.activepassive"

type StateSetter interface {
SetState(ctx context.Context, state string) error
}

type podStateSetter struct {
k8sClient *k8sapi.Client
}

func NewPodStateSetter() (*podStateSetter, error) {
k8sClient, err := k8sapi.NewClient()
if err != nil {
return nil, fmt.Errorf("failed to create k8s client: %w", err)
}

return &podStateSetter{k8sClient: k8sClient}, nil
}

func (p *podStateSetter) SetState(ctx context.Context, state string) error {
return p.k8sClient.SetCurrentPodLabel(ctx, Label, state)
}

type CustomStateSetter struct {
fn func(ctx context.Context, state string) error
}

func NewCustomStateSetter(fn func(ctx context.Context, state string) error) (*CustomStateSetter, error) {
if fn == nil {
return nil, fmt.Errorf("fn must not be nil")
}

return &CustomStateSetter{fn: fn}, nil
}

func (c *CustomStateSetter) SetState(ctx context.Context, state string) error {
return c.fn(ctx, state)
}

type NoopStateSetter struct{}

func (n *NoopStateSetter) SetState(ctx context.Context, state string) error {
return nil
}
Loading