Skip to content

Commit

Permalink
feat(chstorage): support TTL and CLUSTER
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 2, 2024
1 parent 2bcdb8b commit debed3a
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 53 deletions.
92 changes: 60 additions & 32 deletions internal/chstorage/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"context"
"crypto/sha256"
"fmt"
"strings"
"time"

"github.com/ClickHouse/ch-go"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/zctx"
"go.uber.org/zap"

"github.com/go-faster/oteldb/internal/ddl"
)

// Tables define table names.
Expand All @@ -25,6 +29,9 @@ type Tables struct {
LogAttrs string

Migration string

TTL time.Duration
Cluster string
}

// Validate checks table names
Expand Down Expand Up @@ -110,65 +117,86 @@ func (t Tables) saveHashes(ctx context.Context, c chClient, m map[string]string)
return nil
}

type generateOptions struct {
Name string
DDL string
TTLField string
}

func (t Tables) generateQuery(opts generateOptions) string {
var s strings.Builder
s.WriteString("CREATE TABLE IF NOT EXISTS ")
s.WriteString(ddl.Backtick(opts.Name))
if t.Cluster != "" {
s.WriteString(" ON CLUSTER ")
s.WriteString(ddl.Backtick(t.Cluster))
}
s.WriteString("\n")
s.WriteString(opts.DDL)
if t.TTL > 0 && opts.TTLField != "" {
s.WriteString("\n")
s.WriteString("TTL ")
s.WriteString(opts.TTLField)
s.WriteString(" + INTERVAL ")
s.WriteString(fmt.Sprintf("%d", t.TTL/time.Second))
s.WriteString(" SECOND")
}

return s.String()
}

// Create creates tables.
func (t Tables) Create(ctx context.Context, c chClient) error {
if err := t.Validate(); err != nil {
return errors.Wrap(err, "validate")
}

type schema struct {
name string
query string
}
for _, s := range []schema{
{t.Migration, schemaMigration},
} {
if err := c.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: fmt.Sprintf(s.query, s.name),
}); err != nil {
return errors.Wrapf(err, "create %q", s.name)
}
if err := c.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: t.generateQuery(generateOptions{Name: t.Migration, DDL: schemaMigration}),
}); err != nil {
return errors.Wrapf(err, "create %q", t.Migration)
}

hashes, err := t.getHashes(ctx, c)
if err != nil {
return errors.Wrap(err, "get hashes")
}

for _, s := range []schema{
{t.Spans, spansSchema},
{t.Tags, tagsSchema},

{t.Points, pointsSchema},
{t.ExpHistograms, expHistogramsSchema},
{t.Exemplars, exemplarsSchema},
{t.Labels, labelsSchema},

{t.Logs, logsSchema},
{t.LogAttrs, logAttrsSchema},
for _, s := range []generateOptions{
{Name: t.Spans, DDL: spansSchema, TTLField: "start"},
{Name: t.Tags, DDL: tagsSchema},
{Name: t.Points, DDL: pointsSchema, TTLField: "timestamp"},
{Name: t.ExpHistograms, DDL: expHistogramsSchema, TTLField: "timestamp"},
{Name: t.Exemplars, DDL: exemplarsSchema, TTLField: "timestamp"},
{Name: t.Labels, DDL: labelsSchema},
{Name: t.Logs, DDL: logsSchema, TTLField: "timestamp"},
{Name: t.LogAttrs, DDL: logAttrsSchema},
} {
target := fmt.Sprintf("%x", sha256.Sum256([]byte(s.query)))
if current, ok := hashes[s.name]; ok && current != target {
query := t.generateQuery(s)
name := s.Name
target := fmt.Sprintf("%x", sha256.Sum256([]byte(query)))
if current, ok := hashes[s.Name]; ok && current != target {
// HACK: this will DROP all data in the table
// TODO: implement ALTER TABLE
zctx.From(ctx).Warn("DROPPING TABLE (schema changed!)",
zap.String("table", s.name),
zap.String("table", name),
zap.String("current", current),
zap.String("target", target),
)
if err := c.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: fmt.Sprintf("DROP TABLE %s", s.name),
Body: fmt.Sprintf("DROP TABLE %s", name),
}); err != nil {
return errors.Wrapf(err, "drop %q", s.name)
return errors.Wrapf(err, "drop %q", name)
}
}
hashes[s.name] = target
hashes[name] = target
if err := c.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: fmt.Sprintf(s.query, s.name),
Body: query,
}); err != nil {
return errors.Wrapf(err, "create %q", s.name)
return errors.Wrapf(err, "create %q", name)
}
}
if err := t.saveHashes(ctx, c, hashes); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions internal/chstorage/schema_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package chstorage
const (
logsSchema = `
-- https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition
CREATE TABLE IF NOT EXISTS %s
(
-- materialized fields from semantic conventions
-- NB: They MUST NOT be present in the 'resource' field.
Expand Down Expand Up @@ -43,15 +42,14 @@ CREATE TABLE IF NOT EXISTS %s
PARTITION BY toYYYYMMDD(timestamp)
PRIMARY KEY (severity_number, service_namespace, service_name, cityHash64(resource))
ORDER BY (severity_number, service_namespace, service_name, cityHash64(resource), timestamp)
SETTINGS index_granularity=8192;
SETTINGS index_granularity=8192
`

logAttrsSchema = `
CREATE TABLE IF NOT EXISTS %s
(
name String, -- foo_bar
key String, -- foo.bar
)
ENGINE = ReplacingMergeTree
ORDER BY name;`
ORDER BY name`
)
16 changes: 8 additions & 8 deletions internal/chstorage/schema_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
)

const (
pointsSchema = `CREATE TABLE IF NOT EXISTS %s
pointsSchema = `
(
name LowCardinality(String),
timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
Expand All @@ -39,7 +39,7 @@ const (
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
PRIMARY KEY (name, mapping, cityHash64(resource), cityHash64(attributes))
ORDER BY (name, mapping, cityHash64(resource), cityHash64(attributes), timestamp);`
ORDER BY (name, mapping, cityHash64(resource), cityHash64(attributes), timestamp)`
metricMappingDDL = `
'NO_MAPPING' = 0,
'HISTOGRAM_COUNT' = 1,
Expand All @@ -51,7 +51,7 @@ const (
'SUMMARY_SUM' = 7,
'SUMMARY_QUANTILE' = 8
`
expHistogramsSchema = `CREATE TABLE IF NOT EXISTS %s
expHistogramsSchema = `
(
name LowCardinality(String),
timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
Expand All @@ -72,8 +72,8 @@ const (
resource String
)
ENGINE = MergeTree()
ORDER BY timestamp;`
exemplarsSchema = `CREATE TABLE IF NOT EXISTS %s
ORDER BY timestamp`
exemplarsSchema = `
(
name LowCardinality(String),
timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
Expand All @@ -88,9 +88,9 @@ const (
resource String
)
ENGINE = MergeTree()
ORDER BY (name, cityHash64(resource), cityHash64(attributes), timestamp);`
ORDER BY (name, cityHash64(resource), cityHash64(attributes), timestamp)`

labelsSchema = `CREATE TABLE IF NOT EXISTS %s
labelsSchema = `
(
name LowCardinality(String),
key LowCardinality(String),
Expand All @@ -99,7 +99,7 @@ const (
value_normalized String -- normalized value, 'foo_bar''
)
ENGINE = ReplacingMergeTree
ORDER BY (name, value);`
ORDER BY (name, value)`
)

func parseLabels(s string, to map[string]string) error {
Expand Down
6 changes: 3 additions & 3 deletions internal/chstorage/schema_migration.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package chstorage

const schemaMigration = `
CREATE TABLE IF NOT EXISTS %s (
(
table String,
ddl String,
ts DateTime DEFAULT now(),
) ENGINE = ReplacingMergeTree(ts)
ORDER BY (table, ddl);`
) ENGINE = ReplacingMergeTree(ts)
ORDER BY (table, ddl)`
6 changes: 3 additions & 3 deletions internal/chstorage/schema_traces.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package chstorage

const (
spansSchema = `CREATE TABLE IF NOT EXISTS %s
spansSchema = `
(
-- materialized fields from semantic conventions
-- NB: They MUST NOT be present in the 'resource' field.
Expand Down Expand Up @@ -50,10 +50,10 @@ const (
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(start)
PRIMARY KEY (service_namespace, service_name, cityHash64(resource))
ORDER BY (service_namespace, service_name, cityHash64(resource), start);
ORDER BY (service_namespace, service_name, cityHash64(resource), start)
`
kindDDL = `'KIND_UNSPECIFIED' = 0,'KIND_INTERNAL' = 1,'KIND_SERVER' = 2,'KIND_CLIENT' = 3,'KIND_PRODUCER' = 4,'KIND_CONSUMER' = 5`
tagsSchema = `CREATE TABLE IF NOT EXISTS %s
tagsSchema = `
(
name LowCardinality(String),
value String,
Expand Down
7 changes: 4 additions & 3 deletions internal/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ type Table struct {
Engine string
}

func backtick(s string) string {
// Backtick adds backticks to the string.
func Backtick(s string) string {
return "`" + s + "`"
}

func backticks(ss []string) []string {
out := make([]string, len(ss))
for i, s := range ss {
out[i] = backtick(s)
out[i] = Backtick(s)
}
return out
}
Expand All @@ -43,7 +44,7 @@ func Generate(table Table) (string, error) {
for i, c := range table.Columns {
var col strings.Builder
col.WriteString("\t")
col.WriteString(backtick(c.Name))
col.WriteString(Backtick(c.Name))
col.WriteString(" ")
col.WriteString(c.Type.String())
if c.Codec != "" {
Expand Down

0 comments on commit debed3a

Please sign in to comment.