diff --git a/CHANGELOG.md b/CHANGELOG.md index d29ddd3c2d2..804839603d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ Changes by Version #### Backend Changes +- Remove cassandra SASI indices [#1328](https://github.com/jaegertracing/jaeger/pull/1328) + +Migration Path: + +1. Run `plugin/storage/cassandra/schema/migration/v001tov002part1.sh` which will copy dependencies into a csv, update the `dependency UDT`, create a new `dependencies_v2` table, and write dependencies from the csv into the `dependencies_v2` table. +2. Run the collector and query services with the cassandra flag `cassandra.enable-dependencies-v2=true` which will instruct jaeger to write and read to and from the new `dependencies_v2` table. +3. Update [spark job](https://github.com/jaegertracing/spark-dependencies) to write to the new `dependencies_v2` table. +4. Run `plugin/storage/cassandra/schema/migration/v001tov002part2.sh` which will DELETE the old dependency table and the SASI index. + +Users who wish to continue to use the v1 table don't have to do anything as the cassandra flag `cassandra.enable-dependencies-v2` will default to false. Users may migrate on their own timeline however new features will be built solely on the `dependencies_v2` table. In the future, we will remove support for v1 completely. + ##### Breaking Changes ##### New Features diff --git a/model/dependencies.go b/model/dependencies.go index c3e5ee56c76..7a33bf45201 100644 --- a/model/dependencies.go +++ b/model/dependencies.go @@ -14,9 +14,26 @@ package model +// DependencyLinkSource is the source of data used to generate the dependencies. +type DependencyLinkSource string + +const ( + // JaegerDependencyLinkSource describes a dependency diagram that was generated from Jaeger traces. 
+ JaegerDependencyLinkSource = DependencyLinkSource("jaeger") +) + // DependencyLink shows dependencies between services type DependencyLink struct { - Parent string `json:"parent"` - Child string `json:"child"` - CallCount uint64 `json:"callCount"` + Parent string `json:"parent"` + Child string `json:"child"` + CallCount uint64 `json:"callCount"` + Source DependencyLinkSource `json:"source"` +} + +// ApplyDefaults applies defaults to the DependencyLink. +func (d DependencyLink) ApplyDefaults() DependencyLink { + if d.Source == "" { + d.Source = JaegerDependencyLinkSource + } + return d } diff --git a/model/dependencies_test.go b/model/dependencies_test.go new file mode 100644 index 00000000000..5ec933e7a0f --- /dev/null +++ b/model/dependencies_test.go @@ -0,0 +1,30 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package model + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDependencyLinkApplyDefaults(t *testing.T) { + dl := DependencyLink{}.ApplyDefaults() + assert.Equal(t, JaegerDependencyLinkSource, dl.Source) + + networkSource := DependencyLinkSource("network") + dl = DependencyLink{Source: networkSource}.ApplyDefaults() + assert.Equal(t, networkSource, dl.Source) +} diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 5dd12bad634..c736b1d0ee5 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -40,6 +40,7 @@ type Configuration struct { Port int `yaml:"port"` Authenticator Authenticator `yaml:"authenticator"` DisableAutoDiscovery bool `yaml:"disable_auto_discovery"` + EnableDependenciesV2 bool `yaml:"enable_dependencies_v2"` TLS TLS } diff --git a/plugin/storage/cassandra/dependencystore/model.go b/plugin/storage/cassandra/dependencystore/model.go index e2c5cee043e..a92e6824e8f 100644 --- a/plugin/storage/cassandra/dependencystore/model.go +++ b/plugin/storage/cassandra/dependencystore/model.go @@ -25,6 +25,7 @@ type Dependency struct { Parent string `cql:"parent"` Child string `cql:"child"` CallCount int64 `cql:"call_count"` // always unsigned, but we cannot explicitly read uint64 from Cassandra + Source string `cql:"source"` } // MarshalUDT handles marshalling a Dependency. 
@@ -36,6 +37,8 @@ func (d *Dependency) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error return gocql.Marshal(info, d.Child) case "call_count": return gocql.Marshal(info, d.CallCount) + case "source": + return gocql.Marshal(info, d.Source) default: return nil, fmt.Errorf("unknown column for position: %q", name) } @@ -50,6 +53,8 @@ func (d *Dependency) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) return gocql.Unmarshal(info, data, &d.Child) case "call_count": return gocql.Unmarshal(info, data, &d.CallCount) + case "source": + return gocql.Unmarshal(info, data, &d.Source) default: return fmt.Errorf("unknown column for position: %q", name) } diff --git a/plugin/storage/cassandra/dependencystore/model_test.go b/plugin/storage/cassandra/dependencystore/model_test.go index d86b9e74809..70a8d6af48a 100644 --- a/plugin/storage/cassandra/dependencystore/model_test.go +++ b/plugin/storage/cassandra/dependencystore/model_test.go @@ -24,9 +24,10 @@ import ( func TestDependencyUDT(t *testing.T) { dependency := &Dependency{ - Parent: "goo", - Child: "gle", + Parent: "bi", + Child: "ng", CallCount: 123, + Source: "jaeger", } testCase := testutils.UDTTestCase{ @@ -34,9 +35,10 @@ func TestDependencyUDT(t *testing.T) { New: func() gocql.UDTUnmarshaler { return &Dependency{} }, ObjName: "Dependency", Fields: []testutils.UDTField{ - {Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("goo"), Err: false}, - {Name: "child", Type: gocql.TypeAscii, ValIn: []byte("gle"), Err: false}, + {Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("bi"), Err: false}, + {Name: "child", Type: gocql.TypeAscii, ValIn: []byte("ng"), Err: false}, {Name: "call_count", Type: gocql.TypeBigInt, ValIn: []byte{0, 0, 0, 0, 0, 0, 0, 123}, Err: false}, + {Name: "source", Type: gocql.TypeAscii, ValIn: []byte("jaeger"), Err: false}, {Name: "wrong-field", Err: true}, }, } diff --git a/plugin/storage/cassandra/dependencystore/storage.go b/plugin/storage/cassandra/dependencystore/storage.go 
index 64b0e35ba50..de95ff3ceea 100644 --- a/plugin/storage/cassandra/dependencystore/storage.go +++ b/plugin/storage/cassandra/dependencystore/storage.go @@ -26,9 +26,33 @@ import ( casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics" ) +// Version determines which version of the dependencies table to use. +type Version int + +// IsValid returns true if the Version is a valid one. +func (i Version) IsValid() bool { + return i >= 0 && i < versionEnumEnd +} + const ( - depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)" - depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?" + // V1 is used when the dependency table is SASI indexed. + V1 Version = iota + + // V2 is used when the dependency table is NOT SASI indexed. + V2 + versionEnumEnd + + depsInsertStmtV1 = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)" + depsInsertStmtV2 = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)" + depsSelectStmtV1 = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?" + depsSelectStmtV2 = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?" + + // TODO: Make this customizable. 
+ tsBucket = 24 * time.Hour +) + +var ( + errInvalidVersion = errors.New("invalid version") ) // DependencyStore handles all queries and insertions to Cassandra dependencies @@ -36,6 +60,7 @@ type DependencyStore struct { session cassandra.Session dependenciesTableMetrics *casMetrics.Table logger *zap.Logger + version Version } // NewDependencyStore returns a DependencyStore @@ -43,12 +68,17 @@ func NewDependencyStore( session cassandra.Session, metricsFactory metrics.Factory, logger *zap.Logger, -) *DependencyStore { + version Version, +) (*DependencyStore, error) { + if !version.IsValid() { + return nil, errInvalidVersion + } return &DependencyStore{ session: session, dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "dependencies"), logger: logger, - } + version: version, + }, nil } // WriteDependencies implements dependencystore.Writer#WriteDependencies. @@ -59,15 +89,30 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D Parent: d.Parent, Child: d.Child, CallCount: int64(d.CallCount), + Source: string(d.Source), } } - query := s.session.Query(depsInsertStmt, ts, ts, deps) + + var query cassandra.Query + switch s.version { + case V1: + query = s.session.Query(depsInsertStmtV1, ts, ts, deps) + case V2: + query = s.session.Query(depsInsertStmtV2, ts, ts.Truncate(tsBucket), deps) + } return s.dependenciesTableMetrics.Exec(query, s.logger) } // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs) + startTs := endTs.Add(-1 * lookback) + var query cassandra.Query + switch s.version { + case V1: + query = s.session.Query(depsSelectStmtV1, startTs, endTs) + case V2: + query = s.session.Query(depsSelectStmtV2, getBuckets(startTs, endTs), startTs, endTs) + } iter := query.Consistency(cassandra.One).Iter() var mDependency 
[]model.DependencyLink @@ -75,11 +120,13 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio var ts time.Time for iter.Scan(&ts, &dependencies) { for _, dependency := range dependencies { - mDependency = append(mDependency, model.DependencyLink{ + dl := model.DependencyLink{ Parent: dependency.Parent, Child: dependency.Child, CallCount: uint64(dependency.CallCount), - }) + Source: model.DependencyLinkSource(dependency.Source), + }.ApplyDefaults() + mDependency = append(mDependency, dl) } } @@ -89,3 +136,12 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio } return mDependency, nil } + +func getBuckets(startTs time.Time, endTs time.Time) []time.Time { + // TODO: Preallocate the array using some maths and maybe use a pool? This endpoint probably isn't used enough to warrant this. + var tsBuckets []time.Time + for ts := startTs.Truncate(tsBucket); ts.Before(endTs); ts = ts.Add(tsBucket) { + tsBuckets = append(tsBuckets, ts) + } + return tsBuckets +} diff --git a/plugin/storage/cassandra/dependencystore/storage_test.go b/plugin/storage/cassandra/dependencystore/storage_test.go index 48eaae7bc6d..df185a50d2b 100644 --- a/plugin/storage/cassandra/dependencystore/storage_test.go +++ b/plugin/storage/cassandra/dependencystore/storage_test.go @@ -22,8 +22,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "go.uber.org/zap" + "github.com/uber/jaeger-lib/metrics" "github.com/uber/jaeger-lib/metrics/metricstest" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cassandra" @@ -39,16 +40,17 @@ type depStorageTest struct { storage *DependencyStore } -func withDepStore(fn func(s *depStorageTest)) { +func withDepStore(version Version, fn func(s *depStorageTest)) { session := &mocks.Session{} logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(time.Second) defer metricsFactory.Stop() + store, _ := 
NewDependencyStore(session, metricsFactory, logger, version) s := &depStorageTest{ session: session, logger: logger, logBuffer: logBuffer, - storage: NewDependencyStore(session, metricsFactory, logger), + storage: store, } fn(s) } @@ -56,53 +58,92 @@ func withDepStore(fn func(s *depStorageTest)) { var _ dependencystore.Reader = &DependencyStore{} // check API conformance var _ dependencystore.Writer = &DependencyStore{} // check API conformance +func TestVersionIsValid(t *testing.T) { + assert.True(t, V1.IsValid()) + assert.True(t, V2.IsValid()) + assert.False(t, versionEnumEnd.IsValid()) +} + +func TestInvalidVersion(t *testing.T) { + _, err := NewDependencyStore(&mocks.Session{}, metrics.NullFactory, zap.NewNop(), versionEnumEnd) + assert.Error(t, err) +} + func TestDependencyStoreWrite(t *testing.T) { - withDepStore(func(s *depStorageTest) { - query := &mocks.Query{} - query.On("Exec").Return(nil) - - var args []interface{} - captureArgs := mock.MatchedBy(func(v []interface{}) bool { - args = v - return true - }) + testCases := []struct { + caption string + version Version + }{ + { + caption: "V1", + version: V1, + }, + { + caption: "V2", + version: V2, + }, + } + for _, tc := range testCases { + testCase := tc // capture loop var + t.Run(testCase.caption, func(t *testing.T) { + withDepStore(testCase.version, func(s *depStorageTest) { + query := &mocks.Query{} + query.On("Exec").Return(nil) - s.session.On("Query", mock.AnythingOfType("string"), captureArgs).Return(query) + var args []interface{} + captureArgs := mock.MatchedBy(func(v []interface{}) bool { + args = v + return true + }) - ts := time.Date(2017, time.January, 24, 11, 15, 17, 12345, time.UTC) - dependencies := []model.DependencyLink{ - { - Parent: "a", - Child: "b", - CallCount: 42, - }, - } - err := s.storage.WriteDependencies(ts, dependencies) - assert.NoError(t, err) - - assert.Len(t, args, 3) - if d, ok := args[0].(time.Time); ok { - assert.Equal(t, ts, d) - } else { - assert.Fail(t, "expecting 
first arg as time.Time", "received: %+v", args) - } - if d, ok := args[1].(time.Time); ok { - assert.Equal(t, ts, d) - } else { - assert.Fail(t, "expecting second arg as time.Time", "received: %+v", args) - } - if d, ok := args[2].([]Dependency); ok { - assert.Equal(t, []Dependency{ - { - Parent: "a", - Child: "b", - CallCount: 42, - }, - }, d) - } else { - assert.Fail(t, "expecting third arg as []Dependency", "received: %+v", args) - } - }) + s.session.On("Query", mock.AnythingOfType("string"), captureArgs).Return(query) + + ts := time.Date(2017, time.January, 24, 11, 15, 17, 12345, time.UTC) + dependencies := []model.DependencyLink{ + { + Parent: "a", + Child: "b", + CallCount: 42, + Source: model.JaegerDependencyLinkSource, + }, + } + err := s.storage.WriteDependencies(ts, dependencies) + assert.NoError(t, err) + + assert.Len(t, args, 3) + if d, ok := args[0].(time.Time); ok { + assert.Equal(t, ts, d) + } else { + assert.Fail(t, "expecting first arg as time.Time", "received: %+v", args) + } + if testCase.version == V2 { + if d, ok := args[1].(time.Time); ok { + assert.Equal(t, time.Date(2017, time.January, 24, 0, 0, 0, 0, time.UTC), d) + } else { + assert.Fail(t, "expecting second arg as time", "received: %+v", args) + } + } else { + if d, ok := args[1].(time.Time); ok { + assert.Equal(t, ts, d) + } else { + assert.Fail(t, "expecting second arg as time.Time", "received: %+v", args) + } + } + if d, ok := args[2].([]Dependency); ok { + assert.Equal(t, []Dependency{ + { + Parent: "a", + Child: "b", + CallCount: 42, + Source: "jaeger", + }, + }, d) + } else { + assert.Fail(t, "expecting third arg as []Dependency", "received: %+v", args) + } + }) + }) + } } func TestDependencyStoreGetDependencies(t *testing.T) { @@ -111,23 +152,39 @@ func TestDependencyStoreGetDependencies(t *testing.T) { queryError error expectedError string expectedLogs []string + version Version }{ { - caption: "success", + caption: "success V1", + version: V1, + }, + { + caption: "success V2", + 
version: V2, }, { - caption: "failure", + caption: "failure V1", queryError: errors.New("query error"), expectedError: "Error reading dependencies from storage: query error", expectedLogs: []string{ "Failure to read Dependencies", }, + version: V1, + }, + { + caption: "failure V2", + queryError: errors.New("query error"), + expectedError: "Error reading dependencies from storage: query error", + expectedLogs: []string{ + "Failure to read Dependencies", + }, + version: V2, }, } for _, tc := range testCases { testCase := tc // capture loop var t.Run(testCase.caption, func(t *testing.T) { - withDepStore(func(s *depStorageTest) { + withDepStore(testCase.version, func(s *depStorageTest) { scanMatcher := func() interface{} { deps := [][]Dependency{ { @@ -172,10 +229,10 @@ func TestDependencyStoreGetDependencies(t *testing.T) { if testCase.expectedError == "" { assert.NoError(t, err) expected := []model.DependencyLink{ - {Parent: "a", Child: "b", CallCount: 1}, - {Parent: "b", Child: "c", CallCount: 1}, - {Parent: "a", Child: "b", CallCount: 1}, - {Parent: "b", Child: "c", CallCount: 1}, + {Parent: "a", Child: "b", CallCount: 1, Source: model.JaegerDependencyLinkSource}, + {Parent: "b", Child: "c", CallCount: 1, Source: model.JaegerDependencyLinkSource}, + {Parent: "a", Child: "b", CallCount: 1, Source: model.JaegerDependencyLinkSource}, + {Parent: "b", Child: "c", CallCount: 1, Source: model.JaegerDependencyLinkSource}, } assert.Equal(t, expected, deps) } else { @@ -192,6 +249,19 @@ func TestDependencyStoreGetDependencies(t *testing.T) { } } +func TestGetBuckets(t *testing.T) { + var ( + start = time.Date(2017, time.January, 24, 11, 15, 17, 12345, time.UTC) + end = time.Date(2017, time.January, 26, 11, 15, 17, 12345, time.UTC) + expected = []time.Time{ + time.Date(2017, time.January, 24, 0, 0, 0, 0, time.UTC), + time.Date(2017, time.January, 25, 0, 0, 0, 0, time.UTC), + time.Date(2017, time.January, 26, 0, 0, 0, 0, time.UTC), + } + ) + assert.Equal(t, expected, 
getBuckets(start, end)) +} + func matchEverything() interface{} { return mock.MatchedBy(func(v []interface{}) bool { return true }) } diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 8c5d9862a23..372b06a558c 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -106,7 +106,11 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger), nil + version := cDepStore.V1 + if f.Options.GetPrimary().EnableDependenciesV2 { + version = cDepStore.V2 + } + return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version) } // CreateArchiveSpanReader implements storage.ArchiveFactory diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 078c008cf71..d0c7bcc60b7 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -47,7 +47,7 @@ func TestCassandraFactory(t *testing.T) { logger, logBuf := testutils.NewLogger() f := NewFactory() v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{"--cassandra-archive.enabled=true"}) + command.ParseFlags([]string{"--cassandra-archive.enabled=true", "--cassandra.enable-dependencies-v2=true"}) f.InitFromViper(v) // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index 461112b3874..66a850d8a70 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -26,26 +26,27 @@ import ( const ( // session settings - suffixEnabled = ".enabled" - suffixConnPerHost = ".connections-per-host" - suffixMaxRetryAttempts = 
".max-retry-attempts" - suffixTimeout = ".timeout" - suffixReconnectInterval = ".reconnect-interval" - suffixServers = ".servers" - suffixPort = ".port" - suffixKeyspace = ".keyspace" - suffixDC = ".local-dc" - suffixConsistency = ".consistency" - suffixProtoVer = ".proto-version" - suffixSocketKeepAlive = ".socket-keep-alive" - suffixUsername = ".username" - suffixPassword = ".password" - suffixTLS = ".tls" - suffixCert = ".tls.cert" - suffixKey = ".tls.key" - suffixCA = ".tls.ca" - suffixServerName = ".tls.server-name" - suffixVerifyHost = ".tls.verify-host" + suffixEnabled = ".enabled" + suffixConnPerHost = ".connections-per-host" + suffixMaxRetryAttempts = ".max-retry-attempts" + suffixTimeout = ".timeout" + suffixReconnectInterval = ".reconnect-interval" + suffixServers = ".servers" + suffixPort = ".port" + suffixKeyspace = ".keyspace" + suffixDC = ".local-dc" + suffixConsistency = ".consistency" + suffixProtoVer = ".proto-version" + suffixSocketKeepAlive = ".socket-keep-alive" + suffixUsername = ".username" + suffixPassword = ".password" + suffixTLS = ".tls" + suffixCert = ".tls.cert" + suffixKey = ".tls.key" + suffixCA = ".tls.ca" + suffixServerName = ".tls.server-name" + suffixVerifyHost = ".tls.verify-host" + suffixEnableDependenciesV2 = ".enable-dependencies-v2" // common storage settings suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl" @@ -197,6 +198,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixVerifyHost, nsConfig.TLS.EnableHostVerification, "Enable (or disable) host key verification") + flagSet.Bool( + nsConfig.namespace+suffixEnableDependenciesV2, + nsConfig.EnableDependenciesV2, + "Disable (or enable) the dependencies v2 table. 
Only set this to true if you've migrated the dependencies table to v2") } // InitFromViper initializes Options with properties from viper @@ -231,6 +236,7 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA) cfg.TLS.ServerName = v.GetString(cfg.namespace + suffixServerName) cfg.TLS.EnableHostVerification = v.GetBool(cfg.namespace + suffixVerifyHost) + cfg.EnableDependenciesV2 = v.GetBool(cfg.namespace + suffixEnableDependenciesV2) } // GetPrimary returns primary configuration. diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index d0c84d4e864..57680f850ce 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -63,6 +63,7 @@ func TestOptionsWithFlags(t *testing.T) { "--cas-aux.enabled=true", "--cas-aux.keyspace=jaeger-archive", "--cas-aux.servers=3.3.3.3, 4.4.4.4", + "--cas-aux.enable-dependencies-v2=true", }) opts.InitFromViper(v) @@ -71,6 +72,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "mojave", primary.LocalDC) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) assert.Equal(t, "ONE", primary.Consistency) + assert.Equal(t, false, primary.EnableDependenciesV2) aux := opts.Get("cas-aux") require.NotNil(t, aux) @@ -84,4 +86,5 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary") assert.Equal(t, 3, aux.ProtoVersion) assert.Equal(t, 42*time.Second, aux.SocketKeepAlive) + assert.Equal(t, true, aux.EnableDependenciesV2) } diff --git a/plugin/storage/cassandra/schema/migration/v001tov002part1.sh b/plugin/storage/cassandra/schema/migration/v001tov002part1.sh new file mode 100755 index 00000000000..7d3ab108ace --- /dev/null +++ b/plugin/storage/cassandra/schema/migration/v001tov002part1.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash + +set -euo pipefail + +function usage { + >&2 echo "Error: $1" + >&2 
echo "" + >&2 echo "Usage: KEYSPACE={keyspace} $0" + >&2 echo "" + >&2 echo "The following parameters can be set via environment:" + >&2 echo " KEYSPACE - keyspace" + >&2 echo " TIMEOUT - cqlsh request timeout" + >&2 echo "" + exit 1 +} + +confirm() { + read -r -p "${1:-Are you sure? [y/N]} " response + case "$response" in + [yY][eE][sS]|[yY]) + true + ;; + *) + exit 1 + ;; + esac +} + +keyspace=${KEYSPACE:-} +timeout=${TIMEOUT:-"60"} +cqlsh_cmd="cqlsh --request-timeout=$timeout" + +if [[ ${keyspace} == "" ]]; then + usage "missing KEYSPACE parameter" +fi + +if [[ ${keyspace} =~ [^a-zA-Z0-9_] ]]; then + usage "invalid characters in KEYSPACE=$keyspace parameter, please use letters, digits or underscores" +fi + +row_count=$($cqlsh_cmd -e "select count(*) from $keyspace.dependencies;"|head -4|tail -1| tr -d ' ') + +echo "About to copy $row_count rows." +confirm + +$cqlsh_cmd -e "COPY $keyspace.dependencies (ts, dependencies) to 'dependencies.csv';" + +if [ ! -f dependencies.csv ]; then + echo "Could not find dependencies.csv. 
Backup from cassandra was probably not successful" + exit 1 +fi + +if [ ${row_count} -ne $(wc -l dependencies.csv | cut -f 1 -d ' ') ]; then + echo "Number of rows in file is not equal to number of rows in cassandra" + exit 1 +fi + +while IFS="," read ts dependency; do + bucket=`date +"%Y-%m-%d%z" -d "$ts"` + echo "$bucket,$ts,$dependency" +done < dependencies.csv > dependencies_datebucket.csv + +dependencies_ttl=$($cqlsh_cmd -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='dependencies';"|head -4|tail -1|tr -d ' ') + +echo "Setting dependencies_ttl to $dependencies_ttl" + +$cqlsh_cmd -e "ALTER TYPE $keyspace.dependency ADD source text;" + +$cqlsh_cmd -e "CREATE TABLE $keyspace.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list<frozen<dependency>>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = $dependencies_ttl; +" + +$cqlsh_cmd -e "COPY $keyspace.dependencies_v2 (ts_bucket, ts, dependencies) FROM 'dependencies_datebucket.csv';" diff --git a/plugin/storage/cassandra/schema/migration/v001tov002part2.sh b/plugin/storage/cassandra/schema/migration/v001tov002part2.sh new file mode 100755 index 00000000000..23933f82c51 --- /dev/null +++ b/plugin/storage/cassandra/schema/migration/v001tov002part2.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +set -euo pipefail + +function usage { + >&2 echo "Error: $1" + >&2 echo "" + >&2 echo "Usage: KEYSPACE={keyspace} $0" + >&2 echo "" + >&2 echo "The following parameters can be set via environment:" + >&2 echo " KEYSPACE - keyspace" + >&2 echo " TIMEOUT - cqlsh request timeout" + >&2 echo "" + exit 1 +} + +confirm() { + read -r -p "${1:-Are you sure? 
[y/N]} " response + case "$response" in + [yY][eE][sS]|[yY]) + true + ;; + *) + exit 1 + ;; + esac +} + +keyspace=${KEYSPACE:-} +timeout=${TIMEOUT:-"60"} +cqlsh_cmd="cqlsh --request-timeout=$timeout" + +if [[ ${keyspace} == "" ]]; then + usage "missing KEYSPACE parameter" +fi + +if [[ ${keyspace} =~ [^a-zA-Z0-9_] ]]; then + usage "invalid characters in KEYSPACE=$keyspace parameter, please use letters, digits or underscores" +fi + + +row_count=$($cqlsh_cmd -e "select count(*) from $keyspace.dependencies;"|head -4|tail -1| tr -d ' ') + +echo "About to delete $row_count rows." +confirm + +$cqlsh_cmd -e "DROP INDEX IF EXISTS $keyspace.ts_index;" +$cqlsh_cmd -e "DROP TABLE IF EXISTS $keyspace.dependencies;" diff --git a/plugin/storage/cassandra/schema/v002.cql.tmpl b/plugin/storage/cassandra/schema/v002.cql.tmpl new file mode 100644 index 00000000000..6bb0234bad2 --- /dev/null +++ b/plugin/storage/cassandra/schema/v002.cql.tmpl @@ -0,0 +1,203 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. 
+-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, + fields list<frozen<keyvalue>>, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags list<frozen<keyvalue>>, +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". 
+-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, + duration bigint, + tags list<frozen<keyvalue>>, + logs list<frozen<log>>, + refs list<frozen<span_ref>>, + process frozen<process>, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names ( + service_name text, + operation_name text, + PRIMARY KEY ((service_name), operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. 
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, // service name + operation_name text, // operation name, or blank for queries without span name + bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour + duration bigint, // span duration, in microseconds + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' 
+ AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, + source text, +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list<frozen<dependency>>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index a3e2f8dd878..aaeb8f10c80 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -17,12 +17,15 @@ package integration import ( "os" "testing" + "time" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" +
"github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" @@ -39,27 +42,66 @@ type CassandraStorageIntegration struct { logger *zap.Logger } -func (s *CassandraStorageIntegration) initializeCassandra() error { +func newCassandraStorageIntegration() *CassandraStorageIntegration { + return &CassandraStorageIntegration{ + StorageIntegration: StorageIntegration{ + Refresh: func() error { return nil }, + CleanUp: func() error { return nil }, + }, + } +} + +func (s *CassandraStorageIntegration) initializeCassandraFactory(flags []string) (*cassandra.Factory, error) { s.logger, _ = testutils.NewLogger() - var ( - f = cassandra.NewFactory() - ) + f := cassandra.NewFactory() v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{ - "--cassandra.keyspace=jaeger_v1_dc1", - }) + command.ParseFlags(flags) f.InitFromViper(v) if err := f.Initialize(metrics.NullFactory, s.logger); err != nil { + return nil, err + } + return f, nil +} + +func (s *CassandraStorageIntegration) initializeCassandra() error { + f, err := s.initializeCassandraFactory([]string{ + "--cassandra.keyspace=jaeger_v1_dc1", + }) + if err != nil { return err } - var err error - var ok bool if s.SpanWriter, err = f.CreateSpanWriter(); err != nil { return err } if s.SpanReader, err = f.CreateSpanReader(); err != nil { return err } + if err = s.initializeDependencyReaderAndWriter(f); err != nil { + return err + } + return nil +} + +func (s *CassandraStorageIntegration) initializeCassandraDependenciesV2() error { + f, err := s.initializeCassandraFactory([]string{ + "--cassandra.keyspace=jaeger_v1_dc1", + "--cassandra.enable-dependencies-v2=true", + "--cassandra.port=9043", + }) + if err != nil { + return err + } + if err = s.initializeDependencyReaderAndWriter(f); err != nil { + return err + } + return nil +} + +func (s *CassandraStorageIntegration) 
initializeDependencyReaderAndWriter(f *cassandra.Factory) error { + var ( + err error + ok bool + ) if s.DependencyReader, err = f.CreateDependencyReader(); err != nil { return err } @@ -67,17 +109,44 @@ func (s *CassandraStorageIntegration) initializeCassandra() error { if s.DependencyWriter, ok = s.DependencyReader.(dependencystore.Writer); !ok { return errInitializeCassandraDependencyWriter } - s.Refresh = func() error { return nil } - s.CleanUp = func() error { return nil } return nil } +// TODO: Only the cassandra storage currently returns the `Source` field. Once +// all others support the field, we can remove this test and use the existing testGetDependencies. +func (s *StorageIntegration) testCassandraGetDependencies(t *testing.T) { + defer s.cleanUp(t) + + expected := []model.DependencyLink{ + { + Parent: "hello", + Child: "world", + CallCount: uint64(1), + Source: model.JaegerDependencyLinkSource, + }, + { + Parent: "world", + Child: "hello", + CallCount: uint64(3), + Source: model.JaegerDependencyLinkSource, + }, + } + require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected)) + s.refresh(t) + actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute) + assert.NoError(t, err) + assert.EqualValues(t, expected, actual) +} + func TestCassandraStorage(t *testing.T) { if os.Getenv("STORAGE") != "cassandra" { t.Skip("Integration test against Cassandra skipped; set STORAGE env var to cassandra to run this") } - s := &CassandraStorageIntegration{} - require.NoError(t, s.initializeCassandra()) + s1 := newCassandraStorageIntegration() + s2 := newCassandraStorageIntegration() + require.NoError(t, s1.initializeCassandra()) + require.NoError(t, s2.initializeCassandraDependenciesV2()) // TODO: Support all other tests. 
- t.Run("GetDependencies", s.testGetDependencies) + t.Run("GetDependencies", s1.testCassandraGetDependencies) + t.Run("GetDependenciesV2", s2.testCassandraGetDependencies) } diff --git a/scripts/travis/cassandra-integration-test.sh b/scripts/travis/cassandra-integration-test.sh index d24a4cb714d..2cfbe55e69e 100755 --- a/scripts/travis/cassandra-integration-test.sh +++ b/scripts/travis/cassandra-integration-test.sh @@ -3,24 +3,28 @@ set -e # Clean up before starting. -docker rm cassandra || true +docker rm -f cassandra || true +docker rm -f cassandra2 || true docker network rm integration_test || true -# Create a network so that the schema container can communicate with the cassandra container. +# Create a network so that the schema container can communicate with the cassandra containers. docker network create integration_test -# Start a cassandra container whose ports are exposed to localhost to facilitate testing. -CID=$(docker run -d --name cassandra --network integration_test -p 9042:9042 -p 9160:9160 cassandra:3.9) +# Start cassandra containers whose ports are exposed to localhost to facilitate testing. +docker run -d --name cassandra --network integration_test -p 9042:9042 -p 9160:9160 cassandra:3.9 +docker run -d --name cassandra2 --network integration_test -p 9043:9042 -p 9161:9160 cassandra:3.9 # Build the schema container and run it rather than using the existing container in Docker Hub since that # requires this current build to succeed before this test can use it; chicken and egg problem. 
docker build -t jaeger-cassandra-schema-integration-test plugin/storage/cassandra/ -docker run --network integration_test -e TEMPLATE=/cassandra-schema/v001.cql.tmpl jaeger-cassandra-schema-integration-test +docker run --network integration_test -e CQLSH_HOST=cassandra -e TEMPLATE=/cassandra-schema/v001.cql.tmpl jaeger-cassandra-schema-integration-test +docker run --network integration_test -e CQLSH_HOST=cassandra2 -e TEMPLATE=/cassandra-schema/v002.cql.tmpl jaeger-cassandra-schema-integration-test # Run the test. export STORAGE=cassandra make storage-integration-test # Tear down after. -docker kill $CID +docker rm -f cassandra +docker rm -f cassandra2 docker network rm integration_test