Skip to content

Commit

Permalink
Remove otel, use a basic journal:
Browse files Browse the repository at this point in the history
Otel was working well, but it was heavy and
required a otel specific log collector.
The journal is a lot lighter weight. Journal
entries are collected during a reconcile and
logged once at the end. As this produces a good
sized log line per reconcile, debug logging was added
so that this is opt in only.

Signed-off-by: Jacob Weinstock <[email protected]>
  • Loading branch information
jacobweinstock committed Oct 11, 2024
1 parent 11b1878 commit 68ff0e4
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 126 deletions.
2 changes: 0 additions & 2 deletions api/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ type WorkflowStatus struct {
// +patchStrategy=merge
// +listType=atomic
Conditions []WorkflowCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`

TraceParent string `json:"traceParent,omitempty"`
}

// JobStatus holds the state of a specific job.bmc.tinkerbell.org object created.
Expand Down
67 changes: 22 additions & 45 deletions cmd/tink-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import (
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/tinkerbell/tink/internal/deprecated/controller"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -31,6 +28,7 @@ type Config struct {
MetricsAddr string
ProbeAddr string
EnableLeaderElection bool
LogLevel int
}

func (c *Config) AddFlags(fs *pflag.FlagSet) {
Expand All @@ -44,6 +42,7 @@ func (c *Config) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&c.EnableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
fs.IntVar(&c.LogLevel, "log-level", 0, "Log level (0: info, 1: debug)")
}

func main() {
Expand All @@ -56,57 +55,35 @@ func main() {
func NewRootCommand() *cobra.Command {
var config Config

zlog, err := zap.NewProduction()
if err != nil {
panic(err)
}
logger2 := zapr.NewLogger(zlog).WithName("github.com/tinkerbell/tink")

cmd := &cobra.Command{
Use: "tink-controller",
PreRunE: func(cmd *cobra.Command, _ []string) error {
viper, err := createViper(logger2)
zlog, err := zap.NewProduction()
if err != nil {
panic(err)
}
logger := zapr.NewLogger(zlog).WithName("github.com/tinkerbell/tink")
viper, err := createViper(logger)
if err != nil {
return fmt.Errorf("config init: %w", err)
}
return applyViper(viper, cmd)
},
RunE: func(cmd *cobra.Command, _ []string) error {
os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://192.168.2.50:4318")
os.Setenv("OTEL_EXPORTER_OTLP_INSECURE", "true")
os.Setenv("OTEL_SERVICE_NAME", "tink-controller")
ctx := cmd.Context()

oCfg := OConfig{
Servicename: "tink-controller",
Endpoint: "192.168.2.50:4317",
Insecure: true,
zc := zap.NewProductionConfig()
switch config.LogLevel {
case 1:
zc.Level = zap.NewAtomicLevelAt(zapcore.Level(-1))
default:
zc.Level = zap.NewAtomicLevelAt(zapcore.Level(0))
}
ctx, _, _ = Init(ctx, oCfg)
// Create the OTLP log exporter that sends logs to configured destination
logExporter, err := otlploghttp.New(ctx)
zlog, err := zc.Build()
if err != nil {
panic("failed to initialize exporter")
panic(err)
}

// Create the logger provider
lp := log.NewLoggerProvider(
log.WithProcessor(
log.NewBatchProcessor(logExporter),
),
)

// Ensure the logger is shutdown before exiting so all pending logs are exported
defer lp.Shutdown(ctx)
handler := NewHandler("github.com/tinkerbell/tink", WithLoggerProvider(lp))

logger := logr.FromSlogHandler(handler)
tracer := otel.Tracer("my-tracer")
var span trace.Span
ctx, span = tracer.Start(ctx, "start up")
defer span.End()
// new stuff above
logger.Info("Starting controller version "+version, "TraceID", trace.SpanContextFromContext(ctx).TraceID())
logger := zapr.NewLogger(zlog).WithName("github.com/tinkerbell/tink")
logger.Info("Starting controller version " + version)

ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
Expand All @@ -123,7 +100,7 @@ func NewRootCommand() *cobra.Command {
}

options := ctrl.Options{
Logger: logger2,
Logger: logger,
LeaderElection: config.EnableLeaderElection,
LeaderElectionID: "tink.tinkerbell.org",
LeaderElectionNamespace: namespace,
Expand All @@ -133,14 +110,14 @@ func NewRootCommand() *cobra.Command {
HealthProbeBindAddress: config.ProbeAddr,
}

ctrl.SetLogger(logger2)
ctrl.SetLogger(logger)

mgr, err := controller.NewManager(cfg, options, logger)
if err != nil {
return fmt.Errorf("controller manager: %w", err)
}

return mgr.Start(ctx)
return mgr.Start(cmd.Context())
},
}
config.AddFlags(cmd.Flags())
Expand Down
2 changes: 1 addition & 1 deletion config/manager-rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: tink-controller-manager-role
name: manager-role
rules:
- apiGroups:
- bmc.tinkerbell.org
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ require (
github.com/stretchr/testify v1.9.0
github.com/tinkerbell/rufio v0.3.3
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.55.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0
go.opentelemetry.io/otel/log v0.6.0
go.opentelemetry.io/otel/sdk/log v0.6.0
go.opentelemetry.io/otel/sdk v1.30.0
go.opentelemetry.io/otel/trace v1.30.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -109,12 +110,9 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.30.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIX
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg=
go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts=
go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0/go.mod h1:sTQ/NH8Yrirf0sJ5rWqVu+oT82i4zL9FaF6rWcqnptM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 h1:lsInsfvhVIfOI6qHVyysXMNDnjO9Npvl7tlDPJFBVd4=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0/go.mod h1:KQsVNh4OjgjTG0G6EiNi1jVpnaeeKsKMRwbLN+f1+8M=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA=
Expand All @@ -233,8 +231,6 @@ go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4Q
go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ=
go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE=
go.opentelemetry.io/otel/sdk v1.30.0/go.mod h1:p14X4Ok8S+sygzblytT1nqG98QG2KYKv++HE0LY/mhg=
go.opentelemetry.io/otel/sdk/log v0.6.0 h1:4J8BwXY4EeDE9Mowg+CyhWVBhTSLXVXodiXxS/+PGqI=
go.opentelemetry.io/otel/sdk/log v0.6.0/go.mod h1:L1DN8RMAduKkrwRAFDEX3E3TLOq46+XMGSbUfHU/+vE=
go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc=
go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
Expand Down
46 changes: 14 additions & 32 deletions internal/deprecated/workflow/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package workflow
import (
"context"
"fmt"
"log/slog"
"time"

rufio "github.com/tinkerbell/rufio/api/v1alpha1"
"github.com/tinkerbell/tink/api/v1alpha1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"github.com/tinkerbell/tink/internal/deprecated/workflow/journal"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -29,14 +27,10 @@ func (j jobName) String() string {

// this function will update the Workflow status.
func (s *state) handleJob(ctx context.Context, actions []rufio.Action, name jobName) (reconcile.Result, error) {
tracer := otel.Tracer("handleJob")
var span trace.Span
ctx, span = tracer.Start(ctx, "handleJob")
defer span.End()
// there are 3 phases. 1. Clean up existing 2. Create new 3. Track status
// 1. clean up existing job if it wasn't already deleted
if j := s.workflow.Status.BootOptions.Jobs[name.String()]; !j.ExistingJobDeleted {
s.logger.InfoContext(ctx, "deleting existing job", "name", name)
journal.Log(ctx, "deleting existing job", "name", name)
result, err := s.deleteExisting(ctx, name)
if err != nil {
return result, err
Expand All @@ -47,7 +41,7 @@ func (s *state) handleJob(ctx context.Context, actions []rufio.Action, name jobN

// 2. create a new job
if uid := s.workflow.Status.BootOptions.Jobs[name.String()].UID; uid == "" {
s.logger.InfoContext(ctx, "no uid found for job", "name", name)
journal.Log(ctx, "no uid found for job", "name", name)
result, err := s.createJob(ctx, actions, name)
if err != nil {
s.workflow.Status.SetCondition(v1alpha1.WorkflowCondition{
Expand All @@ -71,7 +65,7 @@ func (s *state) handleJob(ctx context.Context, actions []rufio.Action, name jobN

// 3. track status
if !s.workflow.Status.BootOptions.Jobs[name.String()].Complete {
s.logger.InfoContext(ctx, "tracking job", "name", name)
journal.Log(ctx, "tracking job", "name", name)
// track status
r, tState, err := s.trackRunningJob(ctx, name)
if err != nil {
Expand Down Expand Up @@ -121,23 +115,19 @@ func (s *state) deleteExisting(ctx context.Context, name jobName) (reconcile.Res

// This function will update the Workflow status.
func (s *state) createJob(ctx context.Context, actions []rufio.Action, name jobName) (reconcile.Result, error) {
tracer := otel.Tracer("createJob")
var span trace.Span
ctx, span = tracer.Start(ctx, "createJob")
defer span.End()
// create a new job
// The assumption is that the UID is not set. UID checking is not handled here.
// 1. look up if there's an existing job with the same name, if so update the status with the UID and return
// 2. if there's no existing job, create a new job, update the status with the UID, and return

rj := &rufio.Job{}
if err := s.client.Get(ctx, client.ObjectKey{Name: name.String(), Namespace: s.workflow.Namespace}, rj); err == nil {
s.logger.InfoContext(ctx, "job already exists", "name", name)
journal.Log(ctx, "job already exists", "name", name)
if !rj.DeletionTimestamp.IsZero() {
s.logger.InfoContext(ctx, "job is being deleted", "name", name)
journal.Log(ctx, "job is being deleted", "name", name)
return reconcile.Result{Requeue: true}, nil
}
//TODO(jacobweinstock): job exists means that the job name and uid from the status are the same.
// TODO(jacobweinstock): job exists means that the job name and uid from the status are the same.
// get the UID and update the status
jStatus := s.workflow.Status.BootOptions.Jobs[name.String()]
jStatus.UID = rj.GetUID()
Expand All @@ -154,10 +144,10 @@ func (s *state) createJob(ctx context.Context, actions []rufio.Action, name jobN
return reconcile.Result{}, fmt.Errorf("hardware %q does not have a BMC", s.hardware.Name)
}

if err := create(ctx, s.logger, s.client, name.String(), s.hardware, s.workflow.Namespace, actions); err != nil {
if err := create(ctx, s.client, name.String(), s.hardware, s.workflow.Namespace, actions); err != nil {
return reconcile.Result{}, fmt.Errorf("error creating job: %w", err)
}
s.logger.InfoContext(ctx, "job created", "name", name)
journal.Log(ctx, "job created", "name", name)

return reconcile.Result{Requeue: true}, nil
}
Expand All @@ -173,23 +163,19 @@ var (

// This function will update the Workflow status.
func (s *state) trackRunningJob(ctx context.Context, name jobName) (reconcile.Result, trackedState, error) {
tracer := otel.Tracer("trackRunningJob")
var span trace.Span
ctx, span = tracer.Start(ctx, "trackRunningJob")
defer span.End()
// track status
// get the job
rj := &rufio.Job{}
if err := s.client.Get(ctx, client.ObjectKey{Name: name.String(), Namespace: s.workflow.Namespace}, rj); err != nil {
return reconcile.Result{}, trackedStateError, fmt.Errorf("error getting job: %w", err)
}
if rj.HasCondition(rufio.JobFailed, rufio.ConditionTrue) {
s.logger.InfoContext(ctx, "job failed", "name", name)
journal.Log(ctx, "job failed", "name", name)
// job failed
return reconcile.Result{}, trackedStateFailed, fmt.Errorf("job failed")
}
if rj.HasCondition(rufio.JobCompleted, rufio.ConditionTrue) {
s.logger.InfoContext(ctx, "job completed", "name", name)
journal.Log(ctx, "job completed", "name", name)
// job completed
jStatus := s.workflow.Status.BootOptions.Jobs[name.String()]
jStatus.Complete = true
Expand All @@ -198,17 +184,13 @@ func (s *state) trackRunningJob(ctx context.Context, name jobName) (reconcile.Re
return reconcile.Result{}, trackedStateComplete, nil
}
// still running
s.logger.InfoContext(ctx, "job still running", "name", name)
journal.Log(ctx, "job still running", "name", name)
time.Sleep(s.backoff.NextBackOff())
return reconcile.Result{Requeue: true}, trackedStateRunning, nil
}

func create(ctx context.Context, logger *slog.Logger, cc client.Client, name string, hw *v1alpha1.Hardware, ns string, tasks []rufio.Action) error {
tracer := otel.Tracer("create")
var span trace.Span
ctx, span = tracer.Start(ctx, "create")
defer span.End()
logger.InfoContext(ctx, "creating job", "name", name)
func create(ctx context.Context, cc client.Client, name string, hw *v1alpha1.Hardware, ns string, tasks []rufio.Action) error {
journal.Log(ctx, "creating job", "name", name)
if err := cc.Create(ctx, &rufio.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand Down
69 changes: 69 additions & 0 deletions internal/deprecated/workflow/journal/journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package journal

import (
"context"
"log/slog"
"path/filepath"
"runtime"
"strings"
"time"
)

type CtxKey string

const Name CtxKey = "journal"

type Entry struct {
Msg string `json:"msg"`
Args map[string]any `json:"args,omitempty"`
Source slog.Source `json:"source"`
Time string `json:"time"`
}

// New creates a slice of Entries in the provided context.
func New(ctx context.Context) context.Context {
e := &[]Entry{}
return context.WithValue(ctx, Name, e)
}

// Log adds a new Entry to the journal in the provided context.
// Log is not thread-safe.
func Log(ctx context.Context, msg string, args ...any) {
t := time.Now().UTC().Format(time.RFC3339Nano)
m := make(map[string]any)
for i := 0; i < len(args); i += 2 {
m[args[i].(string)] = args[i+1]

Check failure on line 35 in internal/deprecated/workflow/journal/journal.go

View workflow job for this annotation

GitHub Actions / Verify

type assertion must be checked (forcetypeassert)
}
e, ok := ctx.Value(Name).(*[]Entry)
if !ok {
e = &[]Entry{{Msg: msg, Args: m, Source: fileAndLine(), Time: t}}

Check failure on line 39 in internal/deprecated/workflow/journal/journal.go

View workflow job for this annotation

GitHub Actions / Verify

ineffectual assignment to e (ineffassign)
return
}
*e = append(*e, Entry{Msg: msg, Args: m, Source: fileAndLine(), Time: t})
}

// Journal returns the journal from the provided context.
func Journal(ctx context.Context) []Entry {
e, ok := ctx.Value(Name).(*[]Entry)
if !ok {
return nil
}
return *e
}

func fileAndLine() slog.Source {
pc, file, line, _ := runtime.Caller(2)
fn := runtime.FuncForPC(pc)
var fnName string
if fn == nil {
fnName = "?()"
} else {
fnName = strings.TrimLeft(filepath.Ext(fn.Name()), ".") + "()"
}

return slog.Source{
Function: fnName,
File: filepath.Base(file),
Line: line,
}
}
Loading

0 comments on commit 68ff0e4

Please sign in to comment.