Skip to content

Commit

Permalink
Merge pull request #7 from LaurenceJJones/context-groups
Browse files Browse the repository at this point in the history
enhance: Error groups and handle graceful shutdowns
  • Loading branch information
andrasbacsai authored Oct 21, 2024
2 parents f9a8de2 + 4cadc49 commit 29b87c9
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 91 deletions.
128 changes: 65 additions & 63 deletions collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"io"
"log"
Expand All @@ -22,83 +23,84 @@ type ContainerMetrics struct {
MemoryAvailable uint64 `json:"available_memory"`
}

func collector() {
func collector(ctx context.Context) {
fmt.Printf("[%s] Starting metrics recorder with refresh rate of %d seconds and retention period of %d days.\n", time.Now().Format("2006-01-02 15:04:05"), refreshRateSeconds, collectorRetentionPeriodDays)

go func() {
ticker := time.NewTicker(time.Duration(refreshRateSeconds) * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered from panic in collector: %v", r)
}
}()

queryTimeInUnixString := getUnixTimeInMilliUTC()

// CPU usage
overallPercentage, err := cpu.Percent(0, false)
if err != nil {
log.Printf("Error getting CPU percentage: %v", err)
return
}

_, err = db.Exec(`INSERT INTO cpu_usage (time, percent) VALUES (?, ?)`, queryTimeInUnixString, fmt.Sprintf("%.2f", overallPercentage[0]))
if err != nil {
log.Printf("Error inserting CPU usage into database: %v", err)
ticker := time.NewTicker(time.Duration(refreshRateSeconds) * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
fmt.Printf("[%s] Stopping metrics recorder.\n", time.Now().Format("2006-01-02 15:04:05"))
return
case <-ticker.C:
func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered from panic in collector: %v", r)
}
}()

collectContainerMetrics(queryTimeInUnixString)

// Memory usage
memory, err := mem.VirtualMemory()
queryTimeInUnixString := getUnixTimeInMilliUTC()

// CPU usage
overallPercentage, err := cpu.Percent(0, false)
if err != nil {
log.Printf("Error getting CPU percentage: %v", err)
return
}

_, err = db.Exec(`INSERT INTO cpu_usage (time, percent) VALUES (?, ?)`, queryTimeInUnixString, fmt.Sprintf("%.2f", overallPercentage[0]))
if err != nil {
log.Printf("Error inserting CPU usage into database: %v", err)
}

collectContainerMetrics(queryTimeInUnixString)

// Memory usage
memory, err := mem.VirtualMemory()
if err != nil {
log.Printf("Error getting memory usage: %v", err)
return
}

_, err = db.Exec(`INSERT INTO memory_usage (time, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?)`,
queryTimeInUnixString, memory.Total, memory.Available, memory.Used, math.Round(memory.UsedPercent*100)/100, memory.Free)
if err != nil {
log.Printf("Error inserting memory usage into database: %v", err)
}

// Cleanup old data
totalRowsToKeep := 10
currentTime := time.Now().UTC().UnixMilli()
cutoffTime := currentTime - int64(collectorRetentionPeriodDays*24*60*60*1000)

cleanupTable := func(tableName string) {
var totalRows int
err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)).Scan(&totalRows)
if err != nil {
log.Printf("Error getting memory usage: %v", err)
log.Printf("Error counting rows in %s: %v", tableName, err)
return
}

_, err = db.Exec(`INSERT INTO memory_usage (time, total, available, used, usedPercent, free) VALUES (?, ?, ?, ?, ?, ?)`,
queryTimeInUnixString, memory.Total, memory.Available, memory.Used, math.Round(memory.UsedPercent*100)/100, memory.Free)
if err != nil {
log.Printf("Error inserting memory usage into database: %v", err)
}

// Cleanup old data
totalRowsToKeep := 10
currentTime := time.Now().UTC().UnixMilli()
cutoffTime := currentTime - int64(collectorRetentionPeriodDays*24*60*60*1000)

cleanupTable := func(tableName string) {
var totalRows int
err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)).Scan(&totalRows)
if totalRows > totalRowsToKeep {
_, err = db.Exec(fmt.Sprintf(`DELETE FROM %s WHERE CAST(time AS BIGINT) < ? AND time NOT IN (SELECT time FROM %s ORDER BY time DESC LIMIT ?)`, tableName, tableName),
cutoffTime, totalRowsToKeep)
if err != nil {
log.Printf("Error counting rows in %s: %v", tableName, err)
return
}

if totalRows > totalRowsToKeep {
_, err = db.Exec(fmt.Sprintf(`DELETE FROM %s WHERE CAST(time AS BIGINT) < ? AND time NOT IN (SELECT time FROM %s ORDER BY time DESC LIMIT ?)`, tableName, tableName),
cutoffTime, totalRowsToKeep)
if err != nil {
log.Printf("Error deleting old data from %s: %v", tableName, err)
}
log.Printf("Error deleting old data from %s: %v", tableName, err)
}
}
}

cleanupTable("cpu_usage")
cleanupTable("memory_usage")
cleanupTable("container_cpu_usage")
cleanupTable("container_memory_usage")
cleanupTable("cpu_usage")
cleanupTable("memory_usage")
cleanupTable("container_cpu_usage")
cleanupTable("container_memory_usage")

}()
}
}()
}
}()
}
}

func collectContainerMetrics(queryTimeInUnixString string) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/goccy/go-json v0.10.3
github.com/mattn/go-sqlite3 v1.14.24
github.com/shirou/gopsutil v3.21.11+incompatible
golang.org/x/sync v0.8.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
78 changes: 73 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package main
import (
"context"
"database/sql"
"errors"
"log"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"

"github.com/gin-gonic/gin"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/sync/errgroup"
)

var debug bool = false
Expand Down Expand Up @@ -60,6 +64,25 @@ func Token() gin.HandlerFunc {
}
}

// HandleSignals blocks until the process receives SIGTERM or an interrupt
// (SIGINT), or until ctx is done, whichever happens first.
//
// It returns a non-nil error in every expected case:
//   - "received SIGTERM" when SIGTERM is delivered,
//   - "received interrupt" when os.Interrupt is delivered,
//   - ctx.Err() when ctx is cancelled or its deadline passes first.
//
// The exact error messages are matched by the caller in main to tell a
// requested shutdown from an unexpected failure, so they must stay unchanged.
func HandleSignals(ctx context.Context) error {
	// Buffered with capacity 1 so a signal that arrives before the select
	// below is reached is not dropped by the runtime's non-blocking send.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM, os.Interrupt)
	// Unregister the channel on return. Without this the registration outlives
	// the function: later signals keep being diverted into a channel nobody
	// reads, so a second Ctrl-C could never terminate a stuck shutdown.
	defer signal.Stop(signalChan)

	select {
	case s := <-signalChan:
		switch s {
		case syscall.SIGTERM:
			return errors.New("received SIGTERM")
		case os.Interrupt: // cross-platform SIGINT
			return errors.New("received interrupt")
		}
	case <-ctx.Done():
		return ctx.Err()
	}

	// Only reachable if a signal other than the two registered above is
	// received, which Notify should never deliver to this channel.
	return nil
}

func main() {

if gin.Mode() == gin.DebugMode {
Expand Down Expand Up @@ -246,12 +269,10 @@ func main() {
setupCpuRoutes(r)
setupContainerRoutes(r)
setupMemoryRoutes(r)
setupPush()
} else {
setupCpuRoutes(r)
setupContainerRoutes(r)
setupMemoryRoutes(r)
setupPush()
}
if debug {
r.GET("/debug/pprof", func(c *gin.Context) {
Expand Down Expand Up @@ -279,13 +300,60 @@ func main() {
pprof.Handler("block").ServeHTTP(c.Writer, c.Request)
})
}

group, gCtx := errgroup.WithContext(context.Background())
group.Go(func() error {
return HandleSignals(gCtx)
})
group.Go(func() error {
setupPush(gCtx)
return nil
})
// Collector
if collectorEnabled {
collector()
group.Go(func() error {
collector(gCtx)
return nil
})
}
cleanup()
r.Run(":8888")
srv := &http.Server{
Addr: ":8888",
Handler: r.Handler(),
}
group.Go(func() error {
errorChan := make(chan error, 1)
go func() {
defer close(errorChan)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
errorChan <- err
}
}()
select {
case <-gCtx.Done():
return nil // context cancelled
case err := <-errorChan:
return err
}
})
if err := group.Wait(); err != nil {
switch err.Error() {
case "received SIGTERM":
log.Println("received SIGTERM shutting down")
case "received interrupt":
log.Println("received interrupt shutting down")
default:
log.Fatal(err) // unexpected error
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal(err) // failure/timeout shutting down the server gracefully
}
select {
case <-ctx.Done():
log.Println("server shutdown")
}
}

func makeDockerRequest(url string) (*http.Response, error) {
Expand Down
35 changes: 12 additions & 23 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package main

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"time"

"github.com/docker/docker/api/types"
Expand All @@ -30,29 +29,19 @@ func setupPushRoute(r *gin.Engine) {
})
}

func setupPush() {
go func() {
ticker := time.NewTicker(time.Duration(pushIntervalSeconds) * time.Second)
defer ticker.Stop()
func setupPush(ctx context.Context) {
ticker := time.NewTicker(time.Duration(pushIntervalSeconds) * time.Second)
defer ticker.Stop()

done := make(chan bool)
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt)
<-sigint
done <- true
}()

for {
select {
case <-done:
fmt.Println("Push operation stopped")
return
case <-ticker.C:
getPushData()
}
for {
select {
case <-ctx.Done():
fmt.Println("Push operation stopped")
return
case <-ticker.C:
getPushData()
}
}()
}
}

func getPushData() (map[string]interface{}, error) {
Expand Down

0 comments on commit 29b87c9

Please sign in to comment.