Skip to content

Commit

Permalink
Support archive storage in the query-service (#604)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jan 8, 2018
1 parent 8db7a11 commit 7f1b9a2
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 14 deletions.
34 changes: 31 additions & 3 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
istorage "github.com/jaegertracing/jaeger/storage"
)

func main() {
Expand Down Expand Up @@ -107,12 +108,16 @@ func main() {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

apiHandlerOptions := []app.HandlerOption{
app.HandlerOptions.Prefix(queryOpts.Prefix),
app.HandlerOptions.Logger(logger),
app.HandlerOptions.Tracer(tracer),
}
apiHandlerOptions = append(apiHandlerOptions, archiveOptions(storageFactory, logger)...)
apiHandler := app.NewAPIHandler(
spanReader,
dependencyReader,
app.HandlerOptions.Prefix(queryOpts.Prefix),
app.HandlerOptions.Logger(logger),
app.HandlerOptions.Tracer(tracer))
apiHandlerOptions...)
r := mux.NewRouter()
apiHandler.RegisterRoutes(r)
registerStaticHandler(r, logger, queryOpts)
Expand Down Expand Up @@ -174,3 +179,26 @@ func registerStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *app.QueryOp
logger.Info("Static handler is not registered")
}
}

func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) []app.HandlerOption {
reader, err := storageFactory.CreateSpanReader()
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
return nil
}
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return nil
}
writer, err := storageFactory.CreateSpanWriter()
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
return nil
}
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return nil
}
return []app.HandlerOption{
app.HandlerOptions.ArchiveSpanReader(reader),
app.HandlerOptions.ArchiveSpanWriter(writer),
}
}
39 changes: 36 additions & 3 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ import (
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
primaryStorageConfig = "cassandra"
archiveStorageConfig = "cassandra-archive"
)

// Factory implements storage.Factory for Cassandra backend.
type Factory struct {
Options *Options
Expand All @@ -38,13 +44,14 @@ type Factory struct {

primaryConfig config.SessionBuilder
primarySession cassandra.Session
// archiveSession cassandra.Session TODO
archiveConfig config.SessionBuilder
archiveSession cassandra.Session
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
Options: NewOptions("cassandra"), // TODO add "cassandra-archive" once supported
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
}
}

Expand All @@ -57,6 +64,9 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
func (f *Factory) InitFromViper(v *viper.Viper) {
f.Options.InitFromViper(v)
f.primaryConfig = f.Options.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
}

// Initialize implements storage.Factory
Expand All @@ -68,7 +78,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return err
}
f.primarySession = primarySession
// TODO init archive (cf. https://github.com/jaegertracing/jaeger/pull/604)

if f.archiveConfig != nil {
if archiveSession, err := f.archiveConfig.NewSession(); err == nil {
f.archiveSession = archiveSession
} else {
return err
}
}
return nil
}

Expand All @@ -86,3 +103,19 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return cDepStore.NewDependencyStore(f.primarySession, f.Options.DepStoreDataFrequency, f.metricsFactory, f.logger), nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.metricsFactory, f.logger), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger), nil
}
23 changes: 22 additions & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/config"

"github.com/jaegertracing/jaeger/storage"
)

var _ storage.Factory = new(Factory)
var _ storage.ArchiveFactory = new(Factory)

type mockSessionBuilder struct {
err error
Expand All @@ -44,7 +46,7 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) {
func TestCassandraFactory(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{})
command.ParseFlags([]string{"--cassandra-archive.enabled=true"})
f.InitFromViper(v)

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
Expand All @@ -53,6 +55,10 @@ func TestCassandraFactory(t *testing.T) {
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.primaryConfig = &mockSessionBuilder{}
f.archiveConfig = &mockSessionBuilder{err: errors.New("made-up error")}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.archiveConfig = nil
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err := f.CreateSpanReader()
Expand All @@ -63,4 +69,19 @@ func TestCassandraFactory(t *testing.T) {

_, err = f.CreateDependencyReader()
assert.NoError(t, err)

_, err = f.CreateArchiveSpanReader()
assert.EqualError(t, err, "Archive storage not configured")

_, err = f.CreateArchiveSpanWriter()
assert.EqualError(t, err, "Archive storage not configured")

f.archiveConfig = &mockSessionBuilder{}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err = f.CreateArchiveSpanReader()
assert.NoError(t, err)

_, err = f.CreateArchiveSpanWriter()
assert.NoError(t, err)
}
19 changes: 18 additions & 1 deletion plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

const (
// session settings
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
Expand Down Expand Up @@ -65,6 +66,8 @@ type namespaceConfig struct {
config.Configuration
servers string
namespace string
primary bool
Enabled bool
}

// NewOptions creates a new Options struct.
Expand All @@ -78,12 +81,14 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
EnableHostVerification: true,
},
MaxRetryAttempts: 3,
Keyspace: "jaeger_v1_local",
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
},
servers: "127.0.0.1",
namespace: primaryNamespace,
primary: true,
Enabled: true,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
SpanStoreWriteCacheTTL: time.Hour * 12,
Expand Down Expand Up @@ -112,6 +117,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
if !nsConfig.primary {
flagSet.Bool(
nsConfig.namespace+suffixEnabled,
false,
"Enable extra storage")
}
flagSet.Int(
nsConfig.namespace+suffixConnPerHost,
nsConfig.ConnectionsPerHost,
Expand Down Expand Up @@ -189,6 +200,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
}

func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
if !cfg.primary {
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
}
cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
Expand Down Expand Up @@ -220,6 +234,9 @@ func (opt *Options) Get(namespace string) *config.Configuration {
nsCfg = &namespaceConfig{}
opt.others[namespace] = nsCfg
}
if !nsCfg.Enabled {
return nil
}
nsCfg.Configuration.ApplyDefaults(&opt.primary.Configuration)
if nsCfg.servers == "" {
nsCfg.servers = opt.primary.servers
Expand Down
19 changes: 14 additions & 5 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/config"
)
Expand All @@ -31,13 +32,19 @@ func TestOptions(t *testing.T) {
assert.Equal(t, 2, primary.ConnectionsPerHost)

aux := opts.Get("archive")
assert.Nil(t, aux)

assert.NotNil(t, opts.others["archive"])
opts.others["archive"].Enabled = true
aux = opts.Get("archive")
require.NotNil(t, aux)
assert.Equal(t, primary.Keyspace, aux.Keyspace)
assert.Equal(t, primary.Servers, aux.Servers)
assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost)
}

func TestOptionsWithFlags(t *testing.T) {
opts := NewOptions("cas", "cas.aux")
opts := NewOptions("cas", "cas-aux")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--cas.keyspace=jaeger",
Expand All @@ -48,17 +55,19 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.port=4242",
"--cas.proto-version=3",
"--cas.socket-keep-alive=42s",
// a couple overrides
"--cas.aux.keyspace=jaeger-archive",
"--cas.aux.servers=3.3.3.3,4.4.4.4",
// enable aux with a couple overrides
"--cas-aux.enabled=true",
"--cas-aux.keyspace=jaeger-archive",
"--cas-aux.servers=3.3.3.3,4.4.4.4",
})
opts.InitFromViper(v)

primary := opts.GetPrimary()
assert.Equal(t, "jaeger", primary.Keyspace)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)

aux := opts.Get("cas.aux")
aux := opts.Get("cas-aux")
require.NotNil(t, aux)
assert.Equal(t, "jaeger-archive", aux.Keyspace)
assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers)
assert.Equal(t, 42, aux.ConnectionsPerHost)
Expand Down
26 changes: 26 additions & 0 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,29 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
}
}
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
factory, ok := f.factories[f.SpanStorageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
}
archive, ok := factory.(storage.ArchiveFactory)
if !ok {
return nil, storage.ErrArchiveStorageNotSupported
}
return archive.CreateArchiveSpanReader()
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
factory, ok := f.factories[f.SpanStorageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
}
archive, ok := factory.(storage.ArchiveFactory)
if !ok {
return nil, storage.ErrArchiveStorageNotSupported
}
return archive.CreateArchiveSpanWriter()
}
Loading

0 comments on commit 7f1b9a2

Please sign in to comment.