Kafka Deployment Events (#93)
This PR solves issue #36: Naiserator is now able to publish Protobuf messages to Kafka once when a deployment is initialised and again when the deployment has succeeded.

I've also added tests, a Makefile target, and helpful docs in the README, in case you want to test it out for yourself.

This feature implements a kafka package that handles the Kafka client, config, producer, and event channel;
a deployment package, which is simply the code generated from the event.proto file;
and a generator package that builds these events for us.

Most of the functionality lives in the synchronize loop, which sends one event when a deployment is initialized and then monitors the app, sending a subsequent event once the rollout succeeds.
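
The hook itself is small. Condensed from the naiserator.go diff further down, it looks roughly like this:

```go
// Condensed from the synchronize loop in naiserator.go (see the diff below):
// one event is published when the resources are applied, and a goroutine
// watches the rollout and publishes a second event once it completes.
if n.KafkaEnabled {
	e := generator.NewDeploymentEvent(*app)
	kafka.Events <- kafka.Message{Event: e, Logger: *logger}
	go n.MonitorRollout(*app, *logger, DeploymentMonitorFrequency, DeploymentMonitorTimeout)
}
```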

@ambientsound and I have been working on this issue over Slack and in person, so there's no discussion here on GitHub. The solution is pretty much done as far as I can tell, so it's basically ready to be merged.

Feedback is still very welcome!

CHANGELOG:
* [Feature] Added basic kafka and protobuf integrations
This is by no means complete, but it's a start.

I've implemented a skeleton for the Kafka configuration and
added Makefile targets and helpful developer docs in the README.
I've also started on the kafka package, which shows the design
I'm thinking of going with: a producerLoop listening on a
channel and receiving the protobuf Event struct on that channel.
Exactly how it receives the struct I haven't decided yet.
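
As a rough illustration of that design, here is a minimal sketch of the producer side. The `Events` channel, `Message` type, `Client`, and `ProducerLoop` names match the diffs below; the Sarama wiring, struct fields, and channel buffer size are assumptions, since pkg/kafka itself is not part of the excerpt shown here.

```go
package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"
	deployment "github.com/nais/naiserator/pkg/event"
	log "github.com/sirupsen/logrus"
)

// Message couples a deployment event with the logger context of the
// synchronization that produced it.
type Message struct {
	Event  deployment.Event
	Logger log.Entry
}

// Events is the channel the synchronize loop writes to; the buffer size here is arbitrary.
var Events = make(chan Message, 4096)

// Client holds the Kafka producer; the real struct in pkg/kafka may differ.
type Client struct {
	Producer sarama.SyncProducer
	Topic    string
}

// ProducerLoop drains the event channel and publishes each event to Kafka.
func (c *Client) ProducerLoop() {
	for msg := range Events {
		payload, err := proto.Marshal(&msg.Event)
		if err != nil {
			msg.Logger.Errorf("unable to serialize deployment event: %s", err)
			continue
		}
		_, _, err = c.Producer.SendMessage(&sarama.ProducerMessage{
			Topic: c.Topic,
			Value: sarama.ByteEncoder(payload),
		})
		if err != nil {
			msg.Logger.Errorf("unable to publish deployment event to Kafka: %s", err)
		}
	}
}
```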

* [Feature] Added deployment struct, event contexts, and app rollout, etc.
This is just a basic first draft of how I think it should look, nothing
final. I'm not super happy with the design, but I can't find anything
better at the moment.

* [Chore] Removed event.proto and added gitignore

* [Refactor] Moved application Rollout check to naiserator package, etc.

The logic that checks whether an app is deployed has moved to the
naiserator package, which is a much better fit. It now continuously
checks whether the deployment has all of its pods ready, and when that
condition is met, a bool is sent over a channel to signal that the
application is ready.

This is used by a function in the kafka package, which waits up to
5 minutes for a bool to arrive on that channel before it times out.
If the bool arrives, an event is sent to Kafka (see the sketch below).

Other than that I made some small changes to logging. There's more
work to be done here.
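
A minimal sketch of that waiting logic, assuming a hypothetical `WaitForDeployment` helper in the kafka package; it reuses the `Events` channel and `Message` type from the sketch above and additionally needs the `time` package. Note that the merged version of this PR moves this responsibility into `MonitorRollout` in naiserator.go, shown in the diff below.

```go
// Hypothetical helper illustrating the intermediate design described above:
// wait up to five minutes for the readiness signal, then either publish the
// event or give up.
func WaitForDeployment(ready <-chan bool, event deployment.Event, logger log.Entry) {
	select {
	case <-ready:
		// The rollout check reported all pods ready; publish the event.
		Events <- Message{Event: event, Logger: logger}
	case <-time.After(5 * time.Minute):
		logger.Warning("timed out waiting for deployment to complete; no event sent")
	}
}
```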

* [Chore] Logging and error handling changes

* move code to deployment event generation module

* refactor kafka event logic

* [Feature] Added function to get the hash of the container image
We need the hash of the container image for the Kafka deployment events,
so I've implemented a simple function that gets all the pods that have an
app label matching the app name. It then gets the container statuses of
those pods and extracts the ContainerID, of which the last 64 characters are
always the sha256 hash. A sketch of the approach follows below.
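
A hedged reconstruction of what that helper might have looked like; the name and signature are illustrative, and the function was removed again later in this PR (see the commit further down). It assumes the client-go version pinned in go.mod, where `List` takes no context argument.

```go
import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// containerImageHash finds pods labelled with the application name and pulls
// the sha256 hash out of their ContainerID, whose last 64 characters are the hash.
func containerImageHash(client kubernetes.Interface, name, namespace string) (string, error) {
	pods, err := client.CoreV1().Pods(namespace).List(metav1.ListOptions{
		LabelSelector: "app=" + name,
	})
	if err != nil {
		return "", err
	}
	for _, pod := range pods.Items {
		for _, status := range pod.Status.ContainerStatuses {
			// ContainerID has the form "docker://<sha256 hash>".
			if len(status.ContainerID) >= 64 {
				return status.ContainerID[len(status.ContainerID)-64:], nil
			}
		}
	}
	return "", fmt.Errorf("no container statuses found for app %s", name)
}
```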

* [Feature] Deployment CorrelationID saved in kubernetes and equal on both deployment events

* [Bug fix] added return to MonitorRollout, so the goroutine is closed and a successful kafka message is only sent one time

* [Refactor] Moved UUID generation into the synchronize loop

Now, if the UUID generation fails, the synchronization returns an error and
no events are sent.

I also changed the NewDeploymentEvent function to use the app annotation
`nais.io/deployment-correlation-id` as its correlation ID. This way it's
set at the start, and subsequent events all carry that correlation ID
automatically, since it's stored on the app (see the sketch below).
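
The read side of this lives in pkg/event/generator, which isn't part of the diff excerpt below; boiled down, it is just a lookup of the annotation that the synchronize loop stored on the app. The helper name here is an assumption.

```go
// Illustrative only: the synchronize loop stores the UUID on the Application
// (see SetCorrelationID in the application_types.go diff below), and the event
// generator reads it back so both deployment events carry the same ID.
func correlationID(app v1alpha1.Application) string {
	return app.Annotations[v1alpha1.CorrelationIDAnnotation]
}
```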

* propagate correlation ID everywhere

* add docker container reference parser

* [Chore] Removed unused ContainerImageHash function

* use gettag

* write first test for deployment event

* get clusterName from app and order event fields

* expose NAIS cluster name in Application resource

* [Tests] 100% Test coverage for generator

* [Chore] Added check if kafka is enabled

* [Chore] Cleanup config options and functions

* Revert "[Chore] Cleanup config options and functions"

This reverts commit 4cb9d2a.

* [Chore] Removed SignatureKey Option

* Rename default topic from deploymentStatus to deploymentEvents

* clarify readme

* kafka logging includes application logger context

* prevent nil crash when setting correlation id

* include correlation id and application name in log messages
Kasper Furnes Bernhoff authored Jul 17, 2019
1 parent 7fdb043 commit 8220f38
Showing 18 changed files with 1,049 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@ cmd/naiserator/naiserator
vendor/
cover.out
./naiserator
*.proto
11 changes: 10 additions & 1 deletion Makefile
@@ -3,8 +3,11 @@ TAG := navikt/${NAME}
LATEST := ${TAG}:latest
ROOT_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))

PROTOC = $(shell which protoc)
PROTOC_GEN_GO = $(shell which protoc-gen-go)

.PHONY: build docker docker-push local install test codegen-crd codegen-updater

.PHONY: build docker docker-push local install test codegen-crd codegen-updater proto

build:
cd cmd/naiserator && go build
@@ -30,3 +33,9 @@ codegen-crd:

codegen-updater:
go generate ${ROOT_DIR}/hack/generator/updater.go | goimports > ${ROOT_DIR}/updater/zz_generated.go

proto:
wget https://raw.githubusercontent.com/navikt/protos/master/deployment/event.proto
$(PROTOC) --plugin=$(PROTOC_GEN_GO) --go_out=. event.proto
mv event.pb.go pkg/event/
rm -f event.proto
10 changes: 10 additions & 0 deletions README.md
@@ -130,6 +130,16 @@ kubectl apply -f examples/nais.yaml
make local
```

### Kafka & Protobuf

Whenever an Application is synchronized, a [deployment event message](https://github.com/navikt/protos/blob/master/deployment/deployment.proto)
can be sent to a Kafka topic. There are a few prerequisites for developing with this enabled locally:

1. [Protobuf installed](https://github.com/golang/protobuf)
2. An instance of kafka to test against. Use `docker-compose up` to bring up a local instance.
3. Enable this feature by passing `--kafka-enabled=true` when starting Naiserator.
4. Whenever the Protobuf schema is updated, and you need the new features, update using `make proto`. It will download the definitions, compile and place them in the correct packages.

### Code generation

In order to use the Kubernetes Go library, we need to use classes that work together with the interfaces in that library.
17 changes: 16 additions & 1 deletion cmd/naiserator/main.go
@@ -13,6 +13,7 @@ import (
clientV1Alpha1 "github.com/nais/naiserator/pkg/client/clientset/versioned"
clientset "github.com/nais/naiserator/pkg/client/clientset/versioned"
informers "github.com/nais/naiserator/pkg/client/informers/externalversions"
"github.com/nais/naiserator/pkg/kafka"
"github.com/nais/naiserator/pkg/metrics"
"github.com/nais/naiserator/pkg/resourcecreator"
log "github.com/sirupsen/logrus"
@@ -28,22 +29,35 @@ var (
bindAddr string
accessPolicyEnabled bool
nativeSecretsEnabled bool

kafkaConfig = kafka.DefaultConfig()
)

func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "path to Kubernetes config file")
flag.StringVar(&bindAddr, "bind-address", ":8080", "ip:port where http requests are served")
flag.BoolVar(&accessPolicyEnabled, "access-policy-enabled", ensureBool(getEnv("ACCESS_POLICY_ENABLED", "false")), "enable access policy with Istio and NetworkPolicies")
flag.BoolVar(&nativeSecretsEnabled, "native-secrets-enabled", ensureBool(getEnv("NATIVE_SECRETS_ENABLED", "false")), "enable use of native secrets")

kafka.SetupFlags(&kafkaConfig)
flag.Parse()
}

func main() {
log.SetFormatter(&log.JSONFormatter{
TimestampFormat: time.RFC3339Nano,
})

log.Info("Naiserator starting up")

if kafkaConfig.Enabled {
kafkaClient, err := kafka.NewClient(&kafkaConfig)
if err != nil {
log.Fatalf("unable to setup kafka: %s", err)
}
go kafkaClient.ProducerLoop()
}

// register custom types
err := v1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
@@ -74,7 +88,8 @@ func main() {
createGenericClientset(kubeconfig),
createApplicationClientset(kubeconfig),
applicationInformerFactory.Naiserator().V1alpha1().Applications(),
resourceOptions)
resourceOptions,
kafkaConfig.Enabled)

applicationInformerFactory.Start(stopCh)
n.Run(stopCh)
9 changes: 9 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,9 @@
version: "3"
services:
kafka:
image: "spotify/kafka"
ports:
- "9092:9092"
environment:
ADVERTISED_HOST: localhost
ADVERTISED_PORT: 9092
13 changes: 8 additions & 5 deletions go.mod
@@ -2,10 +2,12 @@ module github.com/nais/naiserator

require (
cloud.google.com/go v0.39.0 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/Shopify/sarama v1.23.0
github.com/gogo/protobuf v1.2.1
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.3.2
github.com/google/gofuzz v1.0.0 // indirect
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.1 // indirect
@@ -14,25 +16,26 @@
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/hashstructure v1.0.0
github.com/novln/docker-parser v0.0.0-20190306203532-b3f122c6978e
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/pelletier/go-toml v1.3.0 // indirect
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect
github.com/prometheus/procfs v0.0.0-20190403104016-ea9eea638872 // indirect
github.com/shurcooL/githubv4 v0.0.0-20190625031733-ee671ab25ff0 // indirect
github.com/shurcooL/graphql v0.0.0-20181231061246-d48a9a75455f // indirect
github.com/sirupsen/logrus v1.4.1
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.3.2
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 // indirect
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a // indirect
golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
golang.org/x/tools v0.0.0-20190618233249-04b924abaa25 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2
k8s.io/api v0.0.0-20190408172450-b1350b9e3bc2 // kubernetes-1.12.7
k8s.io/apimachinery v0.0.0-20190409012359-28713bcd7119 // kubernetes-1.12.7
69 changes: 68 additions & 1 deletion naiserator.go
@@ -2,15 +2,21 @@ package naiserator

import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/nais/naiserator/pkg/apis/nais.io/v1alpha1"
clientV1Alpha1 "github.com/nais/naiserator/pkg/client/clientset/versioned"
informers "github.com/nais/naiserator/pkg/client/informers/externalversions/nais.io/v1alpha1"
"github.com/nais/naiserator/pkg/event"
"github.com/nais/naiserator/pkg/event/generator"
"github.com/nais/naiserator/pkg/kafka"
"github.com/nais/naiserator/pkg/metrics"
"github.com/nais/naiserator/pkg/resourcecreator"
"github.com/nais/naiserator/updater"
log "github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,21 +25,28 @@
"k8s.io/client-go/tools/cache"
)

const (
DeploymentMonitorFrequency = time.Second * 5
DeploymentMonitorTimeout = time.Minute * 5
)

// Naiserator is a singleton that holds Kubernetes client instances.
type Naiserator struct {
ClientSet kubernetes.Interface
KafkaEnabled bool
AppClient *clientV1Alpha1.Clientset
ApplicationInformer informers.ApplicationInformer
ApplicationInformerSynced cache.InformerSynced
ResourceOptions resourcecreator.ResourceOptions
}

func NewNaiserator(clientSet kubernetes.Interface, appClient *clientV1Alpha1.Clientset, applicationInformer informers.ApplicationInformer, resourceOptions resourcecreator.ResourceOptions) *Naiserator {
func NewNaiserator(clientSet kubernetes.Interface, appClient *clientV1Alpha1.Clientset, applicationInformer informers.ApplicationInformer, resourceOptions resourcecreator.ResourceOptions, kafkaEnabled bool) *Naiserator {
naiserator := Naiserator{
ClientSet: clientSet,
AppClient: appClient,
ApplicationInformer: applicationInformer,
ApplicationInformerSynced: applicationInformer.Informer().HasSynced,
KafkaEnabled: kafkaEnabled,
ResourceOptions: resourceOptions}

applicationInformer.Informer().AddEventHandler(
@@ -78,6 +91,13 @@ func (n *Naiserator) synchronize(logger *log.Entry, app *v1alpha1.Application) e
return nil
}

deploymentID, err := uuid.NewRandom()
if err != nil {
return fmt.Errorf("while generating a deployment UUID: %s", err)
}
app.SetCorrelationID(deploymentID.String())
logger = logger.WithField("correlation-id", deploymentID.String())

if err := n.removeOrphanIngresses(logger, app); err != nil {
return fmt.Errorf("while removing old resources: %s", err)
}
@@ -111,6 +131,15 @@ func (n *Naiserator) synchronize(logger *log.Entry, app *v1alpha1.Application) e
metrics.ResourcesGenerated.Add(float64(len(resources)))
metrics.Deployments.Inc()

if n.KafkaEnabled {
// Broadcast a message on Kafka that the deployment is initialized.
e := generator.NewDeploymentEvent(*app)
kafka.Events <- kafka.Message{Event: e, Logger: *logger}

// Monitor its completion timeline over a designated period
go n.MonitorRollout(*app, *logger, DeploymentMonitorFrequency, DeploymentMonitorTimeout)
}

app.SetLastSyncedHash(hash)
logger.Infof("%s: setting new hash %s", app.Name, hash)

@@ -138,6 +167,7 @@ func (n *Naiserator) update(old, new interface{}) {
"namespace": app.Namespace,
"apiversion": app.APIVersion,
"resourceversion": app.ResourceVersion,
"application": app.Name,
})

metrics.ApplicationsProcessed.Inc()
@@ -185,6 +215,43 @@ func (n *Naiserator) removeOrphanIngresses(logger *log.Entry, app *v1alpha1.Appl
return err
}

func (n *Naiserator) MonitorRollout(app v1alpha1.Application, logger log.Entry, frequency, timeout time.Duration) {
for {
select {
case <-time.After(frequency):
deploy, err := n.ClientSet.AppsV1().Deployments(app.Namespace).Get(app.Name, v1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
log.Errorf("%s: While trying to get Deployment for app: %s", app.Name, err)
}
continue
}

if deploymentComplete(deploy, &deploy.Status) {
event := generator.NewDeploymentEvent(app)
event.RolloutStatus = deployment.RolloutStatus_complete
kafka.Events <- kafka.Message{Event: event, Logger: logger}
return
}

case <-time.After(timeout):
return
}
}
}

// deploymentComplete considers a deployment to be complete once all of its desired replicas
// are updated and available, and no old pods are running.
//
// Copied verbatim from
// https://github.com/kubernetes/kubernetes/blob/74bcefc8b2bf88a2f5816336999b524cc48cf6c0/pkg/controller/deployment/util/deployment_util.go#L745
func deploymentComplete(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool {
return newStatus.UpdatedReplicas == *(deployment.Spec.Replicas) &&
newStatus.Replicas == *(deployment.Spec.Replicas) &&
newStatus.AvailableReplicas == *(deployment.Spec.Replicas) &&
newStatus.ObservedGeneration >= deployment.Generation
}

func (n *Naiserator) Run(stop <-chan struct{}) {
log.Info("Starting application synchronization")
if !cache.WaitForCacheSync(stop, n.ApplicationInformerSynced) {
30 changes: 19 additions & 11 deletions pkg/apis/nais.io/v1alpha1/application_types.go
@@ -4,6 +4,7 @@ package v1alpha1

import (
"fmt"
"os"
"strconv"

hash "github.com/mitchellh/hashstructure"
@@ -14,10 +15,12 @@

const (
LastSyncedHashAnnotation = "nais.io/lastSyncedHash"
CorrelationIDAnnotation = "nais.io/deploymentCorrelationID"
SecretTypeEnv = "env"
SecretTypeFiles = "files"
DefaultSecretType = SecretTypeEnv
DefaultSecretMountPath = "/var/run/secrets"
NaisClusterNameEnv = "NAIS_CLUSTER_NAME"
)

// Application defines a NAIS application.
@@ -203,6 +206,9 @@ func (in *Application) GetOwnerReference() metav1.OwnerReference {
// This is done in order to workaround the k8s client serializer
// which crashes when these fields are uninitialized.
func (in *Application) NilFix() {
if in.Annotations == nil {
in.Annotations = make(map[string]string)
}
if in.Spec.Ingresses == nil {
in.Spec.Ingresses = make([]string, 0)
}
@@ -246,21 +252,23 @@ func (in Application) Hash() (string, error) {
return strconv.FormatUint(h, 10), err
}

func (in Application) Cluster() string {
return os.Getenv(NaisClusterNameEnv)
}

func (in *Application) LastSyncedHash() string {
a := in.GetAnnotations()
if a == nil {
a = make(map[string]string)
}
return a[LastSyncedHashAnnotation]
in.NilFix()
return in.Annotations[LastSyncedHashAnnotation]
}

func (in *Application) SetLastSyncedHash(hash string) {
a := in.GetAnnotations()
if a == nil {
a = make(map[string]string)
}
a[LastSyncedHashAnnotation] = hash
in.SetAnnotations(a)
in.NilFix()
in.Annotations[LastSyncedHashAnnotation] = hash
}

func (in *Application) SetCorrelationID(id string) {
in.NilFix()
in.Annotations[CorrelationIDAnnotation] = id
}

func (in *Application) DefaultSecretPath(base string) SecretPath {
4 changes: 3 additions & 1 deletion pkg/apis/nais.io/v1alpha1/meta.go
@@ -10,7 +10,9 @@ func (in *Application) CreateObjectMeta() metav1.ObjectMeta {
"app": in.Name,
"team": in.Labels["team"],
},
Annotations: map[string]string{},
Annotations: map[string]string{
CorrelationIDAnnotation: in.Annotations[CorrelationIDAnnotation],
},
OwnerReferences: in.OwnerReferences(in),
}
}