Skip to content

Commit

Permalink
Introduced stop endpoint to stop the running etcd-wrapper.
Browse files Browse the repository at this point in the history
  • Loading branch information
ishan16696 committed Sep 30, 2024
1 parent e2fd9e7 commit a8891a5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 24 deletions.
17 changes: 16 additions & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package app

import (
"context"
"net/http"
"syscall"
"time"

Expand All @@ -30,6 +31,7 @@ type Application struct {
waitReadyTimeout time.Duration
logger *zap.Logger
etcdReady bool // should have only one actor that updates it, queryAndUpdateEtcdReadiness()
server *http.Server
}

// NewApplication initializes and returns an application struct
Expand Down Expand Up @@ -75,7 +77,17 @@ func (a *Application) Start() error {
defer a.Close()

// Setup readiness probe
go a.SetupReadinessProbe()
go a.queryAndUpdateEtcdReadiness()

// start HTTP server to serve endpoints
go a.startHTTPServer()
defer func() {
if err := a.stopHTTPServer(); err != nil {
a.logger.Error("unable to stop HTTP server: %v",
zap.Error(err),
)
}
}()

// Create embedded etcd and start.
if err = a.startEtcd(); err != nil {
Expand Down Expand Up @@ -105,6 +117,9 @@ func (a *Application) Close() {
if err := a.etcdClient.Close(); err != nil {
a.logger.Error("failed to close etcd client", zap.Error(err))
}
if a.etcd != nil {
a.etcd.Close()
}
a.cancelContext()
}

Expand Down
71 changes: 48 additions & 23 deletions internal/app/readycheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,14 @@ import (
)

const (
// ReadyServerPort is the port number for the server that serves the readiness probe
ReadyServerPort = int64(9095)
// ServerPort is the port number for the http server of etcd wrapper
ServerPort = int64(9095)
etcdConnectionTimeout = 5 * time.Second
etcdGetTimeout = 5 * time.Second
etcdQueryInterval = 2 * time.Second
etcdEndpointPort = "2379"
)

// SetupReadinessProbe sets up the readiness probe for this application. It is a blocking function and therefore
// the consumer should always call this within a go-routine unless the caller itself wants to block on this which is unlikely.
func (a *Application) SetupReadinessProbe() {
// Start go routine to regularly query etcd to update its readiness status.
go a.queryAndUpdateEtcdReadiness()
// If the http server errors out then you will have to panic and that should cause the container to exit and then be restarted by kubelet.
if a.isTLSEnabled() {
http.Handle("/readyz", http.HandlerFunc(a.readinessHandler))
err := http.ListenAndServeTLS(fmt.Sprintf(":%d", ReadyServerPort), a.cfg.ClientTLSInfo.CertFile, a.cfg.ClientTLSInfo.KeyFile, nil)
if err != nil {
a.logger.Fatal("failed to start TLS readiness endpoint", zap.Error(err))
}
} else {
http.Handle("/readyz", http.HandlerFunc(a.readinessHandler))
err := http.ListenAndServe(fmt.Sprintf(":%d", ReadyServerPort), nil)
if err != nil {
a.logger.Fatal("failed to start readiness endpoint", zap.Error(err))
}
}
}

// queryAndUpdateEtcdReadiness periodically queries the etcd DB to check its readiness and updates the status
// of the query into the etcdStatus struct. It stops querying when the application context is cancelled.
func (a *Application) queryAndUpdateEtcdReadiness() {
Expand Down Expand Up @@ -122,3 +101,49 @@ func (a *Application) isTLSEnabled() bool {
len(strings.TrimSpace(a.cfg.ClientTLSInfo.KeyFile)) != 0 &&
len(strings.TrimSpace(a.cfg.ClientTLSInfo.TrustedCAFile)) != 0
}

func (a *Application) stopEtcdHandler(w http.ResponseWriter, _ *http.Request) {
a.logger.Info("received stop request, stopping etcd-wrapper...")
a.cancelContext()
w.WriteHeader(http.StatusOK)
}

func (a *Application) startHTTPServer() {
a.logger.Info(
"Starting HTTP server at addr",
zap.Int64("Port No: ", ServerPort),
)
a.RegisterHandler()
if !a.isTLSEnabled() {
err := a.server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
a.logger.Fatal("Failed to start http server: %v", zap.Error(err))
}
a.logger.Info("HTTP server closed gracefully.")
return
}

a.logger.Info("TLS enabled. Starting HTTPS server.")
err := a.server.ListenAndServeTLS(a.cfg.ClientTLSInfo.CertFile, a.cfg.ClientTLSInfo.KeyFile)
if err != nil && err != http.ErrServerClosed {
a.logger.Fatal("Failed to start http server: %v", zap.Error(err))
}
a.logger.Info("HTTPS server closed gracefully.")
}

func (a *Application) stopHTTPServer() error {
return a.server.Close()
}

// RegisterHandler registers the handler for different requests
func (a *Application) RegisterHandler() {
mux := http.NewServeMux()

mux.HandleFunc("/readyz", a.readinessHandler)
mux.HandleFunc("/stop", a.stopEtcdHandler)

a.server = &http.Server{
Addr: fmt.Sprintf(":%d", ServerPort),
Handler: mux,
}
}

0 comments on commit a8891a5

Please sign in to comment.