Skip to content

Commit

Permalink
feat(chstorage): generate DDL
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Dec 3, 2024
1 parent 2053cb4 commit b7ffb8a
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 18 deletions.
27 changes: 27 additions & 0 deletions internal/chstorage/_golden/log_columns_ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
CREATE TABLE IF NOT EXISTS table
(
`service_instance_id` LowCardinality(String) COMMENT 'service.instance.id',
`service_name` LowCardinality(String) COMMENT 'service.name',
`service_namespace` LowCardinality(String) COMMENT 'service.namespace',
`timestamp` DateTime64(9),
`severity_number` UInt8,
`severity_text` LowCardinality(String),
`trace_id` FixedString(16),
`span_id` FixedString(8),
`trace_flags` UInt8,
`body` String,
`attribute` String,
`resource` LowCardinality(String),
`scope_name` LowCardinality(String),
`scope_version` LowCardinality(String),
`scope` LowCardinality(String),

INDEX `idx_trace_id` trace_id TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX `idx_body` body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1,
INDEX `idx_ts` timestamp TYPE minmax GRANULARITY 8192,
INDEX `attribute_keys` arrayConcat(JSONExtractKeys(attribute), JSONExtractKeys(scope), JSONExtractKeys(resource)) TYPE set(100),
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (`severity_number`, `service_namespace`, `service_name`, `resource`, `timestamp`)
PRIMARY KEY (`severity_number`, `service_namespace`, `service_name`, `resource`)
11 changes: 11 additions & 0 deletions internal/chstorage/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/chstorage/chsql"
"github.com/go-faster/oteldb/internal/ddl"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand Down Expand Up @@ -273,6 +274,16 @@ func (a *Attributes) Row(idx int) otelstorage.Attrs {
return a.Value.Row(idx)
}

// DDL applies the schema changes to the table.
func (a *Attributes) DDL(table *ddl.Table) {
table.Columns = append(table.Columns,
ddl.Column{
Name: a.Name,
Type: a.Value.Type(),
},
)
}

func attrsToLabels(m otelstorage.Attrs, to map[string]string) {
m.AsMap().Range(func(k string, v pcommon.Value) bool {
to[otelstorage.KeyToLabel(k)] = v.Str()
Expand Down
101 changes: 101 additions & 0 deletions internal/chstorage/columns_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/go-faster/oteldb/internal/chstorage/chsql"
"github.com/go-faster/oteldb/internal/ddl"
"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
"github.com/go-faster/oteldb/internal/xsync"
Expand All @@ -19,6 +20,106 @@ var (
logAttrMapColumnsPool = xsync.NewPool(newLogAttrMapColumns)
)

// DDL of the log table.
func (c *logColumns) DDL() ddl.Table {
table := ddl.Table{
Engine: "MergeTree",
PartitionBy: "toYYYYMMDD(timestamp)",
PrimaryKey: []string{"severity_number", "service_namespace", "service_name", "resource"},
OrderBy: []string{"severity_number", "service_namespace", "service_name", "resource", "timestamp"},
Indexes: []ddl.Index{
{
Name: "idx_trace_id",
Target: "trace_id",
Type: "bloom_filter",
Params: []string{"0.001"},
Granularity: 1,
},
{
Name: "idx_body",
Target: "body",
Type: "tokenbf_v1",
Params: []string{"32768", "3", "0"},
Granularity: 1,
},
{
Name: "idx_ts",
Target: "timestamp",
Type: "minmax",
Granularity: 8192,
},
{
Name: "attribute_keys",
Target: "arrayConcat(JSONExtractKeys(attribute), JSONExtractKeys(scope), JSONExtractKeys(resource))",
Type: "set",
Params: []string{"100"},
},
},
Columns: []ddl.Column{
{
Name: "service_instance_id",
Type: c.serviceInstanceID.Type(),
Comment: "service.instance.id",
},
{
Name: "service_name",
Type: c.serviceName.Type(),
Comment: "service.name",
},
{
Name: "service_namespace",
Type: c.serviceNamespace.Type(),
Comment: "service.namespace",
},
{
Name: "timestamp",
Type: c.timestamp.Type(),
},
{
Name: "severity_number",
Type: c.severityNumber.Type(),
},
{
Name: "severity_text",
Type: c.severityText.Type(),
},
{
Name: "trace_id",
Type: c.traceID.Type(),
},
{
Name: "span_id",
Type: c.spanID.Type(),
},
{
Name: "trace_flags",
Type: c.traceFlags.Type(),
},
{
Name: "body",
Type: c.body.Type(),
},
},
}

c.attributes.DDL(&table)
c.resource.DDL(&table)

table.Columns = append(table.Columns,
ddl.Column{
Name: "scope_name",
Type: c.scopeName.Type(),
},
ddl.Column{
Name: "scope_version",
Type: c.scopeVersion.Type(),
},
)
c.scopeAttributes.DDL(&table)

return table
}

type logColumns struct {
serviceInstanceID *proto.ColLowCardinality[string]
serviceName *proto.ColLowCardinality[string]
Expand Down
20 changes: 20 additions & 0 deletions internal/chstorage/columns_logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package chstorage

import (
"testing"

"github.com/go-faster/sdk/gold"
"github.com/stretchr/testify/require"

"github.com/go-faster/oteldb/internal/ddl"
)

func Test_logColumns_DDL(t *testing.T) {
lc := newLogColumns()
table := lc.DDL()

s, err := ddl.Generate(table)
require.NoError(t, err)

gold.Str(t, s, "log_columns_ddl.sql")
}
13 changes: 13 additions & 0 deletions internal/ddl/_golden/ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS table
(
`a` Int32,
`bar` LowCardinality(String) CODEC(ZSTD(1)),
`c` String COMMENT 'foo.bar' CODEC(ZSTD(1)),

INDEX `idx_trace_id` trace_id TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX `idx_body` body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1,
INDEX `idx_ts` timestamp TYPE minmax GRANULARITY 8192,
INDEX `attribute_keys` arrayConcat(JSONExtractKeys(attribute), JSONExtractKeys(scope), JSONExtractKeys(resource)) TYPE set(100),
)
ENGINE = MergeTree()
ORDER BY (`a`, `b`)
6 changes: 0 additions & 6 deletions internal/ddl/_golden/ddl.txt

This file was deleted.

95 changes: 85 additions & 10 deletions internal/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,60 @@
package ddl

import (
"fmt"
"strconv"
"strings"

"github.com/ClickHouse/ch-go/proto"
)

// Column of [Table].
type Column struct {
Name string
Type proto.ColumnType
Codec proto.ColumnType
Name string
Comment string
Type proto.ColumnType
Codec proto.ColumnType
}

// Table description.
type Table struct {
Name string
Columns []Column
Indexes []Index
OrderBy []string
PrimaryKey []string
PartitionBy string
Engine string
}

type Index struct {
Name string
Target string
Type string
Params []string
Granularity int
}

func (i Index) string(nameFormat string) string {
var b strings.Builder
b.WriteString("INDEX ")
b.WriteString(fmt.Sprintf(nameFormat, Backtick(i.Name)))
b.WriteString(" ")
b.WriteString(i.Target)
b.WriteString(" TYPE ")
b.WriteString(i.Type)
if len(i.Params) > 0 {
b.WriteString("(")
b.WriteString(strings.Join(i.Params, ", "))
b.WriteString(")")
}
if i.Granularity > 0 {
b.WriteString(" GRANULARITY ")
b.WriteString(strconv.Itoa(i.Granularity))
}
return b.String()
}

// Backtick adds backticks to the string.
func Backtick(s string) string {
return "`" + s + "`"
Expand All @@ -40,23 +72,66 @@ func backticks(ss []string) []string {
// Generate DDL without CREATE TABLE statement.
func Generate(table Table) (string, error) {
var b strings.Builder
b.WriteString("(\n")
for i, c := range table.Columns {
b.WriteString(`CREATE TABLE IF NOT EXISTS `)
if table.Name == "" {
b.WriteString("table")
} else {
b.WriteString(Backtick(table.Name))
}
b.WriteString("\n(")
var (
maxColumnLen int
maxColumnTypeLen int
)
for _, c := range table.Columns {
if len(c.Name) > maxColumnLen {
maxColumnLen = len(c.Name)
}
if len(c.Type.String()) > maxColumnTypeLen {
maxColumnTypeLen = len(c.Type.String())
}
}
for _, c := range table.Columns {
b.WriteString("\n")
var col strings.Builder
col.WriteString("\t")
col.WriteString(Backtick(c.Name))
nameFormat := "%-" + strconv.Itoa(maxColumnLen+2) + "s"
col.WriteString(fmt.Sprintf(nameFormat, Backtick(c.Name)))
col.WriteString(" ")
col.WriteString(c.Type.String())
typeFormat := "%-" + strconv.Itoa(maxColumnTypeLen) + "s"
if c.Codec == "" && c.Comment == "" {
typeFormat = "%s"
}
col.WriteString(fmt.Sprintf(typeFormat, c.Type.String()))
if c.Comment != "" {
col.WriteString(" COMMENT ")
col.WriteString("'")
col.WriteString(c.Comment)
col.WriteString("'")
}
if c.Codec != "" {
col.WriteString(" CODEC(")
col.WriteString(c.Codec.String())
col.WriteRune(')')
}
if i < len(table.Columns)-1 {
col.WriteString(",\n")
}
col.WriteString(",")
b.WriteString(col.String())
}
var maxIndexLen int
for _, c := range table.Indexes {
if len(c.Name) > maxIndexLen {
maxIndexLen = len(c.Name)
}
}
for i, c := range table.Indexes {
if i == 0 {
b.WriteString("\n")
}
b.WriteString("\n\t")
nameFormat := "%-" + strconv.Itoa(maxIndexLen+2) + "s"
b.WriteString(c.string(nameFormat))
b.WriteString(",")
}
b.WriteString("\n)\n")
b.WriteString("ENGINE = ")
if table.Engine == "" {
Expand Down
38 changes: 36 additions & 2 deletions internal/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,46 @@ func TestGenerate(t *testing.T) {
Type: "Int32",
},
{
Name: "b",
Name: "bar",
Type: proto.ColumnType("LowCardinality").Sub(proto.ColumnTypeString),
Codec: "ZSTD(1)",
},
{
Name: "c",
Type: "String",
Codec: "ZSTD(1)",
Comment: "foo.bar",
},
},
Indexes: []Index{
{
Name: "idx_trace_id",
Target: "trace_id",
Type: "bloom_filter",
Params: []string{"0.001"},
Granularity: 1,
},
{
Name: "idx_body",
Target: "body",
Type: "tokenbf_v1",
Params: []string{"32768", "3", "0"},
Granularity: 1,
},
{
Name: "idx_ts",
Target: "timestamp",
Type: "minmax",
Granularity: 8192,
},
{
Name: "attribute_keys",
Target: "arrayConcat(JSONExtractKeys(attribute), JSONExtractKeys(scope), JSONExtractKeys(resource))",
Type: "set",
Params: []string{"100"},
},
},
})
require.NoError(t, err)
gold.Str(t, s, "ddl.txt")
gold.Str(t, s, "ddl.sql")
}

0 comments on commit b7ffb8a

Please sign in to comment.