Skip to content

Commit

Permalink
Embeded template file in binary and added query construction and exec…
Browse files Browse the repository at this point in the history
…ution for initialize database

Signed-off-by: Alok Kumar Singh <[email protected]>
  • Loading branch information
akstron committed Oct 28, 2024
1 parent 9f4e8f7 commit 90368b1
Show file tree
Hide file tree
Showing 7 changed files with 548 additions and 1 deletion.
12 changes: 12 additions & 0 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
datacenter: "test"
trace_ttl: 172800
dependencies_ttl: 172800
replication_factor: 1
cas_version: 4
compaction_window: "1m"
connection:
auth:
basic:
Expand All @@ -35,6 +41,12 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
datacenter: "test"
trace_ttl: 172800
dependencies_ttl: 172800
replication_factor: 1
cas_version: 4
compaction_window: "1m"
connection:
auth:
basic:
Expand Down
16 changes: 15 additions & 1 deletion pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,26 @@ type Connection struct {
ProtoVersion int `mapstructure:"proto_version"`
}

type SchemaConfig struct {
Datacenter string `mapstructure:"datacenter" valid:"optional"`
TraceTTL int `mapstructure:"trace_ttl" valid:"optional"`
DependenciesTTL int `mapstructure:"dependencies_ttl" valid:"optional"`
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
CasVersion int `mapstructure:"cas_version" valid:"optional"`
CompactionWindow string `mapstructure:"compaction_window" valid:"optional"`
Replication string `mapstructure:"replication" valid:"optional"`
CompactionWindowSize int `mapstructure:"compaction_window_size" valid:"optional"`
CompactionWindowUnit string `mapstructure:"compaction_window_unit" valid:"optional"`
}

type Schema struct {
// Keyspace contains the namespace where Jaeger data will be stored.
Keyspace string `mapstructure:"keyspace"`
// DisableCompression, if set to true, disables the use of the default Snappy Compression
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
SchemaConfig
}

type Query struct {
Expand Down Expand Up @@ -150,7 +163,8 @@ func (c *Configuration) NewSession() (cassandra.Session, error) {
// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
cluster.Keyspace = c.Schema.Keyspace
// Removing this, since keyspace would be created post builing connection
// cluster.Keyspace = c.Schema.Keyspace
cluster.NumConns = c.Connection.ConnectionsPerHost
cluster.ConnectTimeout = c.Connection.Timeout
cluster.ReconnectInterval = c.Connection.ReconnectInterval
Expand Down
14 changes: 14 additions & 0 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -131,6 +132,10 @@ func (f *Factory) configureFromOptions(o *Options) {
}
}

func (f *Factory) initializeDB(session cassandra.Session, cfg *config.Schema) error {
return schema.GenerateSchemaIfNotPresent(session, cfg)
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil})
Expand All @@ -143,12 +148,21 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.primarySession = primarySession

// After creating a session, execute commands to initialize the setup if not already present
if err := f.initializeDB(primarySession, &f.Options.Primary.Schema); err != nil {
return err
}

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession()
if err != nil {
return err
}
f.archiveSession = archiveSession

if err := f.initializeDB(archiveSession, &f.Options.Primary.Schema); err != nil {
return err
}
} else {
logger.Info("Cassandra archive storage configuration is empty, skipping")
}
Expand Down
201 changes: 201 additions & 0 deletions plugin/storage/cassandra/schema/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package schema

import (
"bytes"
"embed"
"fmt"
"regexp"
"strconv"
"text/template"

"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
)

//go:embed v004-go-tmpl.cql.tmpl
//go:embed v004-go-tmpl-test.cql.tmpl
var schemaFile embed.FS

func DefaultSchemaConfig() config.Schema {
return config.Schema{
Keyspace: "jaeger_v2_test",
SchemaConfig: config.SchemaConfig{
Datacenter: "test",
TraceTTL: 172800,
DependenciesTTL: 0,
ReplicationFactor: 1,
CasVersion: 4,
},
}
}

func applyDefaults(cfg *config.Schema) {
defaultSchema := DefaultSchemaConfig()

if cfg.Keyspace == "" {
cfg.Keyspace = defaultSchema.Keyspace
}

if cfg.Datacenter == "" {
cfg.Datacenter = defaultSchema.Datacenter
}

if cfg.TraceTTL == 0 {
cfg.TraceTTL = defaultSchema.TraceTTL
}

if cfg.ReplicationFactor == 0 {
cfg.ReplicationFactor = defaultSchema.ReplicationFactor
}

if cfg.CasVersion == 0 {
cfg.CasVersion = 4
}
}

// Applies defaults for the configs and contructs other optional parameters from it
func constructCompleteSchemaConfig(cfg *config.Schema) error {
applyDefaults(cfg)

cfg.Replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': '%v' }", cfg.Datacenter, cfg.ReplicationFactor)

if cfg.CompactionWindow != "" {
isMatch, err := regexp.MatchString("^[0-9]+[mhd]$", cfg.CompactionWindow)
if err != nil {
return err
}

if !isMatch {
return fmt.Errorf("Invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days")
}

cfg.CompactionWindowSize, err = strconv.Atoi(cfg.CompactionWindow[:len(cfg.CompactionWindow)-1])
if err != nil {
return err
}

cfg.CompactionWindowUnit = cfg.CompactionWindow[len(cfg.CompactionWindow)-1:]
} else {
traceTTLMinutes := cfg.TraceTTL / 60

cfg.CompactionWindowSize = (traceTTLMinutes + 30 - 1) / 30
cfg.CompactionWindowUnit = "m"
}

switch cfg.CompactionWindowUnit {
case `m`:
cfg.CompactionWindowUnit = `MINUTES`
case `h`:
cfg.CompactionWindowUnit = `HOURS`
case `d`:
cfg.CompactionWindowUnit = `DAYS`
default:
return fmt.Errorf("Invalid compaction window unit. If can be among {m|h|d}")
}

return nil
}

func getQueryFileAsBytes(fileName string, cfg *config.Schema) ([]byte, error) {
tmpl, err := template.ParseFS(schemaFile, fileName)
if err != nil {
return nil, err
}

var result bytes.Buffer
err = tmpl.Execute(&result, cfg)
if err != nil {
return nil, err
}

return result.Bytes(), nil
}

func getQueriesFromBytes(queryFile []byte) ([]string, error) {
lines := bytes.Split(queryFile, []byte("\n"))

var extractedLines [][]byte

for _, line := range lines {
// Remove any comments, if at the end of the line
commentIndex := bytes.Index(line, []byte(`--`))
if commentIndex != -1 {
// remove everything after comment
line = line[0:commentIndex]
}

if len(line) == 0 {
continue
}

extractedLines = append(extractedLines, bytes.TrimSpace(line))
}

var queries []string

// Construct individual queries strings
var queryString string
for _, line := range extractedLines {
queryString += string(line)
if bytes.HasSuffix(line, []byte(";")) {
queries = append(queries, queryString)
queryString = ""
}
}

if len(queryString) > 0 {
return nil, fmt.Errorf(`Invalid template`)
}

return queries, nil
}

func getCassandraQueriesFromQueryStrings(session cassandra.Session, queries []string) []cassandra.Query {
var casQueries []cassandra.Query

for _, query := range queries {
casQueries = append(casQueries, session.Query(query))
}

return casQueries
}

func contructSchemaQueries(session cassandra.Session, cfg *config.Schema) ([]cassandra.Query, error) {
err := constructCompleteSchemaConfig(cfg)
if err != nil {
return nil, err
}

queryFile, err := getQueryFileAsBytes(`v004-go-tmpl.cql.tmpl`, cfg)
if err != nil {
return nil, err
}

queryStrings, err := getQueriesFromBytes(queryFile)
if err != nil {
return nil, err
}

casQueries := getCassandraQueriesFromQueryStrings(session, queryStrings)

return casQueries, nil
}

func GenerateSchemaIfNotPresent(session cassandra.Session, cfg *config.Schema) error {
casQueries, err := contructSchemaQueries(session, cfg)
if err != nil {
return err
}

for _, query := range casQueries {
err := query.Exec()
if err != nil {
return err
}
}

return nil
}
22 changes: 22 additions & 0 deletions plugin/storage/cassandra/schema/schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package schema

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTemplateRendering(t *testing.T) {
cfg := DefaultSchemaConfig()
res, err := getQueryFileAsBytes(`v004-go-tmpl-test.cql.tmpl`, &cfg)
require.NoError(t, err)

queryStrings, err := getQueriesFromBytes(res)
require.NoError(t, err)

assert.Equal(t, 9, len(queryStrings))
}
Loading

0 comments on commit 90368b1

Please sign in to comment.