Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: more jetstream configurations #1771

Merged
merged 8 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@
},
"type": "array"
},
"streamConfig": {
"description": "Optional configuration for the streams to be created in this JetStream service, if specified, it will be merged with the default configuration in controller-config. It accepts a YAML format configuration, available fields include, \"maxBytes\", \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m).",
"type": "string"
},
"tolerations": {
"description": "If specified, the pod's tolerations.",
"items": {
Expand All @@ -483,6 +487,9 @@
"auth": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamAuth"
},
"streamConfig": {
"type": "string"
},
"url": {
"description": "JetStream (Nats) URL",
"type": "string"
Expand Down
7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ const (
// Default EventBus name
DefaultEventBusName = "default"

// key of server auth secret
JetStreamServerAuthSecretKey = "auth"
// key of auth server secret
JetStreamServerSecretAuthKey = "auth"
// key of encryption server secret
JetStreamServerSecretEncryptionKey = "encryption"
// key of client auth secret
JetStreamClientAuthSecretKey = "client-auth"
// key of nats-js.conf in the configmap
Expand Down
5 changes: 3 additions & 2 deletions controllers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type NatsStreamingVersion struct {
}

type JetStreamConfig struct {
Settings string `json:"settings"`
Versions []JetStreamVersion `json:"versions"`
Settings string `json:"settings"`
StreamConfig string `json:"streamConfig"`
Versions []JetStreamVersion `json:"versions"`
}

type JetStreamVersion struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ server_name: $POD_NAME
# #
###################################
jetstream {
key: $JS_KEY
store_dir: "/data/jetstream/store"
{{.Settings}}
}
Expand Down
44 changes: 33 additions & 11 deletions controllers/eventbus/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"text/template"

"github.com/spf13/viper"
"go.uber.org/zap"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -18,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/controllers"
Expand Down Expand Up @@ -58,7 +60,23 @@ func (r *jetStreamInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig,
if js := r.eventBus.Spec.JetStream; js == nil {
return nil, fmt.Errorf("invalid jetstream eventbus spec")
}
if err := r.createAuthSecrets(ctx); err != nil {
// merge
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewBufferString(r.config.EventBus.JetStream.StreamConfig)); err != nil {
return nil, fmt.Errorf("invalid jetstream config in global configuration, %w", err)
}
if x := r.eventBus.Spec.JetStream.StreamConfig; x != nil {
if err := v.MergeConfig(bytes.NewBufferString(*x)); err != nil {
return nil, fmt.Errorf("failed to merge customized stream config, %w", err)
}
}
b, err := yaml.Marshal(v.AllSettings())
if err != nil {
return nil, fmt.Errorf("failed to marshal merged buffer config, %w", err)
}

if err := r.createSecrets(ctx); err != nil {
r.logger.Errorw("failed to create jetstream auth secrets", zap.Error(err))
r.eventBus.Status.MarkDeployFailed("JetStreamAuthSecretsFailed", err.Error())
return nil, err
Expand Down Expand Up @@ -90,6 +108,7 @@ func (r *jetStreamInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig,
Key: common.JetStreamClientAuthSecretKey,
},
},
StreamConfig: string(b),
},
}, nil
}
Expand Down Expand Up @@ -268,11 +287,11 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre
{
Secret: &corev1.SecretProjection{
LocalObjectReference: corev1.LocalObjectReference{
Name: generateJetStreamServerAuthSecretName(r.eventBus),
Name: generateJetStreamServerSecretName(r.eventBus),
},
Items: []corev1.KeyToPath{
{
Key: common.JetStreamServerAuthSecretKey,
Key: common.JetStreamServerSecretAuthKey,
Path: "auth.conf",
},
},
Expand Down Expand Up @@ -300,6 +319,7 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre
{Name: "SERVER_NAME", Value: "$(POD_NAME)"},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}},
{Name: "CLUSTER_ADVERTISE", Value: "$(POD_NAME)." + generateJetStreamServiceName(r.eventBus) + ".$(POD_NAMESPACE).svc.cluster.local"},
{Name: "JS_KEY", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: generateJetStreamServerSecretName(r.eventBus)}, Key: common.JetStreamServerSecretEncryptionKey}}},
},
VolumeMounts: []corev1.VolumeMount{
{Name: "config-volume", MountPath: "/etc/nats-config"},
Expand Down Expand Up @@ -418,7 +438,8 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre
return spec
}

func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
func (r *jetStreamInstaller) createSecrets(ctx context.Context) error {
encryptionKey := common.RandomString(12)
token := common.RandomString(24)
sysPassword := common.RandomString(24)
authTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/server-auth.conf"))
Expand All @@ -433,18 +454,19 @@ func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
return fmt.Errorf("failed to parse nats auth template, error: %w", err)
}

serverAuthObj := &corev1.Secret{
serverObj := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.eventBus.Namespace,
Name: generateJetStreamServerAuthSecretName(r.eventBus),
Name: generateJetStreamServerSecretName(r.eventBus),
Labels: r.labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(r.eventBus.GetObjectMeta(), v1alpha1.SchemaGroupVersionKind),
},
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
common.JetStreamServerAuthSecretKey: authTplOutput.Bytes(),
common.JetStreamServerSecretAuthKey: authTplOutput.Bytes(),
common.JetStreamServerSecretEncryptionKey: []byte(encryptionKey),
},
}

Expand All @@ -466,7 +488,7 @@ func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
oldServerObjExisting, oldClientObjExisting := true, true

oldSObj := &corev1.Secret{}
if err := r.client.Get(ctx, client.ObjectKeyFromObject(serverAuthObj), oldSObj); err != nil {
if err := r.client.Get(ctx, client.ObjectKeyFromObject(serverObj), oldSObj); err != nil {
if apierrors.IsNotFound(err) {
oldServerObjExisting = false
} else {
Expand Down Expand Up @@ -501,7 +523,7 @@ func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
r.logger.Infow("deleted malformed nats client auth secret successfully")
}

if err := r.client.Create(ctx, serverAuthObj); err != nil {
if err := r.client.Create(ctx, serverObj); err != nil {
return fmt.Errorf("failed to create nats server auth secret, err: %w", err)
}
r.logger.Infow("created nats server auth secret successfully")
Expand Down Expand Up @@ -626,8 +648,8 @@ func (r *jetStreamInstaller) getPVCs(ctx context.Context) ([]corev1.PersistentVo
return pvcl.Items, nil
}

func generateJetStreamServerAuthSecretName(eventBus *v1alpha1.EventBus) string {
return fmt.Sprintf("eventbus-%s-js-server-auth", eventBus.Name)
func generateJetStreamServerSecretName(eventBus *v1alpha1.EventBus) string {
return fmt.Sprintf("eventbus-%s-js-server", eventBus.Name)
}

func generateJetStreamClientAuthSecretName(eventBus *v1alpha1.EventBus) string {
Expand Down
20 changes: 14 additions & 6 deletions controllers/eventbus/installer/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func TestJetStreamBadInstallation(t *testing.T) {
func TestJetStreamGenerateNames(t *testing.T) {
n := generateJetStreamStatefulSetName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js", n)
n = generateJetStreamServerAuthSecretName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js-server-auth", n)
n = generateJetStreamServerSecretName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js-server", n)
n = generateJetStreamClientAuthSecretName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js-client-auth", n)
n = generateJetStreamConfigMapName(testJetStreamEventBus)
Expand Down Expand Up @@ -89,6 +89,13 @@ func TestJetStreamCreateObjects(t *testing.T) {
assert.Equal(t, testJSReloaderImage, sts.Spec.Template.Spec.Containers[1].Image)
assert.Equal(t, testJetStreamExporterImage, sts.Spec.Template.Spec.Containers[2].Image)
assert.True(t, len(sts.Spec.Template.Spec.Volumes) > 1)
envNames := []string{}
for _, e := range sts.Spec.Template.Spec.Containers[0].Env {
envNames = append(envNames, e.Name)
}
for _, e := range []string{"POD_NAME", "SERVER_NAME", "POD_NAMESPACE", "CLUSTER_ADVERTISE", "JS_KEY"} {
assert.Contains(t, envNames, e)
}
})

t.Run("test create svc", func(t *testing.T) {
Expand All @@ -106,13 +113,14 @@ func TestJetStreamCreateObjects(t *testing.T) {
t.Run("test create auth secrets", func(t *testing.T) {
testObj := testJetStreamEventBus.DeepCopy()
i.eventBus = testObj
err := i.createAuthSecrets(ctx)
err := i.createSecrets(ctx)
assert.NoError(t, err)
s := &corev1.Secret{}
err = cl.Get(ctx, types.NamespacedName{Namespace: testObj.Namespace, Name: generateJetStreamServerAuthSecretName(testObj)}, s)
err = cl.Get(ctx, types.NamespacedName{Namespace: testObj.Namespace, Name: generateJetStreamServerSecretName(testObj)}, s)
assert.NoError(t, err)
assert.Equal(t, 1, len(s.Data))
assert.Contains(t, s.Data, common.JetStreamServerAuthSecretKey)
assert.Equal(t, 2, len(s.Data))
assert.Contains(t, s.Data, common.JetStreamServerSecretAuthKey)
assert.Contains(t, s.Data, common.JetStreamServerSecretEncryptionKey)
s = &corev1.Secret{}
err = cl.Get(ctx, types.NamespacedName{Namespace: testObj.Namespace, Name: generateJetStreamClientAuthSecretName(testObj)}, s)
assert.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions manifests/base/eventbus-controller/controller-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ data:
max_memory_store: -1
# e.g. 20G. -1 means no limit, Up to 1TB if available
max_file_store: 1TB
streamConfig: |
# The default properties of the streams to be created in this JetStream service
maxMsgs: 50000
maxAge: 168h
maxBytes: -1
replicas: 3
duplicates: 300s
versions:
- version: 2.7.4
natsImage: nats:2.7.4
Expand Down
7 changes: 7 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ data:
max_memory_store: -1
# e.g. 20G. -1 means no limit, Up to 1TB if available
max_file_store: 1TB
streamConfig: |
# The default properties of the streams to be created in this JetStream service
maxMsgs: 50000
maxAge: 168h
maxBytes: -1
replicas: 3
duplicates: 300s
versions:
- version: 2.7.4
natsImage: nats:2.7.4
Expand Down
7 changes: 7 additions & 0 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ data:
max_memory_store: -1
# e.g. 20G. -1 means no limit, Up to 1TB if available
max_file_store: 1TB
streamConfig: |
# The default properties of the streams to be created in this JetStream service
maxMsgs: 50000
maxAge: 168h
maxBytes: -1
replicas: 3
duplicates: 300s
versions:
- version: 2.7.4
natsImage: nats:2.7.4
Expand Down
Loading