Skip to content

Commit

Permalink
services/horizon: Add ingestion processors run duration metrics (stellar#3224)
Browse files Browse the repository at this point in the history

Add ingestion processors run duration metrics to better understand
performance bottlenecks. The new metrics are exposed in /metrics and
summarize time spent in each ingestion processor.
  • Loading branch information
bartekn authored Nov 16, 2020
1 parent c2c77fb commit b361462
Show file tree
Hide file tree
Showing 12 changed files with 292 additions and 113 deletions.
26 changes: 19 additions & 7 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ func (r resumeState) run(s *system) (transition, error) {
"commit": true,
}).Info("Processing ledger")

changeStats, ledgerTransactionStats, err := s.runner.RunAllProcessorsOnLedger(ingestLedger)
changeStats, changeDurations, transactionStats, transactionDurations, err :=
s.runner.RunAllProcessorsOnLedger(ingestLedger)
if err != nil {
return retryResume(r), errors.Wrap(err, "Error running processors on ledger")
}
Expand All @@ -477,13 +478,15 @@ func (r resumeState) run(s *system) (transition, error) {
// Update stats metrics
changeStatsMap := changeStats.Map()
r.addLedgerStatsMetricFromMap(s, "change", changeStatsMap)
r.addProcessorDurationsMetricFromMap(s, changeDurations)

ledgerTransactionStatsMap := ledgerTransactionStats.Map()
r.addLedgerStatsMetricFromMap(s, "ledger", ledgerTransactionStatsMap)
transactionStatsMap := transactionStats.Map()
r.addLedgerStatsMetricFromMap(s, "ledger", transactionStatsMap)
r.addProcessorDurationsMetricFromMap(s, transactionDurations)

log.
WithFields(changeStatsMap).
WithFields(ledgerTransactionStatsMap).
WithFields(transactionStatsMap).
WithFields(logpkg.F{
"sequence": ingestLedger,
"duration": duration,
Expand All @@ -506,6 +509,15 @@ func (r resumeState) addLedgerStatsMetricFromMap(s *system, prefix string, m map
}
}

// addProcessorDurationsMetricFromMap adds every duration in m, converted to
// seconds, to the ProcessorsRunDuration counter. Each map key is used as the
// value of the "name" label after all '*' characters are removed from it.
func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]time.Duration) {
	for name, elapsed := range m {
		// Per the original note, '*' is not accepted in Prometheus labels,
		// so it is stripped before the name is used as a label value.
		label := strings.ReplaceAll(name, "*", "")
		counter := s.Metrics().ProcessorsRunDuration.With(prometheus.Labels{"name": label})
		counter.Add(elapsed.Seconds())
	}
}

type historyRangeState struct {
fromLedger uint32
toLedger uint32
Expand Down Expand Up @@ -576,7 +588,7 @@ func runTransactionProcessorsOnLedger(s *system, ledger uint32) error {
}).Info("Processing ledger")
startTime := time.Now()

ledgerTransactionStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger)
ledgerTransactionStats, _, err := s.runner.RunTransactionProcessorsOnLedger(ledger)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger))
}
Expand Down Expand Up @@ -831,7 +843,7 @@ func (v verifyRangeState) run(s *system) (transition, error) {

var changeStats io.StatsChangeProcessorResults
var ledgerTransactionStats io.StatsLedgerTransactionProcessorResults
changeStats, ledgerTransactionStats, err = s.runner.RunAllProcessorsOnLedger(sequence)
changeStats, _, ledgerTransactionStats, _, err = s.runner.RunAllProcessorsOnLedger(sequence)
if err != nil {
err = errors.Wrap(err, "Error running processors on ledger")
return stop(), err
Expand Down Expand Up @@ -898,7 +910,7 @@ func (stressTestState) run(s *system) (transition, error) {
}).Info("Processing ledger")
startTime := time.Now()

changeStats, ledgerTransactionStats, err := s.runner.RunAllProcessorsOnLedger(sequence)
changeStats, _, ledgerTransactionStats, _, err := s.runner.RunAllProcessorsOnLedger(sequence)
if err != nil {
err = errors.Wrap(err, "Error running processors on ledger")
return stop(), err
Expand Down
51 changes: 41 additions & 10 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,83 @@
package ingest

import (
"fmt"
"time"

"github.com/stellar/go/ingest/io"
"github.com/stellar/go/support/errors"
)

type horizonChangeProcessor interface {
io.ChangeProcessor
// TODO maybe rename to Flush()
Commit() error
// processorsRunDurations maps a processor name to its accumulated run time.
type processorsRunDurations map[string]time.Duration

// AddRunDuration adds the time elapsed since startTime to the total stored
// under name. A name that has no entry yet starts from zero.
func (d processorsRunDurations) AddRunDuration(name string, startTime time.Time) {
	elapsed := time.Since(startTime)
	d[name] = d[name] + elapsed
}

type groupChangeProcessors []horizonChangeProcessor
// groupChangeProcessors bundles a list of change processors with the run
// durations accumulated for them.
type groupChangeProcessors struct {
// processors is the list of change processors held by the group.
processors []horizonChangeProcessor
// processorsRunDurations is embedded, so its methods and map indexing are
// available directly on the group value.
processorsRunDurations
}

// newGroupChangeProcessors returns a pointer to a group that holds the given
// processors and starts with an empty, writable run-durations map.
func newGroupChangeProcessors(processors []horizonChangeProcessor) *groupChangeProcessors {
	group := new(groupChangeProcessors)
	group.processors = processors
	group.processorsRunDurations = make(map[string]time.Duration)
	return group
}

// ProcessChange passes change to each processor of the group in slice order.
// It stops at the first processor that fails and returns that error wrapped
// with the failing processor's type. After every successful call the elapsed
// time is added to the run durations under the processor's %T type name.
func (g groupChangeProcessors) ProcessChange(change io.Change) error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessChange(change); err != nil {
return errors.Wrapf(err, "error in %T.ProcessChange", p)
}
// Reached only on success: a failing call returns above without recording its time.
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}

// Commit calls Commit on each processor of the group in slice order. It stops
// at the first processor that fails and returns that error wrapped with the
// failing processor's type. After every successful call the elapsed time is
// added to the run durations under the processor's %T type name.
func (g groupChangeProcessors) Commit() error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.Commit(); err != nil {
return errors.Wrapf(err, "error in %T.Commit", p)
}
// Reached only on success: a failing call returns above without recording its time.
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}

type groupTransactionProcessors []horizonTransactionProcessor
// groupTransactionProcessors bundles a list of transaction processors with
// the run durations accumulated for them.
type groupTransactionProcessors struct {
// processors is the list of transaction processors held by the group.
processors []horizonTransactionProcessor
// processorsRunDurations is embedded, so its methods and map indexing are
// available directly on the group value.
processorsRunDurations
}

// newGroupTransactionProcessors returns a pointer to a group that holds the
// given processors and starts with an empty, writable run-durations map.
func newGroupTransactionProcessors(processors []horizonTransactionProcessor) *groupTransactionProcessors {
	group := new(groupTransactionProcessors)
	group.processors = processors
	group.processorsRunDurations = make(map[string]time.Duration)
	return group
}

// ProcessTransaction passes tx to each processor of the group in slice order.
// It stops at the first processor that fails and returns that error wrapped
// with the failing processor's type. After every successful call the elapsed
// time is added to the run durations under the processor's %T type name.
func (g groupTransactionProcessors) ProcessTransaction(tx io.LedgerTransaction) error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessTransaction(tx); err != nil {
return errors.Wrapf(err, "error in %T.ProcessTransaction", p)
}
// Reached only on success: a failing call returns above without recording its time.
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}

// Commit calls Commit on each processor of the group in slice order. It stops
// at the first processor that fails and returns that error wrapped with the
// failing processor's type. After every successful call the elapsed time is
// added to the run durations under the processor's %T type name.
func (g groupTransactionProcessors) Commit() error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.Commit(); err != nil {
return errors.Wrapf(err, "error in %T.Commit", p)
}
// Reached only on success: a failing call returns above without recording its time.
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}
8 changes: 4 additions & 4 deletions services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ func TestGroupChangeProcessorsTestSuiteLedger(t *testing.T) {
// SetupTest creates two fresh mock change processors and stores a group built
// from them on the suite.
func (s *GroupChangeProcessorsTestSuiteLedger) SetupTest() {
s.processorA = &mockHorizonChangeProcessor{}
s.processorB = &mockHorizonChangeProcessor{}
s.processors = &groupChangeProcessors{
s.processors = newGroupChangeProcessors([]horizonChangeProcessor{
s.processorA,
s.processorB,
}
})
}

func (s *GroupChangeProcessorsTestSuiteLedger) TearDownTest() {
Expand Down Expand Up @@ -127,10 +127,10 @@ func TestGroupTransactionProcessorsTestSuiteLedger(t *testing.T) {
// SetupTest creates two fresh mock transaction processors and stores a group
// built from them on the suite.
func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() {
s.processorA = &mockHorizonTransactionProcessor{}
s.processorB = &mockHorizonTransactionProcessor{}
s.processors = &groupTransactionProcessors{
s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{
s.processorA,
s.processorB,
}
})
}

func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerR
s.historyQ.On("GetLatestLedger").Return(uint32(99), nil).Once()

s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, errors.New("my error")).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
errors.New("my error"),
).Once()

next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system)
s.Assert().Error(err)
Expand Down Expand Up @@ -175,7 +179,12 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccess() {

s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once()
for i := 100; i <= 200; i++ {
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).
Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()
}

s.historyQ.On("Commit").Return(nil).Once()
Expand All @@ -191,7 +200,11 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccessOneLedger() {
s.historyQ.On("GetLatestLedger").Return(uint32(99), nil).Once()

s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()

s.historyQ.On("Commit").Return(nil).Once()

Expand Down Expand Up @@ -333,7 +346,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedge
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).
Return(io.StatsLedgerTransactionProcessorResults{}, errors.New("my error")).Once()
Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
errors.New("my error"),
).Once()
s.historyQ.On("Rollback").Return(nil).Once()

err := s.system.ReingestRange(100, 200, false)
Expand All @@ -353,7 +370,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestCommitFails() {
"DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()

s.historyQ.On("Commit").Return(errors.New("my error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()
Expand All @@ -377,7 +398,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() {
"DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", i).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", i).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()

s.historyQ.On("Commit").Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
Expand All @@ -397,7 +422,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() {
"DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()
s.historyQ.On("Commit").Return(nil).Once()

// Recreate mock in this single test to remove previous assertion.
Expand Down Expand Up @@ -427,7 +456,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() {
).Return(nil).Once()

for i := 100; i <= 200; i++ {
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()
}

s.historyQ.On("Commit").Return(nil).Once()
Expand Down
11 changes: 11 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type Metrics struct {

// LedgerStatsCounter exposes ledger stats counters (like number of ops/changes).
LedgerStatsCounter *prometheus.CounterVec

// ProcessorsRunDuration exposes processors run durations.
ProcessorsRunDuration *prometheus.CounterVec
}

type System interface {
Expand Down Expand Up @@ -267,6 +270,14 @@ func (s *system) initMetrics() {
},
[]string{"type"},
)

s.metrics.ProcessorsRunDuration = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "processor_run_duration_seconds_total",
Help: "run durations of ingestion processors",
},
[]string{"name"},
)
}

func (s *system) Metrics() Metrics {
Expand Down
24 changes: 19 additions & 5 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,16 +411,30 @@ func (m *mockProcessorsRunner) RunHistoryArchiveIngestion(checkpointLedger uint3
return args.Get(0).(io.StatsChangeProcessorResults), args.Error(1)
}

// RunAllProcessorsOnLedger is the mock implementation: it registers the call
// with sequence via m.Called and returns the configured values, each
// type-asserted to the corresponding result type, with the error taken from
// the last position. A configured value of a different type makes the
// unchecked type assertion panic.
func (m *mockProcessorsRunner) RunAllProcessorsOnLedger(sequence uint32) (io.StatsChangeProcessorResults, io.StatsLedgerTransactionProcessorResults, error) {
func (m *mockProcessorsRunner) RunAllProcessorsOnLedger(sequence uint32) (
io.StatsChangeProcessorResults,
processorsRunDurations,
io.StatsLedgerTransactionProcessorResults,
processorsRunDurations,
error,
) {
args := m.Called(sequence)
return args.Get(0).(io.StatsChangeProcessorResults),
args.Get(1).(io.StatsLedgerTransactionProcessorResults),
args.Error(2)
args.Get(1).(processorsRunDurations),
args.Get(2).(io.StatsLedgerTransactionProcessorResults),
args.Get(3).(processorsRunDurations),
args.Error(4)
}

// RunTransactionProcessorsOnLedger is the mock implementation: it registers
// the call with sequence via m.Called and returns the configured values, each
// type-asserted to the corresponding result type, with the error taken from
// the last position. A configured value of a different type makes the
// unchecked type assertion panic.
func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(sequence uint32) (io.StatsLedgerTransactionProcessorResults, error) {
func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(sequence uint32) (
io.StatsLedgerTransactionProcessorResults,
processorsRunDurations,
error,
) {
args := m.Called(sequence)
return args.Get(0).(io.StatsLedgerTransactionProcessorResults), args.Error(1)
return args.Get(0).(io.StatsLedgerTransactionProcessorResults),
args.Get(1).(processorsRunDurations),
args.Error(2)
}

var _ ProcessorRunnerInterface = (*mockProcessorsRunner)(nil)
Expand Down
Loading

0 comments on commit b361462

Please sign in to comment.