Skip to content

Commit

Permalink
feat(otelbench): add restore
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Nov 29, 2024
1 parent 4570c40 commit e81bbab
Showing 1 changed file with 110 additions and 15 deletions.
125 changes: 110 additions & 15 deletions cmd/otelbench/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,33 @@ func newDumpCommand() *cobra.Command {
}
rootCmd.AddCommand(
newDumpCreateCommand(),
newDumpRestoreCommand(),
)
return rootCmd
}

func dumpTables() []string {
return []string{
"migration",
"metrics_exemplars",
"metrics_exp_histograms",
"metrics_labels",
"metrics_points",
"traces_spans",
"traces_tags",
"logs_attrs",
"logs",
}
}

func newDumpCreateCommand() *cobra.Command {
var arg struct {
LimitTime time.Duration
LimitCount int

Output string
Database string
Output string
Database string
Compression string

KubernetesNamespace string
KubernetesService string
Expand Down Expand Up @@ -113,17 +129,7 @@ func newDumpCreateCommand() *cobra.Command {
}
}
fmt.Println("Clickhouse connection is ready")
tables := []string{
"migration",
"metrics_exemplars",
"metrics_exp_histograms",
"metrics_labels",
"metrics_points",
"traces_spans",
"traces_tags",
"logs_attrs",
"logs",
}
tables := dumpTables()
var files []os.FileInfo
for _, table := range tables {
query := fmt.Sprintf("SELECT * FROM %s.%s", arg.Database, table)
Expand All @@ -143,8 +149,8 @@ func newDumpCreateCommand() *cobra.Command {
}

// SELECT * FROM faster.logs INTO OUTFILE '/tmp/dump.bin' FORMAT Native;
outFile := filepath.Join(arg.Output, fmt.Sprintf("%s.bin.lz4", table))
query += fmt.Sprintf(" INTO OUTFILE '%s' TRUNCATE COMPRESSION 'lz4' FORMAT Native", outFile)
outFile := filepath.Join(arg.Output, fmt.Sprintf("%s.bin.%s", table, arg.Compression))
query += fmt.Sprintf(" INTO OUTFILE '%s' TRUNCATE COMPRESSION '%s' FORMAT Native", outFile, arg.Compression)

args := []string{
"-h", "localhost",
Expand Down Expand Up @@ -195,6 +201,95 @@ func newDumpCreateCommand() *cobra.Command {
f.IntVar(&arg.LocalPort, "port", 9000, "Local port")
f.DurationVar(&arg.LimitTime, "duration", 0, "Limit oldest data with delta from now")
f.IntVar(&arg.LimitCount, "limit", 0, "Limit oldest data with count")
f.StringVar(&arg.Compression, "compression", "zstd", "Compression algorithm")

return rootCmd
}

func newDumpRestoreCommand() *cobra.Command {
var arg struct {
Input string
Database string
Host string
Port int
Truncate bool
Compression string
}
rootCmd := &cobra.Command{
Use: "restore",
Short: "Restore a dump",
RunE: func(cobraCommand *cobra.Command, _ []string) error {
ctx := cobraCommand.Context()
client, err := ch.Dial(ctx, ch.Options{
Address: fmt.Sprintf("%s:%d", arg.Host, arg.Port),
Database: arg.Database,
})
if err != nil {
return errors.Wrap(err, "dial")
}

truncate := func(ctx context.Context, table string) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

query := fmt.Sprintf("TRUNCATE TABLE %s.%s", arg.Database, table)
if err := client.Do(ctx, ch.Query{Body: query}); err != nil {
return errors.Wrap(err, "truncate")
}

return nil
}

tables := dumpTables()
for _, table := range tables {
file := filepath.Join(arg.Input, fmt.Sprintf("%s.bin.%s", table, arg.Compression))
if _, err := os.Stat(file); err != nil {
if os.IsNotExist(err) {
fmt.Printf("Table %s not found\n", table)
continue
}
return errors.Wrap(err, "stat")
}
if arg.Truncate {
if err := truncate(ctx, table); err != nil {
return errors.Wrapf(err, "truncate %s", table)
}
}

if err := func() error {
q := fmt.Sprintf("INSERT INTO %s.%s FROM INFILE '%s' COMPRESSION '%s' FORMAT Native",
arg.Database, table, file, arg.Compression,
)
args := []string{
"-h", arg.Host,
"-d", arg.Database,
"--progress",
"-q", q,
}
cmd := exec.CommandContext(ctx, "clickhouse-client", args...)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout

fmt.Println(">", table)
if err := cmd.Run(); err != nil {
return errors.Wrapf(err, "restore %s", table)
}

return nil
}(); err != nil {
return errors.Wrapf(err, "restore %s", file)
}
}
return nil
},
}

f := rootCmd.Flags()
f.StringVarP(&arg.Input, "input", "i", "", "Input directory")
f.StringVarP(&arg.Database, "database", "d", "default", "Database name")
f.IntVar(&arg.Port, "port", 9000, "Clickhouse port")
f.StringVar(&arg.Host, "host", "localhost", "Clickhouse host")
f.BoolVar(&arg.Truncate, "truncate", false, "Truncate tables before restore")

return rootCmd
}

0 comments on commit e81bbab

Please sign in to comment.