Skip to content

Commit

Permalink
feat: Optional kubernetes-based leader election (#2438)
Browse files Browse the repository at this point in the history
Signed-off-by: David Farr <[email protected]>
  • Loading branch information
dfarr authored and whynowy committed Feb 10, 2023
1 parent 413ca1f commit d0481ae
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 65 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.vscode/
.idea/
.DS_Store
.env
vendor/
dist/
# delve debug binaries
Expand All @@ -12,3 +13,4 @@ debug.test
*.out
site/
/go-diagrams/
argo-events
6 changes: 6 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const (
EnvVarKubeConfig = "KUBECONFIG"
// EnvVarDebugLog is the env var to turn on the debug mode for logging
EnvVarDebugLog = "DEBUG_LOG"
// ENVVarPodName should be set to the name of the pod
EnvVarPodName = "POD_NAME"
// ENVVarLeaderElection sets the leader election mode
EnvVarLeaderElection = "LEADER_ELECTION"
// EnvImagePullPolicy is the env var to set container's ImagePullPolicy
EnvImagePullPolicy = "IMAGE_PULL_POLICY"
)
Expand Down Expand Up @@ -119,6 +123,8 @@ const (
LabelOwnerName = "owner-name"
// AnnotationResourceSpecHash is the annotation of a K8s resource spec hash
AnnotationResourceSpecHash = "resource-spec-hash"
// AnnotationLeaderElection is the annotation for leader election
AnnotationLeaderElection = "events.argoproj.io/leader-election"
)

// various supported media types
Expand Down
149 changes: 111 additions & 38 deletions common/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,31 @@ import (
"context"
"crypto/tls"
"fmt"
"os"
"strings"
"time"

"github.com/fsnotify/fsnotify"
"github.com/nats-io/graft"
nats "github.com/nats-io/nats.go"
"github.com/spf13/viper"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

var (
eventBusAuthFileMountPath = common.EventBusAuthFileMountPath
)

type Elector interface {
RunOrDie(context.Context, LeaderCallbacks)
}
Expand All @@ -27,73 +38,71 @@ type LeaderCallbacks struct {
OnStoppedLeading func()
}

func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int) (Elector, error) {
logger := logging.FromContext(ctx)

var eventBusType apicommon.EventBusType
var eventBusAuth *eventbusv1alpha1.AuthStrategy
func NewElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int, namespace string, leasename string, hostname string) (Elector, error) {
switch {
case strings.ToLower(os.Getenv(common.EnvVarLeaderElection)) == "k8s":
return newKubernetesElector(namespace, leasename, hostname)
case eventBusConfig.NATS != nil:
eventBusType = apicommon.EventBusNATS
eventBusAuth = eventBusConfig.NATS.Auth
return newEventBusElector(ctx, eventBusConfig.NATS.Auth, clusterName, clusterSize, eventBusConfig.NATS.URL)
case eventBusConfig.JetStream != nil:
eventBusType = apicommon.EventBusJetStream
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyBasic, clusterName, clusterSize, eventBusConfig.JetStream.URL)
default:
return nil, fmt.Errorf("invalid event bus")
}
}

func newEventBusElector(ctx context.Context, authStrategy *eventbusv1alpha1.AuthStrategy, clusterName string, clusterSize int, url string) (Elector, error) {
auth, err := getEventBusAuth(ctx, authStrategy)
if err != nil {
return nil, err
}

return &natsEventBusElector{
clusterName: clusterName,
size: clusterSize,
url: url,
auth: auth,
}, nil
}

func getEventBusAuth(ctx context.Context, authStrategy *eventbusv1alpha1.AuthStrategy) (*eventbuscommon.Auth, error) {
logger := logging.FromContext(ctx)

var auth *eventbuscommon.Auth
cred := &eventbuscommon.AuthCredential{}
if eventBusAuth == nil || *eventBusAuth == eventbusv1alpha1.AuthStrategyNone {

if authStrategy == nil || *authStrategy == eventbusv1alpha1.AuthStrategyNone {
auth = &eventbuscommon.Auth{
Strategy: eventbusv1alpha1.AuthStrategyNone,
}
} else {
v := viper.New()
v.SetConfigName("auth")
v.SetConfigType("yaml")
v.AddConfigPath(common.EventBusAuthFileMountPath)
err := v.ReadInConfig()
if err != nil {
v.AddConfigPath(eventBusAuthFileMountPath)

if err := v.ReadInConfig(); err != nil {
return nil, fmt.Errorf("failed to load auth.yaml. err: %w", err)
}
err = v.Unmarshal(cred)
if err != nil {

cred := &eventbuscommon.AuthCredential{}
if err := v.Unmarshal(cred); err != nil {
logger.Errorw("failed to unmarshal auth.yaml", zap.Error(err))
return nil, err
}

v.WatchConfig()
v.OnConfigChange(func(e fsnotify.Event) {
// Auth file changed, let it restart.
logger.Fatal("Eventbus auth config file changed, exiting..")
})

auth = &eventbuscommon.Auth{
Strategy: *eventBusAuth,
Strategy: *authStrategy,
Credential: cred,
}
}

var elector Elector
switch eventBusType {
case apicommon.EventBusNATS:
elector = &natsEventBusElector{
clusterName: clusterName,
size: clusterSize,
url: eventBusConfig.NATS.URL,
auth: auth,
}
case apicommon.EventBusJetStream:
elector = &natsEventBusElector{
clusterName: clusterName,
size: clusterSize,
url: eventBusConfig.JetStream.URL,
auth: auth,
}
default:
return nil, fmt.Errorf("invalid eventbus type")
}
return elector, nil
return auth, nil
}

type natsEventBusElector struct {
Expand Down Expand Up @@ -179,3 +188,67 @@ func (e *natsEventBusElector) RunOrDie(ctx context.Context, callbacks LeaderCall
}
}
}

type kubernetesElector struct {
namespace string
leasename string
hostname string
}

func newKubernetesElector(namespace string, leasename string, hostname string) (Elector, error) {
return &kubernetesElector{
namespace: namespace,
leasename: leasename,
hostname: hostname,
}, nil
}

func (e *kubernetesElector) RunOrDie(ctx context.Context, callbacks LeaderCallbacks) {
logger := logging.FromContext(ctx)

config, err := rest.InClusterConfig()
if err != nil {
logger.Fatalw("Failed to retrieve kubernetes config", zap.Error(err))
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
logger.Fatalw("Failed to create kubernetes client", zap.Error(err))
}

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: e.leasename,
Namespace: e.namespace,
},
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: e.hostname,
},
}

for {
select {
case <-ctx.Done():
return
default:
ctx, cancel := context.WithCancel(ctx)
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 5 * time.Second,
RenewDeadline: 2 * time.Second,
RetryPeriod: 1 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: callbacks.OnStartedLeading,
OnStoppedLeading: callbacks.OnStoppedLeading,
},
})

// When the leader is lost, leaderelection.RunOrDie will
// cease blocking and we will cancel the context. This
// will halt all eventsource/sensor go routines.
cancel()
}
}
}
51 changes: 51 additions & 0 deletions common/leaderelection/leaderelection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package leaderelection

import (
"context"
"os"
"testing"

"github.com/argoproj/argo-events/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/stretchr/testify/assert"
)

var (
configs = []eventbusv1alpha1.BusConfig{
{NATS: &eventbusv1alpha1.NATSConfig{}},
{JetStream: &eventbusv1alpha1.JetStreamConfig{}},
}
)

func TestLeaderElectionWithInvalidEventBus(t *testing.T) {
elector, err := NewElector(context.TODO(), eventbusv1alpha1.BusConfig{}, "", 0, "", "", "")

assert.Nil(t, elector)
assert.EqualError(t, err, "invalid event bus")
}

func TestLeaderElectionWithEventBusElector(t *testing.T) {
eventBusAuthFileMountPath = "test"

for _, config := range configs {
elector, err := NewElector(context.TODO(), config, "", 0, "", "", "")
assert.Nil(t, err)

_, ok := elector.(*natsEventBusElector)
assert.True(t, ok)
}
}

func TestLeaderElectionWithKubernetesElector(t *testing.T) {
eventBusAuthFileMountPath = "test"

os.Setenv(common.EnvVarLeaderElection, "k8s")

for _, config := range configs {
elector, err := NewElector(context.TODO(), config, "", 0, "", "", "")
assert.Nil(t, err)

_, ok := elector.(*kubernetesElector)
assert.True(t, ok)
}
}
3 changes: 3 additions & 0 deletions common/leaderelection/test/auth.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
token: "token"
username: "username"
password: "password"
6 changes: 5 additions & 1 deletion controllers/eventsource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,13 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
Value: fmt.Sprintf("eventbus-%s", args.EventSource.Namespace),
},
{
Name: "POD_NAME",
Name: common.EnvVarPodName,
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}},
},
{
Name: common.EnvVarLeaderElection,
Value: args.EventSource.Annotations[common.AnnotationLeaderElection],
},
}

busConfigBytes, err := json.Marshal(eventBus.Status.Config)
Expand Down
6 changes: 5 additions & 1 deletion controllers/sensor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,13 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
Value: fmt.Sprintf("eventbus-%s", args.Sensor.Namespace),
},
{
Name: "POD_NAME",
Name: common.EnvVarPodName,
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}},
},
{
Name: common.EnvVarLeaderElection,
Value: args.Sensor.Annotations[common.AnnotationLeaderElection],
},
}

busConfigBytes, err := json.Marshal(eventBus.Status.Config)
Expand Down
23 changes: 23 additions & 0 deletions docs/eventsources/ha.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ old one is gone.
- Redis
- Resource

## Kubernetes Leader Election

By default, Argo Events will use NATS for the HA leader election. Alternatively,
you can opt-in to a Kubernetes native leader election by specifying the following
annotation.
```yaml
annotations:
events.argoproj.io/leader-election: k8s
```
To use Kubernetes leader election the following RBAC rules need to be associated
with the EventSource ServiceAccount.
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: argo-events-leaderelection-role
rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "create", "update"]
```
## More
Click [here](../dr_ha_recommendations.md) to learn more information about Argo
Expand Down
25 changes: 25 additions & 0 deletions docs/sensors/ha.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,30 @@ elected to be active if the old one is gone.
**Please DO NOT manually scale up the replicas, that might cause unexpected
behaviors!**

## Kubernetes Leader Election

By default, Argo Events will use NATS for the HA leader election. Alternatively,
you can opt-in to a Kubernetes native leader election by specifying the following
annotation.
```yaml
annotations:
events.argoproj.io/leader-election: k8s
```
To use Kubernetes leader election the following RBAC rules need to be associated
with the Sensor ServiceAccount.
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: argo-events-leaderelection-role
rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "create", "update"]
```
## More
Click [here](../dr_ha_recommendations.md) to learn more information about Argo
Events DR/HA recommendations.
1 change: 1 addition & 0 deletions eventsources/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func Start() {

logger.Infow("starting eventsource server", "version", argoevents.GetVersion())
adaptor := eventsources.NewEventSourceAdaptor(eventSource, busConfig, ebSubject, hostname, m)

if err := adaptor.Start(ctx); err != nil {
logger.Fatalw("failed to start eventsource server", zap.Error(err))
}
Expand Down
Loading

0 comments on commit d0481ae

Please sign in to comment.