Skip to content

Commit

Permalink
feat: more jetstream configurations (#1771)
Browse files Browse the repository at this point in the history
* feat: more jetstream configurations

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Mar 29, 2022
1 parent 99ff0d2 commit eef3723
Show file tree
Hide file tree
Showing 17 changed files with 372 additions and 136 deletions.
24 changes: 24 additions & 0 deletions api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@
},
"type": "array"
},
"streamConfig": {
"description": "Optional configuration for the streams to be created in this JetStream service, if specified, it will be merged with the default configuration in controller-config. It accepts a YAML format configuration, available fields include, \"maxBytes\", \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m).",
"type": "string"
},
"tolerations": {
"description": "If specified, the pod's tolerations.",
"items": {
Expand All @@ -483,6 +487,9 @@
"auth": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamAuth"
},
"streamConfig": {
"type": "string"
},
"url": {
"description": "JetStream (Nats) URL",
"type": "string"
Expand Down
7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ const (
// Default EventBus name
DefaultEventBusName = "default"

// key of server auth secret
JetStreamServerAuthSecretKey = "auth"
// key of auth server secret
JetStreamServerSecretAuthKey = "auth"
// key of encryption server secret
JetStreamServerSecretEncryptionKey = "encryption"
// key of client auth secret
JetStreamClientAuthSecretKey = "client-auth"
// key of nats-js.conf in the configmap
Expand Down
5 changes: 3 additions & 2 deletions controllers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type NatsStreamingVersion struct {
}

type JetStreamConfig struct {
Settings string `json:"settings"`
Versions []JetStreamVersion `json:"versions"`
Settings string `json:"settings"`
StreamConfig string `json:"streamConfig"`
Versions []JetStreamVersion `json:"versions"`
}

type JetStreamVersion struct {
Expand Down
1 change: 1 addition & 0 deletions controllers/eventbus/installer/assets/jetstream/nats.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ server_name: $POD_NAME
# #
###################################
jetstream {
key: $JS_KEY
store_dir: "/data/jetstream/store"
{{.Settings}}
}
Expand Down
44 changes: 33 additions & 11 deletions controllers/eventbus/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"text/template"

"github.com/spf13/viper"
"go.uber.org/zap"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -18,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/controllers"
Expand Down Expand Up @@ -58,7 +60,23 @@ func (r *jetStreamInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig,
if js := r.eventBus.Spec.JetStream; js == nil {
return nil, fmt.Errorf("invalid jetstream eventbus spec")
}
if err := r.createAuthSecrets(ctx); err != nil {
// merge
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewBufferString(r.config.EventBus.JetStream.StreamConfig)); err != nil {
return nil, fmt.Errorf("invalid jetstream config in global configuration, %w", err)
}
if x := r.eventBus.Spec.JetStream.StreamConfig; x != nil {
if err := v.MergeConfig(bytes.NewBufferString(*x)); err != nil {
return nil, fmt.Errorf("failed to merge customized stream config, %w", err)
}
}
b, err := yaml.Marshal(v.AllSettings())
if err != nil {
return nil, fmt.Errorf("failed to marshal merged buffer config, %w", err)
}

if err := r.createSecrets(ctx); err != nil {
r.logger.Errorw("failed to create jetstream auth secrets", zap.Error(err))
r.eventBus.Status.MarkDeployFailed("JetStreamAuthSecretsFailed", err.Error())
return nil, err
Expand Down Expand Up @@ -90,6 +108,7 @@ func (r *jetStreamInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig,
Key: common.JetStreamClientAuthSecretKey,
},
},
StreamConfig: string(b),
},
}, nil
}
Expand Down Expand Up @@ -268,11 +287,11 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre
{
Secret: &corev1.SecretProjection{
LocalObjectReference: corev1.LocalObjectReference{
Name: generateJetStreamServerAuthSecretName(r.eventBus),
Name: generateJetStreamServerSecretName(r.eventBus),
},
Items: []corev1.KeyToPath{
{
Key: common.JetStreamServerAuthSecretKey,
Key: common.JetStreamServerSecretAuthKey,
Path: "auth.conf",
},
},
Expand Down Expand Up @@ -300,6 +319,7 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre
{Name: "SERVER_NAME", Value: "$(POD_NAME)"},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}},
{Name: "CLUSTER_ADVERTISE", Value: "$(POD_NAME)." + generateJetStreamServiceName(r.eventBus) + ".$(POD_NAMESPACE).svc.cluster.local"},
{Name: "JS_KEY", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: generateJetStreamServerSecretName(r.eventBus)}, Key: common.JetStreamServerSecretEncryptionKey}}},
},
VolumeMounts: []corev1.VolumeMount{
{Name: "config-volume", MountPath: "/etc/nats-config"},
Expand Down Expand Up @@ -418,7 +438,8 @@ func (r *jetStreamInstaller) buildStatefulSetSpec(jsVersion *controllers.JetStre
return spec
}

func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
func (r *jetStreamInstaller) createSecrets(ctx context.Context) error {
encryptionKey := common.RandomString(12)
token := common.RandomString(24)
sysPassword := common.RandomString(24)
authTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/server-auth.conf"))
Expand All @@ -433,18 +454,19 @@ func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
return fmt.Errorf("failed to parse nats auth template, error: %w", err)
}

serverAuthObj := &corev1.Secret{
serverObj := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.eventBus.Namespace,
Name: generateJetStreamServerAuthSecretName(r.eventBus),
Name: generateJetStreamServerSecretName(r.eventBus),
Labels: r.labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(r.eventBus.GetObjectMeta(), v1alpha1.SchemaGroupVersionKind),
},
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
common.JetStreamServerAuthSecretKey: authTplOutput.Bytes(),
common.JetStreamServerSecretAuthKey: authTplOutput.Bytes(),
common.JetStreamServerSecretEncryptionKey: []byte(encryptionKey),
},
}

Expand All @@ -466,7 +488,7 @@ func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
oldServerObjExisting, oldClientObjExisting := true, true

oldSObj := &corev1.Secret{}
if err := r.client.Get(ctx, client.ObjectKeyFromObject(serverAuthObj), oldSObj); err != nil {
if err := r.client.Get(ctx, client.ObjectKeyFromObject(serverObj), oldSObj); err != nil {
if apierrors.IsNotFound(err) {
oldServerObjExisting = false
} else {
Expand Down Expand Up @@ -501,7 +523,7 @@ func (r *jetStreamInstaller) createAuthSecrets(ctx context.Context) error {
r.logger.Infow("deleted malformed nats client auth secret successfully")
}

if err := r.client.Create(ctx, serverAuthObj); err != nil {
if err := r.client.Create(ctx, serverObj); err != nil {
return fmt.Errorf("failed to create nats server auth secret, err: %w", err)
}
r.logger.Infow("created nats server auth secret successfully")
Expand Down Expand Up @@ -626,8 +648,8 @@ func (r *jetStreamInstaller) getPVCs(ctx context.Context) ([]corev1.PersistentVo
return pvcl.Items, nil
}

func generateJetStreamServerAuthSecretName(eventBus *v1alpha1.EventBus) string {
return fmt.Sprintf("eventbus-%s-js-server-auth", eventBus.Name)
func generateJetStreamServerSecretName(eventBus *v1alpha1.EventBus) string {
return fmt.Sprintf("eventbus-%s-js-server", eventBus.Name)
}

func generateJetStreamClientAuthSecretName(eventBus *v1alpha1.EventBus) string {
Expand Down
20 changes: 14 additions & 6 deletions controllers/eventbus/installer/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func TestJetStreamBadInstallation(t *testing.T) {
func TestJetStreamGenerateNames(t *testing.T) {
n := generateJetStreamStatefulSetName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js", n)
n = generateJetStreamServerAuthSecretName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js-server-auth", n)
n = generateJetStreamServerSecretName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js-server", n)
n = generateJetStreamClientAuthSecretName(testJetStreamEventBus)
assert.Equal(t, "eventbus-"+testJetStreamEventBus.Name+"-js-client-auth", n)
n = generateJetStreamConfigMapName(testJetStreamEventBus)
Expand Down Expand Up @@ -89,6 +89,13 @@ func TestJetStreamCreateObjects(t *testing.T) {
assert.Equal(t, testJSReloaderImage, sts.Spec.Template.Spec.Containers[1].Image)
assert.Equal(t, testJetStreamExporterImage, sts.Spec.Template.Spec.Containers[2].Image)
assert.True(t, len(sts.Spec.Template.Spec.Volumes) > 1)
envNames := []string{}
for _, e := range sts.Spec.Template.Spec.Containers[0].Env {
envNames = append(envNames, e.Name)
}
for _, e := range []string{"POD_NAME", "SERVER_NAME", "POD_NAMESPACE", "CLUSTER_ADVERTISE", "JS_KEY"} {
assert.Contains(t, envNames, e)
}
})

t.Run("test create svc", func(t *testing.T) {
Expand All @@ -106,13 +113,14 @@ func TestJetStreamCreateObjects(t *testing.T) {
t.Run("test create auth secrets", func(t *testing.T) {
testObj := testJetStreamEventBus.DeepCopy()
i.eventBus = testObj
err := i.createAuthSecrets(ctx)
err := i.createSecrets(ctx)
assert.NoError(t, err)
s := &corev1.Secret{}
err = cl.Get(ctx, types.NamespacedName{Namespace: testObj.Namespace, Name: generateJetStreamServerAuthSecretName(testObj)}, s)
err = cl.Get(ctx, types.NamespacedName{Namespace: testObj.Namespace, Name: generateJetStreamServerSecretName(testObj)}, s)
assert.NoError(t, err)
assert.Equal(t, 1, len(s.Data))
assert.Contains(t, s.Data, common.JetStreamServerAuthSecretKey)
assert.Equal(t, 2, len(s.Data))
assert.Contains(t, s.Data, common.JetStreamServerSecretAuthKey)
assert.Contains(t, s.Data, common.JetStreamServerSecretEncryptionKey)
s = &corev1.Secret{}
err = cl.Get(ctx, types.NamespacedName{Namespace: testObj.Namespace, Name: generateJetStreamClientAuthSecretName(testObj)}, s)
assert.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions manifests/base/eventbus-controller/controller-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ data:
max_memory_store: -1
# e.g. 20G. -1 means no limit, Up to 1TB if available
max_file_store: 1TB
streamConfig: |
# The default properties of the streams to be created in this JetStream service
maxMsgs: 50000
maxAge: 168h
maxBytes: -1
replicas: 3
duplicates: 300s
versions:
- version: 2.7.4
natsImage: nats:2.7.4
Expand Down
7 changes: 7 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ data:
max_memory_store: -1
# e.g. 20G. -1 means no limit, Up to 1TB if available
max_file_store: 1TB
streamConfig: |
# The default properties of the streams to be created in this JetStream service
maxMsgs: 50000
maxAge: 168h
maxBytes: -1
replicas: 3
duplicates: 300s
versions:
- version: 2.7.4
natsImage: nats:2.7.4
Expand Down
7 changes: 7 additions & 0 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ data:
max_memory_store: -1
# e.g. 20G. -1 means no limit, Up to 1TB if available
max_file_store: 1TB
streamConfig: |
# The default properties of the streams to be created in this JetStream service
maxMsgs: 50000
maxAge: 168h
maxBytes: -1
replicas: 3
duplicates: 300s
versions:
- version: 2.7.4
natsImage: nats:2.7.4
Expand Down
Loading

0 comments on commit eef3723

Please sign in to comment.