Create Cassandra db schema on session initialization #5922

Merged
57 commits
5041f31
Embeded template file in binary and added query construction and exec…
akstron Oct 28, 2024
30db170
Removed unnecessary SchemaConfig struct
akstron Oct 28, 2024
ce0c375
Added new schema configs in default config generator
akstron Oct 28, 2024
810ab1c
Revert Keyspace removal
akstron Oct 28, 2024
0d6383f
Bug fix while creating queries
akstron Oct 29, 2024
207945f
Improving test
akstron Oct 29, 2024
985f65b
Created new struct for derived params
akstron Oct 29, 2024
1c30503
Remove fields from yaml file
akstron Oct 29, 2024
e4ab709
Added integration test
akstron Nov 19, 2024
c329bba
Rebase fixes
akstron Nov 19, 2024
e3c6045
Minor changes in integration script
akstron Nov 19, 2024
492e15e
removed test
akstron Nov 19, 2024
44c39dc
Updated fields with time.Duration type and added validators and tests
akstron Nov 20, 2024
dfc0c43
minor changes in script
akstron Nov 20, 2024
cb8ae19
Addressed comments
akstron Nov 20, 2024
c3d0fbd
Addressed comments
akstron Nov 21, 2024
728a139
Update pkg/cassandra/config/schema.go
akstron Nov 21, 2024
1b6683d
Update pkg/cassandra/config/config.go
akstron Nov 21, 2024
ce11cc1
Addressed comments
akstron Nov 21, 2024
de1c563
Removed unused CasVersion
akstron Nov 21, 2024
edabe22
Addressed validation comments
akstron Nov 22, 2024
d0e1976
Created helper function for session created and updated tests
akstron Nov 26, 2024
d8479b5
Added schema unit tests
akstron Nov 26, 2024
02b6159
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
73d276a
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
57349a8
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
84b52e1
Fixed build
akstron Nov 26, 2024
9c2f05b
formatting fixes
akstron Nov 26, 2024
2c8de88
test fix
akstron Nov 27, 2024
eeb1951
Added test in workflow
akstron Nov 28, 2024
601365d
fmt fixes
akstron Nov 28, 2024
4aeaad7
create schema bug fix
akstron Nov 28, 2024
0167cb6
exclude v1 run with skip-apply-schema as true
akstron Nov 28, 2024
07b99da
Added schemaCreator and comments in workflow
akstron Nov 28, 2024
04ec76f
ci changes
akstron Nov 28, 2024
7246494
made template params private
akstron Nov 28, 2024
d8613f1
workflow fix
akstron Nov 28, 2024
a7853ec
Changed env variable name
akstron Nov 29, 2024
10bd9aa
lint fixes
akstron Nov 29, 2024
a682db2
lint fix
akstron Nov 29, 2024
35c26b5
test fix
akstron Nov 29, 2024
ddf4fc0
Workflow and test minor changes
akstron Nov 29, 2024
bbf3ac8
test fix
akstron Nov 30, 2024
a8e3aae
workflow changes
akstron Nov 30, 2024
a46bf37
Merge branch 'main' into create-database-scheme-cassandra
akstron Nov 30, 2024
c2b89e0
Apply suggestions from code review
yurishkuro Nov 30, 2024
de2d2ef
Update docs
yurishkuro Nov 30, 2024
100208c
refactor
yurishkuro Nov 30, 2024
37d34dd
clean-up imports
yurishkuro Nov 30, 2024
8d65894
clean-up
yurishkuro Nov 30, 2024
23a9b62
simplify
yurishkuro Nov 30, 2024
d925074
fix
yurishkuro Nov 30, 2024
154deb9
Fix workflow
yurishkuro Nov 30, 2024
a238455
Merge branch 'main' into create-database-scheme-cassandra
yurishkuro Nov 30, 2024
cc9db9c
rename
yurishkuro Nov 30, 2024
9d8ba06
Fix script
yurishkuro Nov 30, 2024
b0ac38a
fix naming for code coverage
yurishkuro Nov 30, 2024
11 changes: 9 additions & 2 deletions .github/workflows/ci-e2e-cassandra.yml
@@ -22,14 +22,19 @@ jobs:
fail-fast: false
matrix:
jaeger-version: [v1, v2]
create-schema: [manual, auto]
version:
- distribution: cassandra
major: 4.x
schema: v004
- distribution: cassandra
major: 5.x
schema: v004
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }}
exclude:
# Exclude v1 as create schema on fly is available for v2 only
- jaeger-version: v1
create-schema: auto
name: ${{ matrix.version.distribution }}-${{ matrix.version.major }} ${{ matrix.jaeger-version }} schema=${{ matrix.create-schema }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
@@ -45,9 +50,11 @@ jobs:
- name: Run cassandra integration tests
id: test-execution
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }}
env:
SKIP_APPLY_SCHEMA: ${{ matrix.create-schema == 'auto' && true || false }}

- name: Upload coverage to codecov
uses: ./.github/actions/upload-codecov
with:
files: cover.out
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}-${{ matrix.create-schema }}
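
As a rough cross-check of the matrix above, the following Go sketch enumerates the combinations and applies the `exclude` rule. Only the matrix values come from the workflow; the `job` type and the `expandMatrix` and `skipApplySchema` functions are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// job is one cell of the workflow matrix (hypothetical type, for illustration only).
type job struct {
	jaegerVersion string // "v1" or "v2"
	createSchema  string // "manual" or "auto"
	cassandra     string // "4.x" or "5.x"
}

// expandMatrix enumerates every combination and drops the excluded one:
// jaeger-version v1 together with create-schema auto.
func expandMatrix() []job {
	var jobs []job
	for _, jv := range []string{"v1", "v2"} {
		for _, cs := range []string{"manual", "auto"} {
			for _, major := range []string{"4.x", "5.x"} {
				if jv == "v1" && cs == "auto" {
					continue // schema creation on the fly is v2-only
				}
				jobs = append(jobs, job{jaegerVersion: jv, createSchema: cs, cassandra: major})
			}
		}
	}
	return jobs
}

// skipApplySchema mirrors the workflow expression
// matrix.create-schema == 'auto' && true || false.
func skipApplySchema(j job) bool {
	return j.createSchema == "auto"
}

func main() {
	for _, j := range expandMatrix() {
		fmt.Printf("cassandra-%s %s schema=%s SKIP_APPLY_SCHEMA=%t\n",
			j.cassandra, j.jaegerVersion, j.createSchema, skipApplySchema(j))
	}
}
```

Under these assumptions the matrix yields six jobs rather than eight, and `SKIP_APPLY_SCHEMA` is true only for the v2 jobs that let Jaeger create the schema itself.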
4 changes: 3 additions & 1 deletion cmd/jaeger/config-cassandra.yaml
@@ -34,6 +34,7 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
connection:
auth:
basic:
@@ -44,7 +45,8 @@ extensions:
another_storage:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
keyspace: "jaeger_v1_dc1_archive"
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
connection:
auth:
basic:
86 changes: 63 additions & 23 deletions pkg/cassandra/config/config.go
@@ -6,15 +6,13 @@ package config

import (
"context"
"errors"
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
)

// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
@@ -58,6 +56,19 @@ type Schema struct {
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
// CreateSchema tells if the schema should be created during session initialization based on the configs provided
CreateSchema bool `mapstructure:"create" valid:"optional"`
// Datacenter is the name for network topology
Datacenter string `mapstructure:"datacenter" valid:"optional"`
// TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second
TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"`
// DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second
DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"`
// Replication factor for the db
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
// CompactionWindow is the size of the window for TimeWindowCompactionStrategy.
// All SSTables within that window are grouped together into one SSTable.
CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"`
}

type Query struct {
@@ -86,7 +97,13 @@ type BasicAuthenticator struct {
func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
CreateSchema: false,
Keyspace: "jaeger_dc1",
Datacenter: "dc1",
TraceTTL: 2 * 24 * time.Hour,
DependenciesTTL: 2 * 24 * time.Hour,
ReplicationFactor: 1,
CompactionWindow: time.Minute,
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
@@ -106,6 +123,27 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}

if c.Schema.Datacenter == "" {
c.Schema.Datacenter = source.Schema.Datacenter
}

if c.Schema.TraceTTL == 0 {
c.Schema.TraceTTL = source.Schema.TraceTTL
}

if c.Schema.DependenciesTTL == 0 {
c.Schema.DependenciesTTL = source.Schema.DependenciesTTL
}

if c.Schema.ReplicationFactor == 0 {
c.Schema.ReplicationFactor = source.Schema.ReplicationFactor
}

if c.Schema.CompactionWindow == 0 {
c.Schema.CompactionWindow = source.Schema.CompactionWindow
}

if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
@@ -129,24 +167,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}
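
The zero-value fallback used by `ApplyDefaults` above can be sketched in isolation. This is a minimal illustration, not the PR's code: `schemaSketch`, `defaultsSketch`, and `applyDefaults` are hypothetical names, and only a subset of the schema fields is modelled. The default values are the ones shown in `DefaultConfiguration`.

```go
package main

import (
	"fmt"
	"time"
)

// schemaSketch is a hypothetical, trimmed-down stand-in for the Schema struct.
type schemaSketch struct {
	Keyspace          string
	Datacenter        string
	TraceTTL          time.Duration
	ReplicationFactor int
}

// defaultsSketch returns the defaults shown in DefaultConfiguration.
func defaultsSketch() schemaSketch {
	return schemaSketch{
		Keyspace:          "jaeger_dc1",
		Datacenter:        "dc1",
		TraceTTL:          2 * 24 * time.Hour,
		ReplicationFactor: 1,
	}
}

// applyDefaults copies a field from source only when the field in c
// still holds its Go zero value.
func applyDefaults(c *schemaSketch, source schemaSketch) {
	if c.Keyspace == "" {
		c.Keyspace = source.Keyspace
	}
	if c.Datacenter == "" {
		c.Datacenter = source.Datacenter
	}
	if c.TraceTTL == 0 {
		c.TraceTTL = source.TraceTTL
	}
	if c.ReplicationFactor == 0 {
		c.ReplicationFactor = source.ReplicationFactor
	}
}

func main() {
	c := schemaSketch{Keyspace: "custom"} // everything else left unset
	applyDefaults(&c, defaultsSketch())
	fmt.Printf("%+v\n", c)
}
```

One consequence of this pattern is that an explicitly configured zero cannot be told apart from an unset field, so wherever defaults are applied this way a value such as `trace_ttl: 0` is replaced by the default.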

// SessionBuilder creates new cassandra.Session
type SessionBuilder interface {
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}
session, err := cluster.CreateSession()
if err != nil {
return nil, err
}
return gocqlw.WrapCQLSession(session), nil
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
@@ -210,7 +230,27 @@ func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}

func isValidTTL(duration time.Duration) bool {
return duration == 0 || duration >= time.Second
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
if err != nil {
return err
}

if !isValidTTL(c.Schema.TraceTTL) {
return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second")
}

if !isValidTTL(c.Schema.DependenciesTTL) {
return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second")
}

if c.Schema.CompactionWindow < time.Minute {
return errors.New("compaction_window should at least be 1 minute")
}

return nil
}
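
The duration rules added to `Validate` can be exercised on their own with a small sketch. It assumes nothing beyond the checks visible in the diff; `schemaSketch`, `fromMillis`, and `validate` are hypothetical names, and the error strings are paraphrased rather than copied.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// schemaSketch is a hypothetical stand-in holding only the validated durations.
type schemaSketch struct {
	TraceTTL         time.Duration
	DependenciesTTL  time.Duration
	CompactionWindow time.Duration
}

// fromMillis is a convenience constructor for the examples below.
func fromMillis(traceMs, depsMs, windowMs int64) schemaSketch {
	return schemaSketch{
		TraceTTL:         time.Duration(traceMs) * time.Millisecond,
		DependenciesTTL:  time.Duration(depsMs) * time.Millisecond,
		CompactionWindow: time.Duration(windowMs) * time.Millisecond,
	}
}

// isValidTTL accepts 0 or any duration of at least one second.
func isValidTTL(d time.Duration) bool {
	return d == 0 || d >= time.Second
}

// validate applies the same three checks as the diff, in the same order.
func validate(s schemaSketch) error {
	if !isValidTTL(s.TraceTTL) {
		return errors.New("trace_ttl must be 0 or at least 1 second")
	}
	if !isValidTTL(s.DependenciesTTL) {
		return errors.New("dependencies_ttl must be 0 or at least 1 second")
	}
	if s.CompactionWindow < time.Minute {
		return errors.New("compaction_window must be at least 1 minute")
	}
	return nil
}

func main() {
	cases := []schemaSketch{
		fromMillis(0, 0, 60_000),  // zero TTLs pass, window is exactly 1 minute
		fromMillis(1, 0, 60_000),  // trace TTL of 1ms is rejected
		fromMillis(1000, 1000, 0), // zero compaction window is rejected
	}
	for _, c := range cases {
		fmt.Printf("%+v -> %v\n", c, validate(c))
	}
}
```

Note the asymmetry: a zero TTL passes, but a zero compaction window does not, which is consistent with the existing `TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet` test now setting `CompactionWindow` explicitly.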
24 changes: 24 additions & 0 deletions pkg/cassandra/config/config_test.go
@@ -5,6 +5,7 @@ package config

import (
"testing"
"time"

"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
@@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
Connection: Connection{
Servers: []string{"localhost:9200"},
},
Schema: Schema{
CompactionWindow: time.Minute,
},
}

err := cfg.Validate()
@@ -94,3 +98,23 @@ func TestToString(t *testing.T) {
s := cfg.String()
assert.Contains(t, s, "Keyspace:test")
}

func TestConfigSchemaValidation(t *testing.T) {
cfg := DefaultConfiguration()
err := cfg.Validate()
require.NoError(t, err)

cfg.Schema.TraceTTL = time.Millisecond
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.TraceTTL = time.Second
cfg.Schema.CompactionWindow = time.Minute - 1
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.CompactionWindow = time.Minute
cfg.Schema.DependenciesTTL = time.Second - 1
err = cfg.Validate()
require.Error(t, err)
}
65 changes: 56 additions & 9 deletions plugin/storage/cassandra/factory.go
@@ -17,13 +17,15 @@

"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
@@ -55,17 +57,22 @@
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primaryConfig config.Configuration
archiveConfig *config.Configuration

primarySession cassandra.Session
archiveConfig config.SessionBuilder
archiveSession cassandra.Session

// tests can override this
sessionBuilderFn func(*config.Configuration) (cassandra.Session, error)
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
sessionBuilderFn: NewSession,
}
}

@@ -126,9 +133,7 @@
o.others = make(map[string]*NamespaceConfig)
}
f.primaryConfig = o.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
f.archiveConfig = f.Options.Get(archiveStorageConfig)
}

// Initialize implements storage.Factory
@@ -137,14 +142,14 @@
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil})
f.logger = logger

primarySession, err := f.primaryConfig.NewSession()
primarySession, err := f.sessionBuilderFn(&f.primaryConfig)
if err != nil {
return err
}
f.primarySession = primarySession

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession()
archiveSession, err := f.sessionBuilderFn(f.archiveConfig)
if err != nil {
return err
}
@@ -155,6 +160,48 @@
return nil
}

// createSession creates session from a configuration
func createSession(c *config.Configuration) (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

return gocqlw.WrapCQLSession(session), nil
}

// newSessionPrerequisites creates tables and types before creating a session
func newSessionPrerequisites(c *config.Configuration) error {
if !c.Schema.CreateSchema {
return nil
}

cfg := *c // clone because we need to connect without specifying a keyspace
cfg.Schema.Keyspace = ""

session, err := createSession(&cfg)
if err != nil {
return err
}

sc := schema.NewSchemaCreator(session, c.Schema)
return sc.CreateSchemaIfNotPresent()

}

// NewSession creates a new Cassandra session
func NewSession(c *config.Configuration) (cassandra.Session, error) {
if err := newSessionPrerequisites(c); err != nil {
return nil, err
}

return createSession(c)
}
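
The two-phase flow of `NewSession` (an optional bootstrap connection without a keyspace, then the real connection) can be sketched without a Cassandra cluster by injecting the connection and schema steps as functions, much as `sessionBuilderFn` lets tests override session creation. Everything below is a hypothetical stand-in: `config`, `newSession`, and `trace` are illustrative names, not the PR's API.

```go
package main

import "fmt"

// config is a hypothetical, trimmed-down stand-in for config.Configuration.
type config struct {
	Keyspace     string
	CreateSchema bool
}

// newSession mirrors the flow in the diff: when CreateSchema is set, connect
// with a copy of the config that has no keyspace, create the schema for the
// configured keyspace, and only then open the real connection.
func newSession(
	c config,
	connect func(config) error,
	createSchema func(keyspace string) error,
) error {
	if c.CreateSchema {
		bootstrap := c // struct copy; the caller's config is left untouched
		bootstrap.Keyspace = ""
		if err := connect(bootstrap); err != nil {
			return fmt.Errorf("bootstrap connection: %w", err)
		}
		if err := createSchema(c.Keyspace); err != nil {
			return fmt.Errorf("create schema: %w", err)
		}
	}
	return connect(c)
}

// trace runs newSession with recording fakes and returns the steps in order.
func trace(c config) []string {
	var steps []string
	connect := func(cfg config) error {
		steps = append(steps, "connect:"+cfg.Keyspace)
		return nil
	}
	create := func(keyspace string) error {
		steps = append(steps, "create:"+keyspace)
		return nil
	}
	if err := newSession(c, connect, create); err != nil {
		steps = append(steps, "error:"+err.Error())
	}
	return steps
}

func main() {
	fmt.Println(trace(config{Keyspace: "jaeger_dc1", CreateSchema: true}))
	fmt.Println(trace(config{Keyspace: "jaeger_dc1", CreateSchema: false}))
}
```

With `CreateSchema` enabled the fakes record three steps (keyspace-less connect, schema creation, keyspace connect); with it disabled only the final connect runs, which matches the early return in `newSessionPrerequisites`.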

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))