Skip to content

Commit

Permalink
Add Pipeline.VerboseLogging config option (#12498)
Browse files Browse the repository at this point in the history
* Add Pipeline.VerboseLogging config option

* Add changelog

* lint

* Bump log level in error cases

* Make generate

* rename key to specID

* Adjust levels

* make config-docs
  • Loading branch information
samsondav authored Mar 20, 2024
1 parent ccd177d commit 1c576d0
Show file tree
Hide file tree
Showing 27 changed files with 121 additions and 7 deletions.
18 changes: 18 additions & 0 deletions .changeset/popular-buckets-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"chainlink": patch
---

Add new config option Pipeline.VerboseLogging

VerboseLogging enables detailed logging of pipeline execution steps. This is
disabled by default because it increases log volume for pipeline runs, but can
be useful for debugging failed runs without relying on the UI or database.
Consider enabling this if you disabled run saving by setting MaxSuccessfulRuns
to zero.

Set it like the following example:

```
[Pipeline]
VerboseLogging = true
```
6 changes: 6 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ ReaperThreshold = '24h' # Default
# **ADVANCED**
# ResultWriteQueueDepth controls how many writes will be buffered before subsequent writes are dropped, for jobs that write results asynchronously for performance reasons, such as OCR.
ResultWriteQueueDepth = 100 # Default
# VerboseLogging enables detailed logging of pipeline execution steps.
# This is disabled by default because it increases log volume for pipeline
# runs, but can be useful for debugging failed runs without relying on the UI
# or database. Consider enabling this if you disabled run saving by setting
# MaxSuccessfulRuns to zero.
VerboseLogging = false # Default

[JobPipeline.HTTPRequest]
# DefaultTimeout defines the default timeout for HTTP requests made by `http` and `bridge` adapters.
Expand Down
1 change: 1 addition & 0 deletions core/config/job_pipeline_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ type JobPipeline interface {
ReaperThreshold() time.Duration
ResultWriteQueueDepth() uint64
ExternalInitiatorsEnabled() bool
VerboseLogging() bool
}
5 changes: 5 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
ocrcommontypes "github.com/smartcontractkit/libocr/commontypes"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"

"github.com/smartcontractkit/chainlink/v2/core/build"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
Expand Down Expand Up @@ -853,6 +854,7 @@ type JobPipeline struct {
ReaperInterval *commonconfig.Duration
ReaperThreshold *commonconfig.Duration
ResultWriteQueueDepth *uint32
VerboseLogging *bool

HTTPRequest JobPipelineHTTPRequest `toml:",omitempty"`
}
Expand All @@ -876,6 +878,9 @@ func (j *JobPipeline) setFrom(f *JobPipeline) {
if v := f.ResultWriteQueueDepth; v != nil {
j.ResultWriteQueueDepth = v
}
if v := f.VerboseLogging; v != nil {
j.VerboseLogging = v
}
j.HTTPRequest.setFrom(&f.HTTPRequest)

}
Expand Down
1 change: 1 addition & 0 deletions core/internal/testutils/configtest/general_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func overrides(c *chainlink.Config, s *chainlink.Secrets) {
c.Database.DefaultLockTimeout = commonconfig.MustNewDuration(1 * time.Minute)

c.JobPipeline.ReaperInterval = commonconfig.MustNewDuration(0)
c.JobPipeline.VerboseLogging = ptr(true)

c.P2P.V2.Enabled = ptr(false)

Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_job_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ func (j *jobPipelineConfig) ResultWriteQueueDepth() uint64 {
func (j *jobPipelineConfig) ExternalInitiatorsEnabled() bool {
return *j.c.ExternalInitiatorsEnabled
}

func (j *jobPipelineConfig) VerboseLogging() bool {
return *j.c.VerboseLogging
}
2 changes: 2 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func TestConfig_Marshal(t *testing.T) {
ReaperInterval: commoncfg.MustNewDuration(4 * time.Hour),
ReaperThreshold: commoncfg.MustNewDuration(7 * 24 * time.Hour),
ResultWriteQueueDepth: ptr[uint32](10),
VerboseLogging: ptr(true),
HTTPRequest: toml.JobPipelineHTTPRequest{
MaxSize: ptr[utils.FileSize](100 * utils.MB),
DefaultTimeout: commoncfg.MustNewDuration(time.Minute),
Expand Down Expand Up @@ -845,6 +846,7 @@ MaxSuccessfulRuns = 123456
ReaperInterval = '4h0m0s'
ReaperThreshold = '168h0m0s'
ResultWriteQueueDepth = 10
VerboseLogging = true
[JobPipeline.HTTPRequest]
DefaultTimeout = '1m0s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ MaxSuccessfulRuns = 123456
ReaperInterval = '4h0m0s'
ReaperThreshold = '168h0m0s'
ResultWriteQueueDepth = 10
VerboseLogging = true

[JobPipeline.HTTPRequest]
DefaultTimeout = '1m0s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '30s'
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/plugins/mercury/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func setupNode(
// [JobPipeline]
// MaxSuccessfulRuns = 0
c.JobPipeline.MaxSuccessfulRuns = ptr(uint64(0))
c.JobPipeline.VerboseLogging = ptr(true)

// [Feature]
// UICSAKeys=true
Expand Down
5 changes: 3 additions & 2 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type (
MaxRunDuration() time.Duration
ReaperInterval() time.Duration
ReaperThreshold() time.Duration
VerboseLogging() bool
}

BridgeConfig interface {
Expand Down Expand Up @@ -200,8 +201,8 @@ func (result FinalResult) SingularResult() (Result, error) {
// TaskSpecID will always be non-zero
type TaskRunResult struct {
ID uuid.UUID
Task Task
TaskRun TaskRun
Task Task `json:"-"`
TaskRun TaskRun `json:"-"`
Result Result
Attempts uint
CreatedAt time.Time
Expand Down
18 changes: 18 additions & 0 deletions core/services/pipeline/mocks/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 36 additions & 4 deletions core/services/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) {
}

func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Vars, l logger.Logger) TaskRunResults {
l = l.With("jobID", run.PipelineSpec.JobID, "jobName", run.PipelineSpec.JobName)
l = l.With("run.ID", run.ID, "executionID", uuid.New(), "specID", run.PipelineSpecID, "jobID", run.PipelineSpec.JobID, "jobName", run.PipelineSpec.JobName)
l.Debug("Initiating tasks for pipeline run of spec")

scheduler := newScheduler(pipeline, run, vars, l)
Expand Down Expand Up @@ -362,12 +362,12 @@ func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Var
run.FailSilently = scheduler.exiting
run.State = RunStatusSuspended

var runTime time.Duration
if !scheduler.pending {
run.FinishedAt = null.TimeFrom(time.Now())

// NOTE: runTime can be very long now because it'll include suspend
runTime := run.FinishedAt.Time.Sub(run.CreatedAt)
l.Debugw("Finished all tasks for pipeline run", "specID", run.PipelineSpecID, "runTime", runTime)
runTime = run.FinishedAt.Time.Sub(run.CreatedAt)
PromPipelineRunTotalTimeToCompletion.WithLabelValues(fmt.Sprintf("%d", run.PipelineSpec.JobID), run.PipelineSpec.JobName).Set(float64(runTime))
}

Expand All @@ -389,6 +389,9 @@ func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Var
})

sort.Slice(run.PipelineTaskRuns, func(i, j int) bool {
if run.PipelineTaskRuns[i].task.OutputIndex() == run.PipelineTaskRuns[j].task.OutputIndex() {
return run.PipelineTaskRuns[i].FinishedAt.ValueOrZero().Before(run.PipelineTaskRuns[j].FinishedAt.ValueOrZero())
}
return run.PipelineTaskRuns[i].task.OutputIndex() < run.PipelineTaskRuns[j].task.OutputIndex()
})
}
Expand Down Expand Up @@ -439,6 +442,33 @@ func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Var
idxs[i] = taskRunResults[i].Task.OutputIndex()
}

if r.config.VerboseLogging() {
l = l.With(
"run.PipelineTaskRuns", run.PipelineTaskRuns,
"run.Outputs", run.Outputs,
"run.CreatedAt", run.CreatedAt,
"run.FinishedAt", run.FinishedAt,
"run.Meta", run.Meta,
"run.Inputs", run.Inputs,
)
}
if run.HasFatalErrors() {
l = l.With("run.FatalErrors", run.FatalErrors)
}
if run.HasErrors() {
l = l.With("run.AllErrors", run.AllErrors)
}
l = l.With("run.State", run.State, "fatal", run.HasFatalErrors(), "runTime", runTime)
if run.HasFatalErrors() {
// This will also log at error level in OCR if it fails Observe so the
// level is appropriate
l.Errorw("Completed pipeline run with fatal errors")
} else if run.HasErrors() {
l.Debugw("Completed pipeline run with errors")
} else {
l.Debugw("Completed pipeline run successfully")
}

return taskRunResults
}

Expand Down Expand Up @@ -480,7 +510,9 @@ func (r *runner) executeTaskRun(ctx context.Context, spec Spec, taskRun *memoryT
loggerFields = append(loggerFields, "resultString", fmt.Sprintf("%q", v))
loggerFields = append(loggerFields, "resultHex", fmt.Sprintf("%x", v))
}
l.Tracew("Pipeline task completed", loggerFields...)
if r.config.VerboseLogging() {
l.Tracew("Pipeline task completed", loggerFields...)
}

now := time.Now()

Expand Down
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ MaxSuccessfulRuns = 123456
ReaperInterval = '4h0m0s'
ReaperThreshold = '168h0m0s'
ResultWriteQueueDepth = 10
VerboseLogging = true

[JobPipeline.HTTPRequest]
DefaultTimeout = '1m0s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '30s'
Expand Down
11 changes: 11 additions & 0 deletions docs/CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ MaxSuccessfulRuns = 10000 # Default
ReaperInterval = '1h' # Default
ReaperThreshold = '24h' # Default
ResultWriteQueueDepth = 100 # Default
VerboseLogging = false # Default
```


Expand Down Expand Up @@ -823,6 +824,16 @@ ResultWriteQueueDepth = 100 # Default
```
ResultWriteQueueDepth controls how many writes will be buffered before subsequent writes are dropped, for jobs that write results asynchronously for performance reasons, such as OCR.

### VerboseLogging
```toml
VerboseLogging = false # Default
```
VerboseLogging enables detailed logging of pipeline execution steps.
This is disabled by default because it increases log volume for pipeline
runs, but can be useful for debugging failed runs without relying on the UI
or database. Consider enabling this if you disabled run saving by setting
MaxSuccessfulRuns to zero.

## JobPipeline.HTTPRequest
```toml
[JobPipeline.HTTPRequest]
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestScripts(t *testing.T) {
Dir: path,
Setup: commonEnv,
ContinueOnError: true,
//UpdateScripts: true, // uncomment to update golden files
// UpdateScripts: true, // uncomment to update golden files
})
})
return nil
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/default.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/disk-based-logging.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/invalid-ocr-p2p.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/invalid.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/valid.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down
1 change: 1 addition & 0 deletions testdata/scripts/node/validate/warnings.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ MaxSuccessfulRuns = 10000
ReaperInterval = '1h0m0s'
ReaperThreshold = '24h0m0s'
ResultWriteQueueDepth = 100
VerboseLogging = false

[JobPipeline.HTTPRequest]
DefaultTimeout = '15s'
Expand Down

0 comments on commit 1c576d0

Please sign in to comment.