From 831e2d0af71c30ddaad0ac554d787c3d882a2050 Mon Sep 17 00:00:00 2001 From: Ishan Tyagi Date: Mon, 7 Oct 2024 09:37:40 +0530 Subject: [PATCH] Introduced stop endpoint to stop the running etcd-wrapper. --- internal/app/app.go | 17 ++++++++- internal/app/readycheck.go | 74 ++++++++++++++++++++++++++------------ 2 files changed, 67 insertions(+), 24 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 253cec09..d17ecce8 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -6,6 +6,7 @@ package app import ( "context" + "net/http" "syscall" "time" @@ -30,6 +31,7 @@ type Application struct { waitReadyTimeout time.Duration logger *zap.Logger etcdReady bool // should have only one actor that updates it, queryAndUpdateEtcdReadiness() + server *http.Server } // NewApplication initializes and returns an application struct @@ -75,7 +77,17 @@ func (a *Application) Start() error { defer a.Close() // Setup readiness probe - go a.SetupReadinessProbe() + go a.queryAndUpdateEtcdReadiness() + + // start HTTP server to serve endpoints + go a.startHTTPServer() + defer func() { + if err := a.stopHTTPServer(); err != nil { + a.logger.Error("unable to stop HTTP server: %v", + zap.Error(err), + ) + } + }() // Create embedded etcd and start. if err = a.startEtcd(); err != nil { @@ -105,6 +117,9 @@ func (a *Application) Close() { if err := a.etcdClient.Close(); err != nil { a.logger.Error("failed to close etcd client", zap.Error(err)) } + if a.etcd != nil { + a.etcd.Close() + } a.cancelContext() } diff --git a/internal/app/readycheck.go b/internal/app/readycheck.go index 2e0279a4..efea11c6 100644 --- a/internal/app/readycheck.go +++ b/internal/app/readycheck.go @@ -20,35 +20,14 @@ import ( ) const ( - // ReadyServerPort is the port number for the server that serves the readiness probe - ReadyServerPort = int64(9095) + // ServerPort is the port number for the http server of etcd wrapper + ServerPort = int64(9095) etcdConnectionTimeout = 5 * time.Second etcdGetTimeout = 5 * time.Second etcdQueryInterval = 2 * time.Second etcdEndpointPort = "2379" ) -// SetupReadinessProbe sets up the readiness probe for this application. It is a blocking function and therefore -// the consumer should always call this within a go-routine unless the caller itself wants to block on this which is unlikely. -func (a *Application) SetupReadinessProbe() { - // Start go routine to regularly query etcd to update its readiness status. - go a.queryAndUpdateEtcdReadiness() - // If the http server errors out then you will have to panic and that should cause the container to exit and then be restarted by kubelet. - if a.isTLSEnabled() { - http.Handle("/readyz", http.HandlerFunc(a.readinessHandler)) - err := http.ListenAndServeTLS(fmt.Sprintf(":%d", ReadyServerPort), a.cfg.ClientTLSInfo.CertFile, a.cfg.ClientTLSInfo.KeyFile, nil) - if err != nil { - a.logger.Fatal("failed to start TLS readiness endpoint", zap.Error(err)) - } - } else { - http.Handle("/readyz", http.HandlerFunc(a.readinessHandler)) - err := http.ListenAndServe(fmt.Sprintf(":%d", ReadyServerPort), nil) - if err != nil { - a.logger.Fatal("failed to start readiness endpoint", zap.Error(err)) - } - } -} - // queryAndUpdateEtcdReadiness periodically queries the etcd DB to check its readiness and updates the status // of the query into the etcdStatus struct. It stops querying when the application context is cancelled. func (a *Application) queryAndUpdateEtcdReadiness() { @@ -122,3 +101,52 @@ func (a *Application) isTLSEnabled() bool { len(strings.TrimSpace(a.cfg.ClientTLSInfo.KeyFile)) != 0 && len(strings.TrimSpace(a.cfg.ClientTLSInfo.TrustedCAFile)) != 0 } + +func (a *Application) stopEtcdHandler(w http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + return + } + a.logger.Info("received stop request, stopping etcd-wrapper...") + a.cancelContext() + w.WriteHeader(http.StatusOK) +} + +func (a *Application) startHTTPServer() { + a.logger.Info( + "Starting HTTP server at addr", + zap.Int64("Port No: ", ServerPort), + ) + a.RegisterHandler() + if !a.isTLSEnabled() { + err := a.server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + a.logger.Fatal("Failed to start http server: %v", zap.Error(err)) + } + a.logger.Info("HTTP server closed gracefully.") + return + } + + a.logger.Info("TLS enabled. Starting HTTPS server.") + err := a.server.ListenAndServeTLS(a.cfg.ClientTLSInfo.CertFile, a.cfg.ClientTLSInfo.KeyFile) + if err != nil && err != http.ErrServerClosed { + a.logger.Fatal("Failed to start http server: %v", zap.Error(err)) + } + a.logger.Info("HTTPS server closed gracefully.") +} + +func (a *Application) stopHTTPServer() error { + return a.server.Close() +} + +// RegisterHandler registers the handler for different requests +func (a *Application) RegisterHandler() { + mux := http.NewServeMux() + + mux.HandleFunc("/readyz", a.readinessHandler) + mux.HandleFunc("/stop", a.stopEtcdHandler) + + a.server = &http.Server{ + Addr: fmt.Sprintf(":%d", ServerPort), + Handler: mux, + } +}