From 25930094de479d3360a0ca7331cd72a9ec6e9a83 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Tue, 17 Dec 2024 10:49:08 -0500 Subject: [PATCH] sketch of generic heartbeat service --- pkg/loop/server.go | 54 +++++++++++++++++++++++++++++++++++ pkg/services/heartbeat.go | 59 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 pkg/services/heartbeat.go diff --git a/pkg/loop/server.go b/pkg/loop/server.go index c866be20b..529898314 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -1,16 +1,22 @@ package loop import ( + "context" "fmt" "os" + "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" ) +const HeartbeatSeconds = 1 + // NewStartedServer returns a started Server. // The caller is responsible for calling Server.Stop(). func NewStartedServer(loggerName string) (*Server, error) { @@ -48,6 +54,7 @@ type Server struct { Logger logger.SugaredLogger promServer *PromServer checker *services.HealthChecker + heartbeat *services.Heartbeat } func newServer(loggerName string) (*Server, error) { @@ -62,6 +69,48 @@ func newServer(loggerName string) (*Server, error) { } lggr = logger.Named(lggr, loggerName) s.Logger = logger.Sugared(lggr) + + var gauge metric.Int64Gauge + var count metric.Int64Counter + var cme custmsg.Labeler + + heartbeat := services.NewHeartbeat( + s.Logger, + HeartbeatSeconds*time.Second, + func(ctx context.Context) error { + // Setup beholder resources + gauge, err = beholder.GetMeter().Int64Gauge("heartbeat") + if err != nil { + return err + } + count, err = beholder.GetMeter().Int64Counter("heartbeat_count") + if err != nil { + return err + } + + cme = custmsg.NewLabeler() + return nil + }, + func(engCtx context.Context) { + // TODO allow override of tracer provider into engine for beholder + _, innerSpan := beholder.GetTracer().Start(engCtx, "heartbeat.beat") + defer innerSpan.End() + + gauge.Record(engCtx, 1) + count.Add(engCtx, 1) + + err = cme.Emit(engCtx, "heartbeat") + if err != nil { + // TODO this is the server logger, not the engine logger + s.Logger.Errorw("heartbeat emit failed", "err", err) + } + }, + func() error { + return nil + }, + ) + s.heartbeat = &heartbeat + return s, nil } @@ -132,6 +181,10 @@ func (s *Server) start() error { return fmt.Errorf("error starting health checker: %w", err) } + if err := s.heartbeat.Start(context.TODO()); err != nil { + return fmt.Errorf("error starting heartbeat: %w", err) + } + return nil } @@ -151,4 +204,5 @@ func (s *Server) Stop() { if err := s.Logger.Sync(); err != nil { fmt.Println("Failed to sync logger:", err) } + s.heartbeat.Close() } diff --git a/pkg/services/heartbeat.go b/pkg/services/heartbeat.go new file mode 100644 index 000000000..39f00f848 --- /dev/null +++ b/pkg/services/heartbeat.go @@ -0,0 +1,59 @@ +package services + +import ( + "context" + "fmt" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" +) + +// Heartbeat is a usage of Engine for application specific heartbeats, +// used in the core node and for loops. It accepts a named logger, +// a beat, a setup func to initialize resources used on each beat, +// a beat function to define the behavior on each beat, and a close func +// for resource teardown +type Heartbeat struct { + Service + eng *Engine + + beat time.Duration + lggr logger.Logger +} + +func NewHeartbeat( + lggr logger.Logger, + beat time.Duration, + setupFn func(ctx context.Context) error, + beatFn func(bCtx context.Context), + closeFn func() error, +) Heartbeat { + h := Heartbeat{ + beat: beat, + lggr: lggr, + } + startFn := func(ctx context.Context) error { + err := setupFn(ctx) + if err != nil { + return fmt.Errorf("setting up heartbeat: %w", err) + } + + // consistent tick period + constantTickFn := func() time.Duration { + return h.beat + } + + // TODO allow for override of tracer provider in engine + // TODO wrap beatFn in engine trace + h.eng.GoTick(timeutil.NewTicker(constantTickFn), beatFn) + return nil + } + + h.Service, h.eng = Config{ + Name: fmt.Sprintf("%s.%s", lggr.Name(), "heartbeat"), + Start: startFn, + Close: closeFn, + }.NewServiceEngine(lggr) + return h +}