Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Badger dependency modifications, alternative version #1720

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 13 additions & 52 deletions plugin/storage/badger/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package dependencystore

import (
"context"
"time"

"github.com/jaegertracing/jaeger/model"
badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -36,64 +36,25 @@ func NewDependencyStore(store spanstore.Reader) *DependencyStore {

// GetDependencies returns all interservice dependencies, implements DependencyReader
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
deps := map[string]*model.DependencyLink{}

params := &spanstore.TraceQueryParameters{
StartTimeMin: endTs.Add(-1 * lookback),
br := s.reader.(*badgerStore.TraceReader)
query := &spanstore.TraceQueryParameters{
StartTimeMax: endTs,
StartTimeMin: endTs.Add(-1 * lookback),
}

// We need to do a full table scan - if this becomes a bottleneck, we can write an index that describes
// dependencyKeyPrefix + timestamp + parent + child key and do a key-only seek (which is fast - but requires additional writes)

// GetDependencies is not shipped with a context like the SpanReader / SpanWriter
traces, err := s.reader.FindTraces(context.Background(), params)
resultMap, err := br.ScanDependencyIndex(query)
if err != nil {
return nil, err
}
for _, tr := range traces {
processTrace(deps, tr)
}

return depMapToSlice(deps), err
}

// depMapToSlice flattens the dependency map into a slice of DependencyLink values,
// in the same way as the memory storage plugin. The map keys are discarded and the
// order of the result follows map iteration order.
func depMapToSlice(deps map[string]*model.DependencyLink) []model.DependencyLink {
	links := make([]model.DependencyLink, len(deps))
	next := 0
	for _, link := range deps {
		links[next] = *link
		next++
	}
	return links
}
retMe := make([]model.DependencyLink, 0, len(resultMap))

// processTrace is copy from the memory storage plugin
func processTrace(deps map[string]*model.DependencyLink, trace *model.Trace) {
for _, s := range trace.Spans {
parentSpan := seekToSpan(trace, s.ParentSpanID())
if parentSpan != nil {
if parentSpan.Process.ServiceName == s.Process.ServiceName {
continue
}
depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName
if _, ok := deps[depKey]; !ok {
deps[depKey] = &model.DependencyLink{
Parent: parentSpan.Process.ServiceName,
Child: s.Process.ServiceName,
CallCount: 1,
}
} else {
deps[depKey].CallCount++
}
}
for k, v := range resultMap {
retMe = append(retMe, model.DependencyLink{
Parent: k.From,
Child: k.To,
CallCount: v,
})
}
}

func seekToSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
for _, s := range trace.Spans {
if s.SpanID == spanID {
return s
}
}
return nil
return retMe, nil
}
13 changes: 0 additions & 13 deletions plugin/storage/badger/dependencystore/storage_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,3 @@
// limitations under the License.

package dependencystore

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
)

func TestSeekToSpan(t *testing.T) {
span := seekToSpan(&model.Trace{}, model.SpanID(uint64(1)))
assert.Nil(t, span)
}
53 changes: 52 additions & 1 deletion plugin/storage/badger/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package dependencystore_test
import (
"fmt"
"io"
"log"
"os"
"runtime/pprof"
"testing"
"time"

Expand Down Expand Up @@ -94,10 +97,58 @@ func TestDependencyReader(t *testing.T) {
assert.NoError(t, err)
}
}
links, err = dr.GetDependencies(time.Now(), time.Hour)
links, err = dr.GetDependencies(tid.Add(time.Hour), time.Hour)
assert.NoError(t, err)
assert.NotEmpty(t, links)
assert.Equal(t, spans-1, len(links)) // First span does not create a dependency
assert.Equal(t, uint64(traces), links[0].CallCount) // Each trace calls the same services
})
}

// BenchmarkDependencyReader measures GetDependencies against a store that has been
// populated with 100000 traces of 16 spans each, where every span is a child of the
// previous span in its trace and belongs to its own service. A CPU profile of the
// measured section is written to profile.out in the working directory.
func BenchmarkDependencyReader(b *testing.B) {
	runFactoryTest(b, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) {
		tid := time.Now()
		// Query once before any span has been written; only a failure is of interest here.
		if _, err := dr.GetDependencies(tid, time.Hour); err != nil {
			tb.Fatalf("initial GetDependencies failed: %v", err)
		}

		traces := 100000
		spans := 16
		for i := 0; i < traces; i++ {
			for j := 0; j < spans; j++ {
				s := model.Span{
					TraceID: model.TraceID{
						Low:  uint64(i),
						High: 1,
					},
					SpanID:        model.SpanID(j),
					OperationName: "operation-a",
					Process: &model.Process{
						ServiceName: fmt.Sprintf("service-%d", j),
					},
					StartTime: tid.Add(time.Duration(i)),
					Duration:  time.Duration(i + j),
				}
				if j > 0 {
					// Chain the span to its predecessor so each trace yields spans-1 links.
					s.References = []model.SpanRef{model.NewChildOfRef(s.TraceID, model.SpanID(j-1))}
				}
				// A failed write would silently shrink the data set being benchmarked.
				if err := sw.WriteSpan(&s); err != nil {
					tb.Fatalf("could not write span %d of trace %d: %v", j, i, err)
				}
			}
		}

		// The above insert triggers backend compaction; let it finish before benchmarking.
		time.Sleep(5 * time.Second)

		f, err := os.Create("profile.out")
		if err != nil {
			log.Fatal("could not create CPU profile: ", err)
		}
		// Deferred calls run last-in first-out: the profile is stopped (and flushed)
		// before the file is closed.
		defer f.Close()
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal("could not start CPU profile: ", err)
		}
		defer pprof.StopCPUProfile()

		b.ResetTimer()
		for a := 0; a < b.N; a++ {
			if _, err := dr.GetDependencies(time.Now(), time.Hour); err != nil {
				tb.Fatalf("GetDependencies failed: %v", err)
			}
		}
	})
}
7 changes: 7 additions & 0 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.store = store

err = badgerStore.SchemaUpdate(store, logger)
if err != nil {
logger.Error("Failed to update the schema, badger storage failed to start: " + err.Error())
store.Close()
return err
}

f.cache = badgerStore.NewCacheStore(f.store, f.Options.primary.SpanStoreTTL, true)

f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName})
Expand Down
136 changes: 136 additions & 0 deletions plugin/storage/badger/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package badger

import (
"encoding/binary"
"expvar"
"fmt"
"io"
"os"
"testing"
"time"

"github.com/dgraph-io/badger"
assert "github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/metricstest"
Expand Down Expand Up @@ -146,6 +148,34 @@ func TestMaintenanceCodecov(t *testing.T) {
waiter() // This should trigger the logging of error
}

// TestSchemaVersionWritten ensures that the Factory does call the SchemaManager correctly.
// It initializes a factory with default flags, reads the value stored under the schema
// key (0x11) straight from the badger store and expects it to decode to a positive
// big-endian uint32 version.
func TestSchemaVersionWritten(t *testing.T) {
	f := NewFactory()
	v, _ := config.Viperize(f.AddFlags)
	f.InitFromViper(v)
	mFactory := metricstest.NewFactory(0)
	// assert is testify's require in this file, so a failed check stops the test here
	// instead of continuing with an unusable store.
	err := f.Initialize(mFactory, zap.NewNop())
	assert.NoError(t, err)
	defer func() {
		assert.NoError(t, f.Close())
	}()

	schemaVersion := -1
	err = f.store.View(func(txn *badger.Txn) error {
		schemaKey := []byte{0x11}
		item, err := txn.Get(schemaKey)
		if err != nil {
			return err
		}

		val, err := item.Value()
		if err != nil {
			return err
		}
		// Uint32 panics on fewer than 4 bytes; report a readable failure instead.
		if len(val) < 4 {
			return fmt.Errorf("schema version value has %d bytes, want at least 4", len(val))
		}
		schemaVersion = int(binary.BigEndian.Uint32(val))
		return nil
	})

	assert.NoError(t, err)
	assert.True(t, schemaVersion > 0)
}

func TestBadgerMetrics(t *testing.T) {
// The expvar is leaking keyparams between tests. We need to clean up a bit..
eMap := expvar.Get("badger_lsm_size_bytes").(*expvar.Map)
Expand Down Expand Up @@ -187,3 +217,109 @@ func TestBadgerMetrics(t *testing.T) {
err := f.Close()
assert.NoError(t, err)
}

// TestFailToInitializeByCorruptingDB writes one span through a non-ephemeral factory,
// closes it, then opens the same directory directly with badger to overwrite the first
// byte of a stored value under the spanKey prefix and to delete the schema key. A second
// Initialize on the same factory is then expected to return an error.
func TestFailToInitializeByCorruptingDB(t *testing.T) {
	f := NewFactory()
	v, command := config.Viperize(f.AddFlags)
	dir, err := ioutil.TempDir("", "badger")
	assert.NoError(t, err)
	// The store is configured as non-ephemeral below, so the test owns the directory
	// and has to remove it itself.
	defer os.RemoveAll(dir)

	keyParam := fmt.Sprintf("--badger.directory-key=%s", dir)
	valueParam := fmt.Sprintf("--badger.directory-value=%s", dir)

	err = command.ParseFlags([]string{
		"--badger.ephemeral=false",
		"--badger.consistency=true",
		keyParam,
		valueParam,
	})
	assert.NoError(t, err)
	f.InitFromViper(v)

	err = f.Initialize(metrics.NullFactory, zap.NewNop())
	assert.NoError(t, err)

	sw, err := f.CreateSpanWriter()
	assert.NoError(t, err)

	err = sw.WriteSpan(createDummySpan())
	assert.NoError(t, err)

	err = f.Close()
	assert.NoError(t, err)

	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	store, err := badger.Open(opts)
	assert.NoError(t, err)

	// Corrupt the data. Failures inside the transaction are returned rather than
	// asserted, so they surface through the single check after Update.
	err = store.Update(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()

		spanKey := []byte{0x80}
		it.Seek(spanKey)
		if it.Item() != nil && bytes.HasPrefix(it.Item().Key(), spanKey) {
			key := it.Item().KeyCopy(nil)
			val, err := it.Item().ValueCopy(nil)
			if err != nil {
				return err
			}
			if len(val) == 0 {
				return fmt.Errorf("value for key %x is empty, nothing to corrupt", key)
			}

			val[0] = 0 // Corrupt the proto
			if err := txn.Set(key, val); err != nil {
				return err
			}
		}

		// Delete the schema key to cause proto unmarshalling
		return txn.Delete([]byte{0x11})
	})
	assert.NoError(t, err)

	err = store.Close()
	assert.NoError(t, err)

	err = f.Initialize(metrics.NullFactory, zap.NewNop())
	assert.Error(t, err)
}

// createDummySpan returns a span for trace ID {High: 1, Low: 0} and span ID 0 that
// starts one millisecond after the current time and lasts one millisecond. The same
// single string key/value pair is used for the span tags, the process tags and the
// fields of its only log entry, which is stamped with the current time.
func createDummySpan() *model.Span {
	now := time.Now()
	kvs := []model.KeyValue{
		{Key: "key", VType: model.StringType, VStr: "value"},
	}

	return &model.Span{
		TraceID:       model.TraceID{High: 1, Low: 0},
		SpanID:        model.SpanID(0),
		OperationName: "operation",
		Process: &model.Process{
			ServiceName: "service",
			Tags:        kvs,
		},
		StartTime: now.Add(time.Millisecond),
		Duration:  time.Millisecond,
		Tags:      kvs,
		Logs: []model.Log{
			{Timestamp: now, Fields: kvs},
		},
	}
}
Loading