Refactor badger storage dependency parsing for better performance #1694

Closed
wants to merge 11 commits into from
65 changes: 11 additions & 54 deletions plugin/storage/badger/dependencystore/storage.go
@@ -15,10 +15,10 @@
package dependencystore

import (
"context"
"time"

"github.com/jaegertracing/jaeger/model"
badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

@@ -36,64 +36,21 @@ func NewDependencyStore(store spanstore.Reader) *DependencyStore {

// GetDependencies returns all interservice dependencies, implements DependencyReader
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
deps := map[string]*model.DependencyLink{}

params := &spanstore.TraceQueryParameters{
StartTimeMin: endTs.Add(-1 * lookback),
StartTimeMax: endTs,
}

// We need to do a full table scan - if this becomes a bottleneck, we can write an index that describes
// dependencyKeyPrefix + timestamp + parent + child key and do a key-only seek (which is fast - but requires additional writes)

// GetDependencies is not shipped with a context like the SpanReader / SpanWriter
traces, err := s.reader.FindTraces(context.Background(), params)
br := s.reader.(*badgerStore.TraceReader)
resultMap, err := br.ScanDependencyIndex(endTs.Add(-1*lookback), endTs)
if err != nil {
return nil, err
}
for _, tr := range traces {
processTrace(deps, tr)
}

return depMapToSlice(deps), err
}
retMe := make([]model.DependencyLink, 0, len(resultMap))

// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin
func depMapToSlice(deps map[string]*model.DependencyLink) []model.DependencyLink {
retMe := make([]model.DependencyLink, 0, len(deps))
for _, dep := range deps {
retMe = append(retMe, *dep)
for k, v := range resultMap {
retMe = append(retMe, model.DependencyLink{
Parent: k.From,
Child: k.To,
CallCount: v,
})
}
return retMe
}

// processTrace is copy from the memory storage plugin
func processTrace(deps map[string]*model.DependencyLink, trace *model.Trace) {
for _, s := range trace.Spans {
parentSpan := seekToSpan(trace, s.ParentSpanID())
if parentSpan != nil {
if parentSpan.Process.ServiceName == s.Process.ServiceName {
continue
}
depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName
if _, ok := deps[depKey]; !ok {
deps[depKey] = &model.DependencyLink{
Parent: parentSpan.Process.ServiceName,
Child: s.Process.ServiceName,
CallCount: 1,
}
} else {
deps[depKey].CallCount++
}
}
}
}

func seekToSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
for _, s := range trace.Spans {
if s.SpanID == spanID {
return s
}
}
return nil
return retMe, nil
}
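The refactored reader delegates the heavy lifting to badgerStore.TraceReader.ScanDependencyIndex, which aggregates (parent, child) call counts directly from an index instead of materializing whole traces through FindTraces. That method lives in the spanstore package and is not part of this diff; the sketch below is only an illustration of how such a key-only scan over badger could be structured, assuming an index key of prefix byte + big-endian timestamp + parent + 0x00 + child. The prefix value, separator, and function names here are hypothetical, not the PR's actual encoding.

```go
package depscan

import (
	"bytes"
	"encoding/binary"
	"time"

	"github.com/dgraph-io/badger"
)

// depKey mirrors the (From, To) pair the dependency reader aggregates on.
type depKey struct {
	From string
	To   string
}

// scanDependencyIndexSketch iterates index keys between start and end without
// ever fetching values (PrefetchValues=false), counting one call per key seen.
func scanDependencyIndexSketch(db *badger.DB, indexPrefix byte, start, end time.Time) (map[depKey]uint64, error) {
	counts := map[depKey]uint64{}
	err := db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchValues = false // key-only iteration is the whole point
		it := txn.NewIterator(opts)
		defer it.Close()

		// Seek to the first key at or after the window start.
		seek := make([]byte, 9)
		seek[0] = indexPrefix
		binary.BigEndian.PutUint64(seek[1:], uint64(start.UnixNano()))

		for it.Seek(seek); it.ValidForPrefix([]byte{indexPrefix}); it.Next() {
			key := it.Item().Key()
			if len(key) < 9 {
				continue
			}
			ts := int64(binary.BigEndian.Uint64(key[1:9]))
			if ts > end.UnixNano() {
				break // keys are time-ordered, so we are past the query window
			}
			// The remainder of the key carries the parent and child service names.
			parts := bytes.SplitN(key[9:], []byte{0x00}, 2)
			if len(parts) != 2 {
				continue
			}
			counts[depKey{From: string(parts[0]), To: string(parts[1])}]++
		}
		return nil
	})
	return counts, err
}
```

With a map like this in hand, GetDependencies above only has to flatten it into []model.DependencyLink, which is exactly what the new loop over resultMap does.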
13 changes: 0 additions & 13 deletions plugin/storage/badger/dependencystore/storage_internal_test.go
@@ -13,16 +13,3 @@
// limitations under the License.

package dependencystore

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
)

func TestSeekToSpan(t *testing.T) {
span := seekToSpan(&model.Trace{}, model.SpanID(uint64(1)))
assert.Nil(t, span)
}
53 changes: 52 additions & 1 deletion plugin/storage/badger/dependencystore/storage_test.go
@@ -17,6 +17,9 @@ package dependencystore_test
import (
"fmt"
"io"
"log"
"os"
"runtime/pprof"
"testing"
"time"

@@ -94,10 +97,58 @@ func TestDependencyReader(t *testing.T) {
assert.NoError(t, err)
}
}
links, err = dr.GetDependencies(time.Now(), time.Hour)
links, err = dr.GetDependencies(tid.Add(time.Hour), time.Hour)
assert.NoError(t, err)
assert.NotEmpty(t, links)
assert.Equal(t, spans-1, len(links)) // First span does not create a dependency
assert.Equal(t, uint64(traces), links[0].CallCount) // Each trace calls the same services
})
}

func BenchmarkDependencyReader(b *testing.B) {
runFactoryTest(b, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) {
tid := time.Now()
dr.GetDependencies(tid, time.Hour)

traces := 100000
spans := 16
for i := 0; i < traces; i++ {
for j := 0; j < spans; j++ {
s := model.Span{
TraceID: model.TraceID{
Low: uint64(i),
High: 1,
},
SpanID: model.SpanID(j),
OperationName: fmt.Sprintf("operation-a"),
Process: &model.Process{
ServiceName: fmt.Sprintf("service-%d", j),
},
StartTime: tid.Add(time.Duration(i)),
Duration: time.Duration(i + j),
}
if j > 0 {
s.References = []model.SpanRef{model.NewChildOfRef(s.TraceID, model.SpanID(j-1))}
}
_ = sw.WriteSpan(&s)
}
}

// The above insert triggers backend compaction; let it finish before benchmarking
time.Sleep(5 * time.Second)

f, err := os.Create("profile.out")
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()

b.ResetTimer()
for a := 0; a < b.N; a++ {
dr.GetDependencies(time.Now(), time.Hour)
}
})
}
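The CPU profile the benchmark writes to profile.out can be inspected with the standard Go tooling, for example `go test -run XXX -bench BenchmarkDependencyReader ./plugin/storage/badger/dependencystore/` followed by `go tool pprof profile.out` (the package path here is inferred from the file headers above).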
9 changes: 7 additions & 2 deletions plugin/storage/badger/factory.go
@@ -118,6 +118,13 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.store = store

err = badgerStore.SchemaUpdate(store, logger)
if err != nil {
logger.Error("Failed to update the schema, badger storage failed to start: " + err.Error())
store.Close()
return err
}

f.cache = badgerStore.NewCacheStore(f.store, f.Options.primary.SpanStoreTTL, true)

f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName})
@@ -191,8 +198,6 @@ func (f *Factory) maintenance() {
}
if err == badger.ErrNoRewrite {
f.metrics.LastValueLogCleaned.Update(t.UnixNano())
} else {
f.logger.Error("Failed to run ValueLogGC", zap.Error(err))
}

f.metrics.LastMaintenanceRun.Update(t.UnixNano())
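Initialize now runs badgerStore.SchemaUpdate(store, logger) before building the cache, and aborts startup (closing the store) if the update fails. The real implementation is in the spanstore package and not shown in this diff; the sketch below is a hedged approximation of the bookkeeping it implies, reusing the schema-version key (0x11) and big-endian uint32 encoding that TestSchemaVersionWritten below reads back. The currentSchemaVersion constant and the migration step itself are placeholders.

```go
package schemasketch

import (
	"encoding/binary"

	"github.com/dgraph-io/badger"
	"go.uber.org/zap"
)

// Assumed value for illustration; the real constant lives in the spanstore package.
const currentSchemaVersion uint32 = 1

var schemaVersionKey = []byte{0x11}

// schemaUpdateSketch reads the stored schema version, runs any pending
// migrations, and records the new version so later startups can skip the work.
func schemaUpdateSketch(store *badger.DB, logger *zap.Logger) error {
	var stored uint32
	err := store.View(func(txn *badger.Txn) error {
		item, err := txn.Get(schemaVersionKey)
		if err == badger.ErrKeyNotFound {
			return nil // fresh database: nothing written yet
		}
		if err != nil {
			return err
		}
		val, err := item.Value() // badger v1 API, matching the rest of this PR
		if err != nil {
			return err
		}
		stored = binary.BigEndian.Uint32(val)
		return nil
	})
	if err != nil {
		return err
	}

	if stored < currentSchemaVersion {
		logger.Info("updating badger schema",
			zap.Uint32("from", stored), zap.Uint32("to", currentSchemaVersion))
		// ... per-version migrations would run here; any error must be
		// returned so Initialize can close the store and fail fast ...
	}

	// Persist the current version as a big-endian uint32, the encoding the
	// TestSchemaVersionWritten test decodes.
	return store.Update(func(txn *badger.Txn) error {
		buf := make([]byte, 4)
		binary.BigEndian.PutUint32(buf, currentSchemaVersion)
		return txn.Set(schemaVersionKey, buf)
	})
}
```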
144 changes: 129 additions & 15 deletions plugin/storage/badger/factory_test.go
@@ -15,18 +15,23 @@
package badger

import (
"bytes"
"encoding/binary"
"expvar"
"fmt"
"io"
"io/ioutil"
"os"
"testing"
"time"

"github.com/dgraph-io/badger"
assert "github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config"
)

@@ -121,29 +126,32 @@ func TestMaintenanceRun(t *testing.T) {
assert.NoError(t, err)
}

// TestMaintenanceCodecov this test is not intended to test anything, but hopefully increase coverage by triggering a log line
func TestMaintenanceCodecov(t *testing.T) {
// For Codecov - this does not test anything
// TestSchemaVersionWritten ensures that the Factory does call the SchemaManager correctly
func TestSchemaVersionWritten(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
// Lets speed up the maintenance ticker..
command.ParseFlags([]string{
"--badger.maintenance-interval=10ms",
})
v, _ := config.Viperize(f.AddFlags)
f.InitFromViper(v)
mFactory := metricstest.NewFactory(0)
f.Initialize(mFactory, zap.NewNop())

waiter := func() {
for sleeps := 0; sleeps < 8; sleeps++ {
// Wait for the scheduler
time.Sleep(time.Duration(50) * time.Millisecond)
schemaVersion := -1
err := f.store.View(func(txn *badger.Txn) error {
schemaKey := []byte{0x11}
item, err := txn.Get(schemaKey)
if err != nil {
return err
}
}

err := f.store.Close()
val, err := item.Value()
if err != nil {
return err
}
schemaVersion = int(binary.BigEndian.Uint32(val))
return nil
})

assert.NoError(t, err)
waiter() // This should trigger the logging of error
assert.True(t, schemaVersion > 0)
}

func TestBadgerMetrics(t *testing.T) {
@@ -187,3 +195,109 @@
err := f.Close()
assert.NoError(t, err)
}

func TestFailToInitializeByCorruptingDB(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
dir, _ := ioutil.TempDir("", "badger")
defer os.RemoveAll(dir)

keyParam := fmt.Sprintf("--badger.directory-key=%s", dir)
valueParam := fmt.Sprintf("--badger.directory-value=%s", dir)

command.ParseFlags([]string{
"--badger.ephemeral=false",
"--badger.consistency=true",
keyParam,
valueParam,
})
f.InitFromViper(v)

err := f.Initialize(metrics.NullFactory, zap.NewNop())
assert.NoError(t, err)

sw, err := f.CreateSpanWriter()
assert.NoError(t, err)

err = sw.WriteSpan(createDummySpan())
assert.NoError(t, err)

err = f.Close()
assert.NoError(t, err)

opts := badger.DefaultOptions
opts.Dir = dir
opts.ValueDir = dir
store, err := badger.Open(opts)
assert.NoError(t, err)

// Corrupt the data
err = store.Update(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

spanKey := []byte{0x80}
it.Seek(spanKey)
if it.Item() != nil && bytes.HasPrefix(it.Item().Key(), spanKey) {
val := []byte{}
val, err = it.Item().ValueCopy(val)
assert.NoError(t, err)

val[0] = 0 // Corrupt the proto
keyCopy := []byte{}
keyCopy = it.Item().KeyCopy(keyCopy)
err = txn.Set(keyCopy, val)
assert.NoError(t, err)
}

// Delete the schema key to cause proto unmarshalling
err = txn.Delete([]byte{0x11})
assert.NoError(t, err)

return nil
})
assert.NoError(t, err)

err = store.Close()
assert.NoError(t, err)

err = f.Initialize(metrics.NullFactory, zap.NewNop())
assert.Error(t, err)
}

func createDummySpan() *model.Span {
tid := time.Now()

dummyKv := []model.KeyValue{
{
Key: "key",
VType: model.StringType,
VStr: "value",
},
}

testSpan := model.Span{
TraceID: model.TraceID{
Low: uint64(0),
High: 1,
},
SpanID: model.SpanID(0),
OperationName: "operation",
Process: &model.Process{
ServiceName: "service",
Tags: dummyKv,
},
StartTime: tid.Add(time.Duration(1 * time.Millisecond)),
Duration: time.Duration(1 * time.Millisecond),
Tags: dummyKv,
Logs: []model.Log{
{
Timestamp: tid,
Fields: dummyKv,
},
},
}

return &testSpan
}