Skip to content

Commit

Permalink
Remove SASI indices (#1328)
Browse files Browse the repository at this point in the history
  • Loading branch information
black-adder authored Feb 14, 2019
1 parent 7e1ee51 commit 7e51957
Show file tree
Hide file tree
Showing 17 changed files with 723 additions and 111 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ Changes by Version

#### Backend Changes

- Remove cassandra SASI indices [#1328](https://github.com/jaegertracing/jaeger/pull/1328)

Migration Path:

1. Run `plugin/storage/cassandra/schema/migration/v001tov002part1.sh` which will copy dependencies into a csv, update the `dependency UDT`, create a new `dependencies_v2` table, and write dependencies from the csv into the `dependencies_v2` table.
2. Run the collector and query services with the cassandra flag `cassandra.enable-dependencies-v2=true` which will instruct jaeger to write and read to and from the new `dependencies_v2` table.
3. Update [spark job](https://github.com/jaegertracing/spark-dependencies) to write to the new `dependencies_v2` table.
4. Run `plugin/storage/cassandra/schema/migration/v001tov002part2.sh` which will DELETE the old dependency table and the SASI index.

Users who wish to continue to use the v1 table don't have to do anything as the cassandra flag `cassandra.enable-dependencies-v2` will default to false. Users may migrate on their own timeline however new features will be built solely on the `dependencies_v2` table. In the future, we will remove support for v1 completely.

##### Breaking Changes

##### New Features
Expand Down
23 changes: 20 additions & 3 deletions model/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,26 @@

package model

// DependencyLinkSource is the source of data used to generate the dependencies.
type DependencyLinkSource string

const (
// JaegerDependencyLinkSource describes a dependency diagram that was generated from Jaeger traces.
JaegerDependencyLinkSource = DependencyLinkSource("jaeger")
)

// DependencyLink shows dependencies between services
type DependencyLink struct {
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Source DependencyLinkSource `json:"source"`
}

// ApplyDefaults applies defaults to the DependencyLink.
func (d DependencyLink) ApplyDefaults() DependencyLink {
if d.Source == "" {
d.Source = JaegerDependencyLinkSource
}
return d
}
30 changes: 30 additions & 0 deletions model/dependencies_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDependencyLinkApplyDefaults(t *testing.T) {
dl := DependencyLink{}.ApplyDefaults()
assert.Equal(t, JaegerDependencyLinkSource, dl.Source)

networkSource := DependencyLinkSource("network")
dl = DependencyLink{Source: networkSource}.ApplyDefaults()
assert.Equal(t, networkSource, dl.Source)
}
1 change: 1 addition & 0 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Configuration struct {
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
EnableDependenciesV2 bool `yaml:"enable_dependencies_v2"`
TLS TLS
}

Expand Down
5 changes: 5 additions & 0 deletions plugin/storage/cassandra/dependencystore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Dependency struct {
Parent string `cql:"parent"`
Child string `cql:"child"`
CallCount int64 `cql:"call_count"` // always unsigned, but we cannot explicitly read uint64 from Cassandra
Source string `cql:"source"`
}

// MarshalUDT handles marshalling a Dependency.
Expand All @@ -36,6 +37,8 @@ func (d *Dependency) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error
return gocql.Marshal(info, d.Child)
case "call_count":
return gocql.Marshal(info, d.CallCount)
case "source":
return gocql.Marshal(info, d.Source)
default:
return nil, fmt.Errorf("unknown column for position: %q", name)
}
Expand All @@ -50,6 +53,8 @@ func (d *Dependency) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte)
return gocql.Unmarshal(info, data, &d.Child)
case "call_count":
return gocql.Unmarshal(info, data, &d.CallCount)
case "source":
return gocql.Unmarshal(info, data, &d.Source)
default:
return fmt.Errorf("unknown column for position: %q", name)
}
Expand Down
10 changes: 6 additions & 4 deletions plugin/storage/cassandra/dependencystore/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@ import (

func TestDependencyUDT(t *testing.T) {
dependency := &Dependency{
Parent: "goo",
Child: "gle",
Parent: "bi",
Child: "ng",
CallCount: 123,
Source: "jaeger",
}

testCase := testutils.UDTTestCase{
Obj: dependency,
New: func() gocql.UDTUnmarshaler { return &Dependency{} },
ObjName: "Dependency",
Fields: []testutils.UDTField{
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("goo"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("gle"), Err: false},
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("bi"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("ng"), Err: false},
{Name: "call_count", Type: gocql.TypeBigInt, ValIn: []byte{0, 0, 0, 0, 0, 0, 0, 123}, Err: false},
{Name: "source", Type: gocql.TypeAscii, ValIn: []byte("jaeger"), Err: false},
{Name: "wrong-field", Err: true},
},
}
Expand Down
72 changes: 64 additions & 8 deletions plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,59 @@ import (
casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics"
)

// Version determines which version of the dependencies table to use.
type Version int

// IsValid returns true if the Version is a valid one.
func (i Version) IsValid() bool {
return i >= 0 && i < versionEnumEnd
}

const (
depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
// V1 is used when the dependency table is SASI indexed.
V1 Version = iota

// V2 is used when the dependency table is NOT SASI indexed.
V2
versionEnumEnd

depsInsertStmtV1 = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmtV2 = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectStmtV1 = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectStmtV2 = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?"

// TODO: Make this customizable.
tsBucket = 24 * time.Hour
)

var (
errInvalidVersion = errors.New("invalid version")
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
session cassandra.Session
dependenciesTableMetrics *casMetrics.Table
logger *zap.Logger
version Version
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
) *DependencyStore {
version Version,
) (*DependencyStore, error) {
if !version.IsValid() {
return nil, errInvalidVersion
}
return &DependencyStore{
session: session,
dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "dependencies"),
logger: logger,
}
version: version,
}, nil
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
Expand All @@ -59,27 +89,44 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
Parent: d.Parent,
Child: d.Child,
CallCount: int64(d.CallCount),
Source: string(d.Source),
}
}
query := s.session.Query(depsInsertStmt, ts, ts, deps)

var query cassandra.Query
switch s.version {
case V1:
query = s.session.Query(depsInsertStmtV1, ts, ts, deps)
case V2:
query = s.session.Query(depsInsertStmtV2, ts, ts.Truncate(tsBucket), deps)
}
return s.dependenciesTableMetrics.Exec(query, s.logger)
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.version {
case V1:
query = s.session.Query(depsSelectStmtV1, startTs, endTs)
case V2:
query = s.session.Query(depsSelectStmtV2, getBuckets(startTs, endTs), startTs, endTs)
}
iter := query.Consistency(cassandra.One).Iter()

var mDependency []model.DependencyLink
var dependencies []Dependency
var ts time.Time
for iter.Scan(&ts, &dependencies) {
for _, dependency := range dependencies {
mDependency = append(mDependency, model.DependencyLink{
dl := model.DependencyLink{
Parent: dependency.Parent,
Child: dependency.Child,
CallCount: uint64(dependency.CallCount),
})
Source: model.DependencyLinkSource(dependency.Source),
}.ApplyDefaults()
mDependency = append(mDependency, dl)
}
}

Expand All @@ -89,3 +136,12 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
}
return mDependency, nil
}

func getBuckets(startTs time.Time, endTs time.Time) []time.Time {
// TODO: Preallocate the array using some maths and maybe use a pool? This endpoint probably isn't used enough to warrant this.
var tsBuckets []time.Time
for ts := startTs.Truncate(tsBucket); ts.Before(endTs); ts = ts.Add(tsBucket) {
tsBuckets = append(tsBuckets, ts)
}
return tsBuckets
}
Loading

0 comments on commit 7e51957

Please sign in to comment.