Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial structure and block shipper #1

Merged
merged 14 commits into from
Nov 2, 2017
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
promlts
vendor/
13 changes: 13 additions & 0 deletions .promu.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
repository:
path: github.com/improbable-eng/promlts
build:
binaries:
- name: promlts
path: ./cmd/promlts
flags: -a -tags netgo
ldflags: |
-X {{repoPath}}/vendor/github.com/prometheus/common/version.Version={{.Version}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.Revision={{.Revision}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.Branch={{.Branch}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.BuildUser={{user}}@{{host}}
-X {{repoPath}}/vendor/github.com/prometheus/common/version.BuildDate={{date "20060102-15:04:05"}}
67 changes: 67 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

[[dependencies]]
branch = "master"
name = "github.com/pkg/errors"

[[dependencies]]
branch = "master"
name = "github.com/prometheus/client_golang"

[[dependencies]]
branch = "master"
name = "github.com/prometheus/common"

[[dependencies]]
branch = "master"
name = "gopkg.in/alecthomas/kingpin.v2"
22 changes: 22 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
PREFIX ?= $(shell pwd)

all: format build

format:
@echo ">> formatting code"
@go fmt ./...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe goimports instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Literally cannot get it to run (: But I guess leaving gofmt here is fine since missing or extra imports will cause things to not compile anyway, which CI will catch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I was thinking about goimports just in terms of sorting imports (:


vet:
@echo ">> vetting code"
@go vet ./...

build: promu
@echo ">> building binaries"
@promu build --prefix $(PREFIX)

promu:
@echo ">> fetching promu"
@go get -u github.com/prometheus/promu


.PHONY: all format vet build promu
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# PromLTS

A block storage based long term storage for Prometheus.
1 change: 1 addition & 0 deletions VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v0.0.1
84 changes: 84 additions & 0 deletions cmd/promlts/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

type runFunc func(log.Logger, prometheus.Registerer) error

func main() {
app := kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus")

app.Version(version.Print("promlts"))
app.HelpFlag.Short('h')

logLevel := app.Flag("log.level", "log filtering level").
Default("info").Enum("error", "warn", "info", "debug")

cmds := map[string]runFunc{
"sidecar": registerSidecar(app, "sidecar"),
"store": registerStore(app, "store"),
}
cmd, err := app.Parse(os.Args[1:])
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing commandline arguments"))
app.Usage(os.Args[1:])
os.Exit(2)
}

var logger log.Logger
{
var lvl level.Option
switch *logLevel {
case "error":
lvl = level.AllowError()
case "warn":
lvl = level.AllowWarn()
case "info":
lvl = level.AllowInfo()
case "debug":
lvl = level.AllowDebug()
default:
panic("unexpected log level")
}
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.NewSyncLogger(logger)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
logger = level.NewFilter(logger, lvl)
}

metrics := prometheus.NewRegistry()

metrics.MustRegister(
version.NewCollector("prometheus"),
prometheus.NewGoCollector(),
)

if err := cmds[cmd](logger, metrics); err != nil {
fmt.Fprintln(os.Stderr, errors.Wrap(err, "command failed"))
os.Exit(1)
}
}

func interrupt(cancel <-chan struct{}) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
select {
case <-c:
return nil
case <-cancel:
return errors.New("canceled")
}
}
74 changes: 74 additions & 0 deletions cmd/promlts/sidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"context"

"github.com/improbable-eng/promlts/pkg/shipper"

"cloud.google.com/go/storage"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/oklog/pkg/group"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/alecthomas/kingpin.v2"
)

func registerSidecar(app *kingpin.Application, name string) runFunc {
cmd := app.Command(name, "sidecar for Prometheus server")

promAddr := cmd.Flag("prometheus.address", "listen address of Prometheus instance").
Default("localhost:9090").String()

dataDir := cmd.Flag("tsdb.path", "data directory of TSDB").
Default("./data").String()

gcsBucket := cmd.Flag("gcs.bucket", "Google Cloud Storage bucket name for stored blocks").
PlaceHolder("<bucket>").Required().String()

return func(logger log.Logger, reg prometheus.Registerer) error {
return runSidecar(logger, reg, *promAddr, *dataDir, *gcsBucket)
}
}

func runSidecar(
logger log.Logger,
reg prometheus.Registerer,
promAddr string,
dataDir string,
gcsBucket string,
) error {
level.Info(logger).Log("msg", "I'm a sidecar", "promDir", dataDir, "promAddr", promAddr)

gcsClient, err := storage.NewClient(context.Background())
if err != nil {
return errors.Wrap(err, "create GCS client")
}
defer gcsClient.Close()

var g group.Group

// The background shipper continously scans the data directory and uploads
// new found blocks to Google Cloud Storage.
{
remote := shipper.NewGCSRemote(logger, nil, gcsClient.Bucket(gcsBucket))
s := shipper.New(logger, nil, dataDir, remote, shipper.IsULIDDir)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return s.Run(ctx)
}, func(error) {
cancel()
})
}
// Listen for termination signals.
{
cancel := make(chan struct{})
g.Add(func() error {
return interrupt(cancel)
}, func(error) {
close(cancel)
})
}
return g.Run()
}
69 changes: 69 additions & 0 deletions cmd/promlts/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"

"github.com/alecthomas/units"

"cloud.google.com/go/storage"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/oklog/pkg/group"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerStore registers a store command.
func registerStore(app *kingpin.Application, name string) runFunc {
cmd := app.Command(name, "sidecar for Prometheus server")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect command description (copy-pasta from sidecar)


gcsBucket := cmd.Flag("gcs.bucket", "Google Cloud Storage bucket name for stored blocks").
PlaceHolder("<bucket>").Required().String()

peers := cmd.Flag("peers", "Peering store nodes to connect to").
PlaceHolder("<peers>").Strings()

maxDiskCacheSize := cmd.Flag("disk-cache-size", "maximum size of on-disk cache").
Default("100GB").Bytes()

maxMemCacheSize := cmd.Flag("mem-cache-size", "maximum size of in-memory cache").
Default("4GB").Bytes()

return func(logger log.Logger, metrics prometheus.Registerer) error {
return runStore(logger, metrics, *gcsBucket, *peers, *maxDiskCacheSize, *maxMemCacheSize)
}
}

// runStore starts a daemon that connects to a cluster of other store nodes through gossip.
// It also connects to a Google Cloud Storage bucket and serves data queries to a subset of its contents.
// The served subset is determined through HRW hashing against the block's ULIDs and the known peers.
func runStore(
logger log.Logger,
reg prometheus.Registerer,
gcsBucket string,
peers []string,
diskCacheSize units.Base2Bytes,
memCacheSize units.Base2Bytes,
) error {
level.Info(logger).Log("msg", "I'm a store node", "diskCacheSize", diskCacheSize, "memCacheSize", memCacheSize)

gcsClient, err := storage.NewClient(context.Background())
if err != nil {
return errors.Wrap(err, "create GCS client")
}
defer gcsClient.Close()

var g group.Group

// Listen for termination signals.
{
cancel := make(chan struct{})
g.Add(func() error {
return interrupt(cancel)
}, func(error) {
close(cancel)
})
}
return g.Run()
}
Empty file added pkg/queryproxy/proxy.go
Empty file.
Loading